[spark] branch master updated (f1f856d5463 -> 96bac6c033b)

2023-10-12 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from f1f856d5463 [SPARK-45526][PYTHON][DOCS] Improve the example of 
DataFrameReader/Writer.options to take a dictionary
 add 96bac6c033b [SPARK-45508][CORE] Add 
"--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED" so Platform can access 
Cleaner on Java 9+

No new revisions were added by this update.

Summary of changes:
 common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java  | 7 ++-
 .../src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java   | 7 +++
 .../src/main/java/org/apache/spark/launcher/JavaModuleOptions.java | 1 +
 pom.xml| 1 +
 project/SparkBuild.scala   | 1 +
 5 files changed, 16 insertions(+), 1 deletion(-)
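
A hedged usage sketch of the new JVM option (illustrative only; spark-submit and the Spark launcher now inject the flag automatically via JavaModuleOptions, and the config key below is the standard Spark setting, not something added by this commit):

```python
from pyspark.sql import SparkSession

# A minimal sketch: an application embedding Spark on Java 9+ could also pass
# the flag explicitly through spark.driver.extraJavaOptions.
spark = (
    SparkSession.builder
    .config(
        "spark.driver.extraJavaOptions",
        "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED",
    )
    .getOrCreate()
)
print(spark.version)
```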


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45526][PYTHON][DOCS] Improve the example of DataFrameReader/Writer.options to take a dictionary

2023-10-12 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f1f856d5463 [SPARK-45526][PYTHON][DOCS] Improve the example of 
DataFrameReader/Writer.options to take a dictionary
f1f856d5463 is described below

commit f1f856d546360d34ca1f7ee1ddc163381586b180
Author: Hyukjin Kwon 
AuthorDate: Fri Oct 13 14:23:09 2023 +0900

[SPARK-45526][PYTHON][DOCS] Improve the example of 
DataFrameReader/Writer.options to take a dictionary

### What changes were proposed in this pull request?

This PR proposes to add an example of DataFrameReader/Writer.options taking a 
dictionary.

### Why are the changes needed?

For users to know how to set options with a dictionary in PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, it describes an example for setting the options with a dictionary.

### How was this patch tested?

Existing doctests in this PR's CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43357

Closes #43358 from HyukjinKwon/SPARK-45528.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/readwriter.py   | 14 --
 python/pyspark/sql/streaming/readwriter.py | 10 ++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index ea429a75e15..81977c9e8cc 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -220,7 +220,12 @@ class DataFrameReader(OptionUtils):
 
 Examples
 
->>> spark.read.option("key", "value")
+>>> spark.read.options(key="value")
+<...readwriter.DataFrameReader object ...>
+
+Specify options in a dictionary.
+
+>>> spark.read.options(**{"k1": "v1", "k2": "v2"})
 <...readwriter.DataFrameReader object ...>
 
 Specify the option 'nullValue' and 'header' with reading a CSV file.
@@ -1172,7 +1177,12 @@ class DataFrameWriter(OptionUtils):
 
 Examples
 
->>> spark.range(1).write.option("key", "value")
+>>> spark.range(1).write.options(key="value")
+<...readwriter.DataFrameWriter object ...>
+
+Specify options in a dictionary.
+
+>>> spark.range(1).write.options(**{"k1": "v1", "k2": "v2"})
 <...readwriter.DataFrameWriter object ...>
 
 Specify the option 'nullValue' and 'header' with writing a CSV file.
diff --git a/python/pyspark/sql/streaming/readwriter.py 
b/python/pyspark/sql/streaming/readwriter.py
index 2026651ce12..b0f01c06b2e 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -224,6 +224,11 @@ class DataStreamReader(OptionUtils):
 >>> spark.readStream.options(x="1", y=2)
 <...streaming.readwriter.DataStreamReader object ...>
 
+Specify options in a dictionary.
+
+>>> spark.readStream.options(**{"k1": "v1", "k2": "v2"})
+<...streaming.readwriter.DataStreamReader object ...>
+
 The example below specifies 'rowsPerSecond' and 'numPartitions' 
options to
 Rate source in order to generate 10 rows with 10 partitions every 
second.
 
@@ -943,6 +948,11 @@ class DataStreamWriter:
 >>> df.writeStream.option("x", 1)
 <...streaming.readwriter.DataStreamWriter object ...>
 
+Specify options in a dictionary.
+
+>>> df.writeStream.options(**{"k1": "v1", "k2": "v2"})
+<...streaming.readwriter.DataStreamWriter object ...>
+
 The example below specifies 'numRows' and 'truncate' options to 
Console source in order
 to print 3 rows for every batch without truncating the results.
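
A usage sketch mirroring the new doctest examples above (the path and option names are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Pass reader/writer options as a dictionary unpacked into keyword arguments,
# instead of chaining repeated .option() calls.
csv_options = {"header": "true", "nullValue": "NA"}
path = "/tmp/spark-45526-demo"  # illustrative output location

spark.range(3).write.options(**csv_options).mode("overwrite").csv(path)
spark.read.options(**csv_options, inferSchema="true").csv(path).show()
```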
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…

2023-10-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new b5f3dc9e760 [SPARK-45498][CORE] Followup: Ignore task completion from 
old stage a…
b5f3dc9e760 is described below

commit b5f3dc9e76082a81357555ace0c489df97e6f81a
Author: mayurb 
AuthorDate: Fri Oct 13 10:17:56 2023 +0800

[SPARK-45498][CORE] Followup: Ignore task completion from old stage a…

### What changes were proposed in this pull request?
With [SPARK-45182](https://issues.apache.org/jira/browse/SPARK-45182), we 
added a fix to prevent laggard tasks from older attempts of an indeterminate 
stage from marking the partition as completed in the map output tracker.

When a task is completed, the DAG scheduler also notifies all the task sets 
of the stage about that partition being completed. Task sets will not schedule 
such tasks if they are not already scheduled. This is not correct for an 
indeterminate stage, since we want to re-run all the tasks on a re-attempt.

### Why are the changes needed?
Since the partition is not marked completed by the older attempt and the 
corresponding task from the newer attempt also does not get scheduled, the 
stage has to be rescheduled to complete that partition. Because the stage is 
indeterminate, all of its partitions will then be recomputed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added check in existing unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43326 from mayurdb/indeterminateFix.

Authored-by: mayurb 
Signed-off-by: Wenchen Fan 
(cherry picked from commit fb3b707bc1c875c14ff7c6e7a3f39b5c4b852c86)
Signed-off-by: Wenchen Fan 
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala   | 6 +++---
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala   | 5 -
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d73bb633901..d8adaae19b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1847,9 +1847,9 @@ private[spark] class DAGScheduler(
   case Success =>
 // An earlier attempt of a stage (which is zombie) may still have 
running tasks. If these
 // tasks complete, they still count and we can mark the corresponding 
partitions as
-// finished. Here we notify the task scheduler to skip running tasks 
for the same partition,
-// to save resource.
-if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
+// finished if the stage is determinate. Here we notify the task 
scheduler to skip running
+// tasks for the same partition to save resource.
+if (!stage.isIndeterminate && task.stageAttemptId < 
stage.latestInfo.attemptNumber()) {
   taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
 }
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e351f8b95bb..9b7c5d5ace3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3169,13 +3169,16 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
   makeMapStatus("hostB",
 2)))
 
-// The second task of the  shuffle map stage 1 from 1st attempt succeeds
+// The second task of the shuffle map stage 1 from 1st attempt succeeds
 runEvent(makeCompletionEvent(
   taskSets(1).tasks(1),
   Success,
   makeMapStatus("hostC",
 2)))
 
+// Above task completion should not mark the partition 1 complete from 2nd 
attempt
+assert(!tasksMarkedAsCompleted.contains(taskSets(3).tasks(1)))
+
 // This task completion should get ignored and partition 1 should be 
missing
 // for shuffle map stage 1
 assert(mapOutputTracker.findMissingPartitions(shuffleId2) == Some(Seq(1)))
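
For context, a hedged sketch of the kind of job that produces an indeterminate shuffle map stage (illustrative only, not a reproduction of the bug):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# RDD.repartition performs a round-robin shuffle, a typical source of an
# "indeterminate" map stage: if the stage is retried after a fetch failure,
# every partition must be recomputed, so late task completions from a zombie
# attempt must not let the new attempt skip partitions.
rdd = sc.parallelize(range(1_000_000), 10)
print(rdd.repartition(20).map(lambda x: x * 2).count())
```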


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45498][CORE] Followup: Ignore task completion from old stage a…

2023-10-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fb3b707bc1c [SPARK-45498][CORE] Followup: Ignore task completion from 
old stage a…
fb3b707bc1c is described below

commit fb3b707bc1c875c14ff7c6e7a3f39b5c4b852c86
Author: mayurb 
AuthorDate: Fri Oct 13 10:17:56 2023 +0800

[SPARK-45498][CORE] Followup: Ignore task completion from old stage a…

### What changes were proposed in this pull request?
With [SPARK-45182](https://issues.apache.org/jira/browse/SPARK-45182), we 
added a fix to prevent laggard tasks from older attempts of an indeterminate 
stage from marking the partition as completed in the map output tracker.

When a task is completed, the DAG scheduler also notifies all the task sets 
of the stage about that partition being completed. Task sets will not schedule 
such tasks if they are not already scheduled. This is not correct for an 
indeterminate stage, since we want to re-run all the tasks on a re-attempt.

### Why are the changes needed?
Since the partition is not marked completed by the older attempt and the 
corresponding task from the newer attempt also does not get scheduled, the 
stage has to be rescheduled to complete that partition. Because the stage is 
indeterminate, all of its partitions will then be recomputed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added check in existing unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43326 from mayurdb/indeterminateFix.

Authored-by: mayurb 
Signed-off-by: Wenchen Fan 
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala   | 6 +++---
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala   | 5 -
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a456f91d4c9..07a71ebed08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1847,9 +1847,9 @@ private[spark] class DAGScheduler(
   case Success =>
 // An earlier attempt of a stage (which is zombie) may still have 
running tasks. If these
 // tasks complete, they still count and we can mark the corresponding 
partitions as
-// finished. Here we notify the task scheduler to skip running tasks 
for the same partition,
-// to save resource.
-if (task.stageAttemptId < stage.latestInfo.attemptNumber()) {
+// finished if the stage is determinate. Here we notify the task 
scheduler to skip running
+// tasks for the same partition to save resource.
+if (!stage.isIndeterminate && task.stageAttemptId < 
stage.latestInfo.attemptNumber()) {
   taskScheduler.notifyPartitionCompletion(stageId, task.partitionId)
 }
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7bb8f49e6bf..7691b98f620 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3169,13 +3169,16 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
   makeMapStatus("hostB",
 2)))
 
-// The second task of the  shuffle map stage 1 from 1st attempt succeeds
+// The second task of the shuffle map stage 1 from 1st attempt succeeds
 runEvent(makeCompletionEvent(
   taskSets(1).tasks(1),
   Success,
   makeMapStatus("hostC",
 2)))
 
+// Above task completion should not mark the partition 1 complete from 2nd 
attempt
+assert(!tasksMarkedAsCompleted.contains(taskSets(3).tasks(1)))
+
 // This task completion should get ignored and partition 1 should be 
missing
 // for shuffle map stage 1
 assert(mapOutputTracker.findMissingPartitions(shuffleId2) == Some(Seq(1)))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (f72b87b90be -> 12880c846b5)

2023-10-12 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from f72b87b90be [SPARK-45515][CORE][SQL] Use enhanced `switch` expressions 
to replace the regular `switch` statement
 add 12880c846b5 [SPARK-45266][PYTHON][FOLLOWUP] Fix to resolve 
UnresolvedPolymorphicPythonUDTF when the table argument is specified as a named 
argument

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/test_udtf.py  | 272 -
 .../spark/sql/catalyst/analysis/Analyzer.scala |   2 +
 2 files changed, 156 insertions(+), 118 deletions(-)
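
A hedged sketch of a Python UDTF consuming a TABLE argument (names are hypothetical; the fix itself concerns the polymorphic variant, i.e. a UDTF with an `analyze` method, when the table is passed as a named argument such as `my_udtf(arg => TABLE(v))`):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udtf

spark = SparkSession.builder.getOrCreate()

@udtf(returnType="id: bigint")
class PassThrough:
    def eval(self, row):
        # `row` is a Row from the input TABLE argument; echo its `id` column.
        yield (row["id"],)

spark.udtf.register("pass_through", PassThrough)
spark.sql("CREATE OR REPLACE TEMP VIEW v AS SELECT 1 AS id")
spark.sql("SELECT * FROM pass_through(TABLE(v))").show()
```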


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Re: [PR] Add canonical links to the PySpark docs page for published docs [spark-website]

2023-10-12 Thread via GitHub


panbingkun commented on PR #482:
URL: https://github.com/apache/spark-website/pull/482#issuecomment-1760614768

   > @panbingkun thanks for doing this. However, I discovered that some of the 
canonical links generated are not valid URLs, for example: 
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html
 Is there a way to update this canonical link to the actual latest 
documentation for groupBy?
   
   Yes, I also noticed this. The reason is not an issue with the update logic; 
it is because the location of the same document may change between versions. 
For example:
   In version 3.1.1, the `pyspark.sql.DataFrame.groupBy` page is located at: 
`https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html?highlight=groupby#pyspark.sql.DataFrame.groupBy`
   
   Following that logic, its `canonical link` should be: `https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html`
   But in the new version this document has been moved to a different 
location: 
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.groupBy.html?highlight=groupby#pyspark.sql.DataFrame.groupBy
   So, from the perspective of the old document, the `canonical link` looks 
incorrect.
   This issue will always exist: if the location of a document changes in a new 
version, the `canonical link` of the old document has to be updated in step.
   Of course, we can do that manually, but what should we do the next time a 
document moves? This is a difficult problem.
   Additionally, there is another issue: documents that disappear in newer 
versions. How should we handle those?
   Not add a `canonical link` at all?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45515][CORE][SQL] Use enhanced `switch` expressions to replace the regular `switch` statement

2023-10-12 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f72b87b90be [SPARK-45515][CORE][SQL] Use enhanced `switch` expressions 
to replace the regular `switch` statement
f72b87b90be is described below

commit f72b87b90bea137050e3e2edceaf962eb7924f13
Author: yangjie01 
AuthorDate: Fri Oct 13 08:49:21 2023 +0800

[SPARK-45515][CORE][SQL] Use enhanced `switch` expressions to replace the 
regular `switch` statement

### What changes were proposed in this pull request?
This PR uses enhanced `switch` expressions to replace regular `switch` 
statements in Spark Java code, following [JEP 361](https://openjdk.org/jeps/361).

Example:

```java
double getPrice(String fruit) {
  switch (fruit) {
    case "Apple":
      return 1.0;
    case "Orange":
      return 1.5;
    case "Mango":
      return 2.0;
    default:
      throw new IllegalArgumentException();
  }
}
```

Can be changed to

```java
double getPrice(String fruit) {
  return switch (fruit) {
    case "Apple" -> 1.0;
    case "Orange" -> 1.5;
    case "Mango" -> 2.0;
    default -> throw new IllegalArgumentException();
  };
}
```

This PR does not include parts of the `hive-thriftserver` module.

### Why are the changes needed?
Using `JEP 361: Switch Expressions` can bring the following benefits:

1. **More concise syntax**: `switch` can be used as an expression, not just 
a statement. This makes the code more concise and easier to read.

2. **Safer**: In `switch` expressions, if we forget the `break`, there will 
be no unexpected `fall-through` behavior. At the same time, the compiler will 
check whether all possible cases are covered. If not all cases are covered, the 
compiler will report an error.

3. **Easier to understand**: The new `switch` expression syntax is closer 
to our decision-making pattern in daily life, making the code easier to 
understand.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43349 from LuciferYang/jep-361.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 .../org/apache/spark/network/protocol/Message.java | 34 ++---
 .../spark/network/protocol/MessageDecoder.java | 59 ++
 .../network/server/BlockPushNonFatalFailure.java   | 16 +++---
 .../org/apache/spark/network/util/DBProvider.java  | 13 +++--
 .../org/apache/spark/network/util/NettyUtils.java  | 36 +
 .../apache/spark/network/RpcIntegrationSuite.java  | 18 +++
 .../org/apache/spark/network/StreamTestHelper.java | 24 -
 .../shuffle/protocol/BlockTransferMessage.java | 42 +++
 .../network/shuffle/ExternalBlockHandlerSuite.java | 34 ++---
 .../shuffle/ExternalShuffleIntegrationSuite.java   | 10 ++--
 .../network/yarn/YarnShuffleServiceMetrics.java| 17 ++-
 .../apache/spark/unsafe/UnsafeAlignedOffset.java   | 12 ++---
 .../catalyst/expressions/ExpressionImplUtils.java  | 23 -
 .../sql/connector/expressions/NullOrdering.java| 12 ++---
 .../sql/connector/expressions/SortDirection.java   | 12 ++---
 .../sql/connector/util/V2ExpressionSQLBuilder.java |  8 ++-
 .../datasources/parquet/ParquetColumnVector.java   | 12 ++---
 .../parquet/VectorizedRleValuesReader.java | 11 ++--
 18 files changed, 155 insertions(+), 238 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java
 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java
index 12ebee8da96..0bcce788ec4 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/Message.java
@@ -55,23 +55,23 @@ public interface Message extends Encodable {
 
 public static Type decode(ByteBuf buf) {
   byte id = buf.readByte();
-  switch (id) {
-case 0: return ChunkFetchRequest;
-case 1: return ChunkFetchSuccess;
-case 2: return ChunkFetchFailure;
-case 3: return RpcRequest;
-case 4: return RpcResponse;
-case 5: return RpcFailure;
-case 6: return StreamRequest;
-case 7: return StreamResponse;
-case 8: return StreamFailure;
-case 9: return OneWayMessage;
-case 10: return UploadStream;
-case 11: return MergedBlockMetaRequest;
-case 12: return MergedBlockMetaSuccess;
-case -1: throw new IllegalArgumentException("User type messages cannot 
be 

[spark] branch master updated: [SPARK-45521][ML] Avoid re-computation of nnz in `VectorAssembler`

2023-10-12 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4df9fa24f56 [SPARK-45521][ML] Avoid re-computation of nnz in 
`VectorAssembler`
4df9fa24f56 is described below

commit 4df9fa24f56161f7aab08611fa32efc1a89a0ab2
Author: Ruifeng Zheng 
AuthorDate: Fri Oct 13 08:37:04 2023 +0800

[SPARK-45521][ML] Avoid re-computation of nnz in `VectorAssembler`

### What changes were proposed in this pull request?
1. Add a new private `compressed` method that takes a given `nnz`, since we 
sometimes already know it;
2. Minor change: `Array.range(0, length)` -> `Iterator.range(0, length)` to 
avoid creating an array.

### Why are the changes needed?
in `VectorAssembler`, the `nnz` if already known before vector 
construction, the scan to compute nnz can be skipped;

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #43353 from zhengruifeng/ml_vec_opt.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../src/main/scala/org/apache/spark/ml/linalg/Vectors.scala   | 5 +++--
 .../main/scala/org/apache/spark/ml/feature/VectorAssembler.scala  | 8 +---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 985f67fc3c3..827ca3f8b9d 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -184,8 +184,9 @@ sealed trait Vector extends Serializable {
* Returns a vector in either dense or sparse format, whichever uses less 
storage.
*/
   @Since("2.0.0")
-  def compressed: Vector = {
-val nnz = numNonzeros
+  def compressed: Vector = compressed(numNonzeros)
+
+  private[ml] def compressed(nnz: Int): Vector = {
 // A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12 
* nnz + 20 bytes.
 if (1.5 * (nnz + 1.0) < size) {
   toSparseWithSize(nnz)
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 7bc5e56aaeb..761352e34a3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -279,8 +279,8 @@ object VectorAssembler extends 
DefaultParamsReadable[VectorAssembler] {
 featureIndex += vec.size
   case null =>
 if (keepInvalid) {
-  val length: Int = lengths(inputColumnIndex)
-  Array.range(0, length).foreach { i =>
+  val length = lengths(inputColumnIndex)
+  Iterator.range(0, length).foreach { i =>
 indices += featureIndex + i
 values += Double.NaN
   }
@@ -295,6 +295,8 @@ object VectorAssembler extends 
DefaultParamsReadable[VectorAssembler] {
   case o =>
 throw new SparkException(s"$o of type ${o.getClass.getName} is not 
supported.")
 }
-Vectors.sparse(featureIndex, indices.result(), values.result()).compressed
+
+val (idxArray, valArray) = (indices.result(), values.result())
+Vectors.sparse(featureIndex, idxArray, 
valArray).compressed(idxArray.length)
   }
 }
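
A small worked check of the storage heuristic referenced in the hunk above (dense costs about 8 * size + 8 bytes, sparse about 12 * nnz + 20 bytes, and `compressed` keeps the sparse form when 1.5 * (nnz + 1.0) < size):

```python
def prefers_sparse(size: int, nnz: int) -> bool:
    # Mirrors the comparison in Vectors.compressed: keep the sparse form
    # when 1.5 * (nnz + 1.0) < size.
    return 1.5 * (nnz + 1.0) < size

print(prefers_sparse(size=100, nnz=10))  # True:  140 B sparse vs 808 B dense
print(prefers_sparse(size=100, nnz=80))  # False: 980 B sparse vs 808 B dense
```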


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45418][SQL][PYTHON][CONNECT] Change current_database() column alias to current_schema()

2023-10-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 12638b851f3 [SPARK-45418][SQL][PYTHON][CONNECT] Change 
current_database() column alias to current_schema()
12638b851f3 is described below

commit 12638b851f37832ed85b63374d7f83dfbb924cd6
Author: Michael Zhang 
AuthorDate: Fri Oct 13 08:21:01 2023 +0800

[SPARK-45418][SQL][PYTHON][CONNECT] Change current_database() column alias 
to current_schema()

### What changes were proposed in this pull request?

Change the column alias of current_database() to current_schema().

### Why are the changes needed?

To better align with the preferred usage of "schema" rather than "database" 
for the three-part namespace.

### Does this PR introduce _any_ user-facing change?

Yes, `current_database()` column alias is now `current_schema()`.

### How was this patch tested?

Unit tests pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43235 from michaelzhan-db/SPARK-45418.

Authored-by: Michael Zhang 
Signed-off-by: Wenchen Fan 
---
 .../function_current_database.explain|  2 +-
 .../explain-results/function_current_schema.explain  |  2 +-
 python/pyspark/sql/functions.py  | 20 ++--
 .../apache/spark/sql/catalyst/expressions/misc.scala |  2 +-
 .../resources/sql-functions/sql-expression-schema.md |  4 ++--
 .../current_database_catalog.sql.out |  2 +-
 .../analyzer-results/sql-session-variables.sql.out   |  2 +-
 .../results/current_database_catalog.sql.out |  2 +-
 8 files changed, 18 insertions(+), 18 deletions(-)
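
A quick sanity-check sketch of the new alias (assumes a default local session; expected output shown as a hedged comment, matching the docstring update below):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_database, current_schema

spark = SparkSession.builder.getOrCreate()

# After this change both functions render their column as current_schema().
spark.range(1).select(current_database(), current_schema()).show()
# +----------------+----------------+
# |current_schema()|current_schema()|
# +----------------+----------------+
# |         default|         default|
# +----------------+----------------+
```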

diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain
index 93dfac524d9..481c0a478c8 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_database.explain
@@ -1,2 +1,2 @@
-Project [current_database() AS current_database()#0]
+Project [current_schema() AS current_schema()#0]
 +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain
index 93dfac524d9..481c0a478c8 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_current_schema.explain
@@ -1,2 +1,2 @@
-Project [current_database() AS current_database()#0]
+Project [current_schema() AS current_schema()#0]
 +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 25958bdf15d..31e5884e9eb 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -8719,11 +8719,11 @@ def current_database() -> Column:
 Examples
 
 >>> spark.range(1).select(current_database()).show()
-+--+
-|current_database()|
-+--+
-|   default|
-+--+
+++
+|current_schema()|
+++
+| default|
+++
 """
 return _invoke_function("current_database")
 
@@ -8738,11 +8738,11 @@ def current_schema() -> Column:
 
 >>> import pyspark.sql.functions as sf
 >>> spark.range(1).select(sf.current_schema()).show()
-+--+
-|current_database()|
-+--+
-|   default|
-+--+
+++
+|current_schema()|
+++
+| default|
+++
 """
 return _invoke_function("current_schema")
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 4a54ccf4a31..60bf5c603d9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -189,7 +189,7 @@ object AssertTrue {
 case class CurrentDatabase() extends LeafExpression with Unevaluable {
   override def dataType: DataType = StringType
   override def nullable: Boolean = false
-  override def prettyName: String = "current_database"
+  

[spark] branch master updated: [SPARK-45505][PYTHON] Refactor analyzeInPython to make it reusable

2023-10-12 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 280f6b33110d [SPARK-45505][PYTHON] Refactor analyzeInPython to make it 
reusable
280f6b33110d is described below

commit 280f6b33110d707ebee6fec6e5bafa45b45213ae
Author: allisonwang-db 
AuthorDate: Thu Oct 12 17:02:41 2023 -0700

[SPARK-45505][PYTHON] Refactor analyzeInPython to make it reusable

### What changes were proposed in this pull request?

Currently, the `analyzeInPython` method in the UserDefinedPythonTableFunction 
object starts a Python process on the driver and runs a Python function in that 
Python process. This PR aims to refactor this logic into a reusable runner 
class.

### Why are the changes needed?

To make the code more reusable.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43340 from allisonwang-db/spark-45505-refactor-analyze-in-py.

Authored-by: allisonwang-db 
Signed-off-by: Takuya UESHIN 
---
 python/pyspark/sql/worker/analyze_udtf.py  |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  18 +-
 .../sql/execution/python/PythonPlannerRunner.scala | 177 
 .../python/UserDefinedPythonFunction.scala | 321 +++--
 4 files changed, 286 insertions(+), 236 deletions(-)
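
A hedged sketch of setting the renamed conf from the diff below (the value is illustrative; per the conf's description it caps memory for Python code run on the driver, in MiB):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Replaces spark.sql.analyzer.pythonUDTF.analyzeInPython.memory.
    .config("spark.sql.planner.pythonExecution.memory", "512")
    .getOrCreate()
)
```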

diff --git a/python/pyspark/sql/worker/analyze_udtf.py 
b/python/pyspark/sql/worker/analyze_udtf.py
index a6aa381eb14a..9e84b880fc96 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -98,14 +98,14 @@ def main(infile: IO, outfile: IO) -> None:
 """
 Runs the Python UDTF's `analyze` static method.
 
-This process will be invoked from 
`UserDefinedPythonTableFunction.analyzeInPython` in JVM
-and receive the Python UDTF and its arguments for the `analyze` static 
method,
+This process will be invoked from 
`UserDefinedPythonTableFunctionAnalyzeRunner.runInPython`
+in JVM and receive the Python UDTF and its arguments for the `analyze` 
static method,
 and call the `analyze` static method, and send back a AnalyzeResult as a 
result of the method.
 """
 try:
 check_python_version(infile)
 
-memory_limit_mb = 
int(os.environ.get("PYSPARK_UDTF_ANALYZER_MEMORY_MB", "-1"))
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
 setup_memory_limits(memory_limit_mb)
 
 setup_spark_files(infile)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 12ec9e911d31..000694f6f1bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3008,14 +3008,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
-  val PYTHON_TABLE_UDF_ANALYZER_MEMORY =
-buildConf("spark.sql.analyzer.pythonUDTF.analyzeInPython.memory")
-  .doc("The amount of memory to be allocated to PySpark for Python UDTF 
analyzer, in MiB " +
-"unless otherwise specified. If set, PySpark memory for Python UDTF 
analyzer will be " +
-"limited to this amount. If not set, Spark will not limit Python's " +
-"memory use and it is up to the application to avoid exceeding the 
overhead memory space " +
-"shared with other non-JVM processes.\nNote: Windows does not support 
resource limiting " +
-"and actual resource is not limited on MacOS.")
+  val PYTHON_PLANNER_EXEC_MEMORY =
+buildConf("spark.sql.planner.pythonExecution.memory")
+  .doc("Specifies the memory allocation for executing Python code in Spark 
driver, in MiB. " +
+"When set, it caps the memory for Python execution to the specified 
amount. " +
+"If not set, Spark will not limit Python's memory usage and it is up 
to the application " +
+"to avoid exceeding the overhead memory space shared with other 
non-JVM processes.\n" +
+"Note: Windows does not support resource limiting and actual resource 
is not limited " +
+"on MacOS.")
   .version("4.0.0")
   .bytesConf(ByteUnit.MiB)
   .createOptional
@@ -5157,7 +5157,7 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
   def pysparkWorkerPythonExecutable: Option[String] =
 getConf(SQLConf.PYSPARK_WORKER_PYTHON_EXECUTABLE)
 
-  def pythonUDTFAnalyzerMemory: Option[Long] = 
getConf(PYTHON_TABLE_UDF_ANALYZER_MEMORY)
+  def pythonPlannerExecMemory: Option[Long] = 
getConf(PYTHON_PLANNER_EXEC_MEMORY)
 
   def 

[spark] branch master updated: [SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message

2023-10-12 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e720cce1813e [SPARK-45516][CONNECT] Include QueryContext in 
SparkThrowable proto message
e720cce1813e is described below

commit e720cce1813e384847d4ef0bac48a202b2e39848
Author: Yihong He 
AuthorDate: Fri Oct 13 08:36:51 2023 +0900

[SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message

### What changes were proposed in this pull request?

- Include QueryContext in SparkThrowable proto message
- Reconstruct QueryContext for SparkThrowable exceptions on the client side

### Why are the changes needed?

- Better integration with the error framework

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Closes #43352 from heyihong/SPARK-45516.

Lead-authored-by: Yihong He 
Co-authored-by: Yihong He 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  4 ++
 .../src/main/protobuf/spark/connect/base.proto | 28 -
 .../connect/client/GrpcExceptionConverter.scala| 50 
 .../spark/sql/connect/utils/ErrorUtils.scala   | 11 
 python/pyspark/sql/connect/proto/base_pb2.py   | 22 +++
 python/pyspark/sql/connect/proto/base_pb2.pyi  | 69 +-
 6 files changed, 159 insertions(+), 25 deletions(-)
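
A hedged sketch of building the new QueryContext proto message from the regenerated Python bindings listed above (field values are illustrative):

```python
from pyspark.sql.connect.proto import base_pb2

ctx = base_pb2.FetchErrorDetailsResponse.QueryContext(
    object_type="VIEW",  # empty string when the error comes from the main query
    object_name="V1",
    start_index=7,
    stop_index=7,
    fragment="x",
)
print(ctx)
```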

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 6e0a04cf4eb4..04d284f2ec23 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -129,6 +129,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper with PrivateM
 assert(!ex.messageParameters.isEmpty)
 assert(ex.getSqlState != null)
 assert(!ex.isInternalError)
+assert(ex.getQueryContext.length == 1)
+assert(ex.getQueryContext.head.startIndex() == 7)
+assert(ex.getQueryContext.head.stopIndex() == 7)
+assert(ex.getQueryContext.head.fragment() == "x")
 assert(
   ex.getStackTrace
 
.find(_.getClassName.contains("org.apache.spark.sql.catalyst.analysis.CheckAnalysis"))
diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 5b8858f40d26..273512272225 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -819,13 +819,39 @@ message FetchErrorDetailsResponse {
 int32 line_number = 4;
   }
 
+  // QueryContext defines the schema for the query context of a SparkThrowable.
+  // It helps users understand where the error occurs while executing queries.
+  message QueryContext {
+// The object type of the query which throws the exception.
+// If the exception is directly from the main query, it should be an empty 
string.
+// Otherwise, it should be the exact object type in upper case. For 
example, a "VIEW".
+string object_type = 1;
+
+// The object name of the query which throws the exception.
+// If the exception is directly from the main query, it should be an empty 
string.
+// Otherwise, it should be the object name. For example, a view name "V1".
+string object_name = 2;
+
+// The starting index in the query text which throws the exception. The 
index starts from 0.
+int32 start_index = 3;
+
+// The stopping index in the query which throws the exception. The index 
starts from 0.
+int32 stop_index = 4;
+
+// The corresponding fragment of the query which throws the exception.
+string fragment = 5;
+  }
+
   // SparkThrowable defines the schema for SparkThrowable exceptions.
   message SparkThrowable {
 // Succinct, human-readable, unique, and consistent representation of the 
error category.
 optional string error_class = 1;
 
-// message parameters for the error framework.
+// The message parameters for the error framework.
 map message_parameters = 2;
+
+// The query context of a SparkThrowable.
+repeated QueryContext query_contexts = 3;
   }
 
   // Error defines the schema for the representing exception.
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
 

Re: [PR] Add canonical links to the PySpark docs page for published docs [spark-website]

2023-10-12 Thread via GitHub


allisonwang-db commented on PR #482:
URL: https://github.com/apache/spark-website/pull/482#issuecomment-1760125845

   @panbingkun thanks for doing this. However, I discovered that some of the 
canonical links generated are not valid URLs, for example: 
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.groupBy.html
   Is there a way to update this canonical link to the actual latest 
documentation for groupBy?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45499][CORE][TESTS][FOLLOWUP] Use `ReferenceQueue#remove` instead of `ReferenceQueue#poll`

2023-10-12 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7663fdfa3e84 [SPARK-45499][CORE][TESTS][FOLLOWUP] Use 
`ReferenceQueue#remove` instead of `ReferenceQueue#poll`
7663fdfa3e84 is described below

commit 7663fdfa3e84d7231784c39e4d3445e6f2f079fd
Author: yangjie01 
AuthorDate: Fri Oct 13 01:24:01 2023 +0800

[SPARK-45499][CORE][TESTS][FOLLOWUP] Use `ReferenceQueue#remove` instead of 
`ReferenceQueue#poll`

### What changes were proposed in this pull request?
This PR replaces `refQueue.poll()` with `refQueue.remove()` in the test 
case `reference to sub iterator should not be available after completion` to 
ensure that the `PhantomReference` object can be retrieved from `refQueue`.

### Why are the changes needed?
https://github.com/apache/spark/pull/43325 replaces `Reference#isEnqueued` 
with `Reference#refersTo(null)` to eliminate the use of deprecated APIs.

However, there are some differences between `ref.isEnqueued` and 
`ref.refersTo(null)`.

- The `ref.isEnqueued` method is used to check whether this 
`PhantomReference` object has been added to its reference queue by the garbage 
collector. When the garbage collector decides to recycle an object, if this 
object has one or more `PhantomReference`, then these `PhantomReference` will 
be added to their reference queues. So, if `ref.isEnqueued` returns `true`, it 
means that this `PhantomReference` has been added to the reference queue, which 
means that the object it references h [...]

- The `ref.refersTo(null)` method is used to check whether this 
`PhantomReference` object refers to the specified object. In the current code, 
`ref.refersTo(null)` is used to check whether `ref` still refers to `sub`. If 
`ref.refersTo(null)` returns `true`, it means that `ref` no longer refers to 
`sub`, which means that `sub` might have been recycled by the garbage 
collector. But this does not mean that this `ref` has been added to the 
reference queue.

So we can see the following test failure in GA:

https://github.com/apache/spark/actions/runs/6484510414/job/17608536854

```
[info] - reference to sub iterator should not be available after completion 
*** FAILED *** (287 milliseconds)
[info]   null did not equal java.lang.ref.PhantomReference11e8f090 
(CompletionIteratorSuite.scala:67)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.util.CompletionIteratorSuite.$anonfun$new$3(CompletionIteratorSuite.scala:67)
```

To solve this issue, this PR replaces `refQueue.poll()` with 
`refQueue.remove()` to allow for waiting until `ref` is put into `refQueue` and 
can be retrieved from `refQueue`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43345 from LuciferYang/ref-remove.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala 
b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala
index 297e4fd53ab4..6153c2c74353 100644
--- a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala
@@ -64,6 +64,6 @@ class CompletionIteratorSuite extends SparkFunSuite {
   }
 }
 assert(ref.refersTo(null))
-assert(refQueue.poll() === ref)
+assert(refQueue.remove(1000) === ref)
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45501][CORE][SQL] Use pattern matching for type checking and conversion

2023-10-12 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b0576fff9b72 [SPARK-45501][CORE][SQL] Use pattern matching for type 
checking and conversion
b0576fff9b72 is described below

commit b0576fff9b72880cd81a9d22c044dec329bc67d0
Author: yangjie01 
AuthorDate: Thu Oct 12 23:53:06 2023 +0800

[SPARK-45501][CORE][SQL] Use pattern matching for type checking and 
conversion

### What changes were proposed in this pull request?
This PR changes the code to use pattern matching for type checking and 
conversion instead of explicit type-casting statements in Java code.

The change refers to [JEP 394](https://openjdk.org/jeps/394), and this PR 
does not include parts of the `hive-thriftserver` module.

Example:

```java
if (obj instanceof String) {
  String str = (String) obj;
  System.out.println(str);
}
```
Can be replaced with
```java
if (obj instanceof String str) {
  System.out.println(str);
}
```

### Why are the changes needed?
Using `JEP 394: Pattern Matching for instanceof` can bring the following 
benefits:

1. **Code conciseness**: By eliminating explicit type conversion and 
redundant variable declarations, the code becomes more concise and easy to read.
2. **Improved safety**: In the past, explicit type conversion was required, 
and if accidentally converted to the wrong type, a `ClassCastException` would 
be thrown at runtime. Now, as type checking and type conversion occur in the 
same step, such errors are no longer possible.
3. **Better semantics**: Previously, instanceof and type casting were two 
independent steps, which could lead to unclear code intentions. Now, these two 
steps are merged into one, making the intentions of the code clearer.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43327 from LuciferYang/jep-394.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 .../spark/util/kvstore/ArrayKeyIndexType.java  |  3 +--
 .../org/apache/spark/util/kvstore/CustomType1.java |  3 +--
 .../org/apache/spark/util/kvstore/CustomType2.java |  3 +--
 .../org/apache/spark/util/kvstore/IntKeyType.java  |  3 +--
 .../spark/network/protocol/ChunkFetchFailure.java  |  3 +--
 .../spark/network/protocol/ChunkFetchRequest.java  |  3 +--
 .../spark/network/protocol/ChunkFetchSuccess.java  |  3 +--
 .../network/protocol/MergedBlockMetaRequest.java   |  3 +--
 .../spark/network/protocol/MessageEncoder.java |  3 +--
 .../spark/network/protocol/OneWayMessage.java  |  3 +--
 .../apache/spark/network/protocol/RpcFailure.java  |  3 +--
 .../apache/spark/network/protocol/RpcRequest.java  |  3 +--
 .../apache/spark/network/protocol/RpcResponse.java |  3 +--
 .../spark/network/protocol/StreamChunkId.java  |  3 +--
 .../spark/network/protocol/StreamFailure.java  |  3 +--
 .../spark/network/protocol/StreamRequest.java  |  3 +--
 .../spark/network/protocol/StreamResponse.java |  3 +--
 .../spark/network/protocol/UploadStream.java   |  3 +--
 .../apache/spark/network/sasl/SparkSaslClient.java |  9 +++-
 .../apache/spark/network/sasl/SparkSaslServer.java | 12 --
 .../network/server/TransportChannelHandler.java|  3 +--
 .../network/shuffle/ExternalBlockHandler.java  |  3 +--
 .../network/shuffle/ShuffleTransportContext.java   |  6 ++---
 .../shuffle/protocol/BlockPushReturnCode.java  |  3 +--
 .../network/shuffle/protocol/BlocksRemoved.java|  3 +--
 .../shuffle/protocol/ExecutorShuffleInfo.java  |  3 +--
 .../shuffle/protocol/FinalizeShuffleMerge.java |  3 +--
 .../shuffle/protocol/GetLocalDirsForExecutors.java |  3 +--
 .../shuffle/protocol/LocalDirsForExecutors.java|  3 +--
 .../network/shuffle/protocol/MergeStatuses.java|  3 +--
 .../spark/network/shuffle/protocol/OpenBlocks.java |  3 +--
 .../network/shuffle/protocol/PushBlockStream.java  |  3 +--
 .../network/shuffle/protocol/RegisterExecutor.java |  3 +--
 .../network/shuffle/protocol/RemoveBlocks.java |  3 +--
 .../shuffle/protocol/RemoveShuffleMerge.java   |  3 +--
 .../network/shuffle/protocol/StreamHandle.java |  3 +--
 .../network/shuffle/protocol/UploadBlock.java  |  3 +--
 .../shuffle/protocol/UploadBlockStream.java|  3 +--
 .../network/yarn/YarnShuffleServiceMetrics.java|  9 +++-
 .../org/apache/spark/util/sketch/BitArray.java |  3 +--
 .../apache/spark/util/sketch/BloomFilterImpl.java  | 11 +++--
 .../spark/util/sketch/CountMinSketchImpl.java  |  8 ++-
 .../org/apache/spark/unsafe/types/UTF8String.java  |  6 ++---
 

[spark] branch master updated: [SPARK-45502][BUILD] Upgrade Kafka to 3.6.0

2023-10-12 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d1bd21a2a219 [SPARK-45502][BUILD] Upgrade Kafka to 3.6.0
d1bd21a2a219 is described below

commit d1bd21a2a219ebe6c5ac3fcb1e17db75af3c670c
Author: dengziming 
AuthorDate: Thu Oct 12 08:47:25 2023 -0700

[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0

### What changes were proposed in this pull request?
Upgrade Apache Kafka from 3.4.1 to 3.6.0

### Why are the changes needed?

- https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html
- https://downloads.apache.org/kafka/3.5.1/RELEASE_NOTES.html
- https://archive.apache.org/dist/kafka/3.5.0/RELEASE_NOTES.html

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GitHub CI.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43348 from dengziming/kafka-3.6.0.

Authored-by: dengziming 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  4 ++--
 .../spark/streaming/kafka010/KafkaRDDSuite.scala   | 16 --
 .../spark/streaming/kafka010/KafkaTestUtils.scala  |  4 ++--
 .../streaming/kafka010/mocks/MockScheduler.scala   | 25 +++---
 pom.xml|  2 +-
 5 files changed, 26 insertions(+), 25 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c54afc6290b1..2b0c13ed443d 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -28,7 +28,6 @@ import scala.io.Source
 import scala.jdk.CollectionConverters._
 
 import com.google.common.io.Files
-import kafka.api.Request
 import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.zk.KafkaZkClient
@@ -40,6 +39,7 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, 
SASL_PLAINTEXT}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.SystemTime
@@ -597,7 +597,7 @@ class KafkaTestUtils(
 .getPartitionInfo(topic, partition) match {
   case Some(partitionState) =>
 zkClient.getLeaderForPartition(new TopicPartition(topic, 
partition)).isDefined &&
-  Request.isValidBrokerId(partitionState.leader) &&
+  FetchRequest.isValidBrokerId(partitionState.leader) &&
   !partitionState.replicas.isEmpty
 
   case _ =>
diff --git 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 735ec2f7b448..ae941b1fddd5 100644
--- 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -24,12 +24,14 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-import kafka.log.{CleanerConfig, LogCleaner, LogConfig, 
ProducerStateManagerConfig, UnifiedLog}
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.log.{LogCleaner, UnifiedLog}
+import kafka.server.BrokerTopicStats
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, 
SimpleRecord}
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark._
@@ -90,13 +92,13 @@ class KafkaRDDSuite extends SparkFunSuite {
 val dir = new File(logDir, topic + "-" + partition)
 dir.mkdirs()
 val logProps = new ju.Properties()
-logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-logProps.put(LogConfig.MinCleanableDirtyRatioProp, 
java.lang.Float.valueOf(0.1f))
+logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_COMPACT)
+logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 

[spark] branch master updated: [SPARK-45132][SQL] Fix IDENTIFIER for function invocation

2023-10-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f0b2e6da5211 [SPARK-45132][SQL] Fix IDENTIFIER for function invocation
f0b2e6da5211 is described below

commit f0b2e6da52113802f64f7879f207064d3bdbc7b0
Author: srielau 
AuthorDate: Thu Oct 12 21:34:49 2023 +0800

[SPARK-45132][SQL] Fix IDENTIFIER for function invocation

### What changes were proposed in this pull request?

Due to a quirk in the parser, in some cases, IDENTIFIER()() 
is not properly recognized as a function invocation.

The change is to remove the explicit IDENTIFIER-clause rule in the function 
invocation grammar and instead recognize
IDENTIFIER() within visitFunctionCall.

### Why are the changes needed?

Function invocation support for IDENTIFIER is incomplete otherwise

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added new testcases to identifier-clause.sql

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #42888 from srielau/SPARK-45132.

Lead-authored-by: srielau 
Co-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 43 --
 .../analyzer-results/identifier-clause.sql.out | 28 --
 .../sql-tests/inputs/identifier-clause.sql |  3 +-
 .../sql-tests/results/identifier-clause.sql.out| 27 +-
 5 files changed, 77 insertions(+), 26 deletions(-)
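
A hedged sketch of the invocation form this fix addresses (function name and arguments are illustrative; the second call assumes the parameterized `spark.sql(..., args=...)` API):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Call a function whose name is supplied through the IDENTIFIER clause.
spark.sql("SELECT IDENTIFIER('abs')(-5)").show()

# The same with a named parameter marker.
spark.sql("SELECT IDENTIFIER(:func)(-5)", args={"func": "abs"}).show()
```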

diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 6a6d39e96ca2..77a9108e0632 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -967,7 +967,6 @@ primaryExpression
 | qualifiedName DOT ASTERISK   
#star
 | LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN  
#rowConstructor
 | LEFT_PAREN query RIGHT_PAREN 
#subqueryExpression
-| IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN  
#identifierClause
 | functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
(COMMA argument+=functionArgument)*)? RIGHT_PAREN
(FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)?
@@ -1196,6 +1195,7 @@ qualifiedNameList
 
 functionName
 : IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN
+| identFunc=IDENTIFIER_KW   // IDENTIFIER itself is also a valid function 
name.
 | qualifiedName
 | FILTER
 | LEFT
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c2bc6e9eb65a..9abca8b95cf7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2246,13 +2246,6 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
 }
   }
 
-  /**
-   * Create an expression for the IDENTIFIER() clause.
-   */
-  override def visitIdentifierClause(ctx: IdentifierClauseContext): Expression 
= withOrigin(ctx) {
-ExpressionWithUnresolvedIdentifier(expression(ctx.expression), 
UnresolvedAttribute(_))
-  }
-
   /**
* Create a (windowed) Function expression.
*/
@@ -2274,19 +2267,31 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
 val filter = Option(ctx.where).map(expression(_))
 val ignoreNulls =
   Option(ctx.nullsOption).map(_.getType == 
SqlBaseParser.IGNORE).getOrElse(false)
-val funcCtx = ctx.functionName
-val func = withFuncIdentClause(
-  funcCtx,
-  ident => UnresolvedFunction(ident, arguments, isDistinct, filter, 
ignoreNulls)
-)
 
-// Check if the function is evaluated in a windowed context.
-ctx.windowSpec match {
-  case spec: WindowRefContext =>
-UnresolvedWindowExpression(func, visitWindowRef(spec))
-  case spec: WindowDefContext =>
-WindowExpression(func, visitWindowDef(spec))
-  case _ => func
+// Is this an IDENTIFIER clause instead of a function call?
+if (ctx.functionName.identFunc != null &&
+  arguments.length == 1 && // One argument
+  ctx.setQuantifier == null && // No other clause
+  

[spark] branch branch-3.5 updated: [SPARK-45132][SQL] Fix IDENTIFIER for function invocation

2023-10-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 249533bcc8c7 [SPARK-45132][SQL] Fix IDENTIFIER for function invocation
249533bcc8c7 is described below

commit 249533bcc8c7fa7f578961ce21d4d7118565dfc1
Author: srielau 
AuthorDate: Thu Oct 12 21:34:49 2023 +0800

[SPARK-45132][SQL] Fix IDENTIFIER for function invocation

### What changes were proposed in this pull request?

Due to a quirk in the parser, in some cases, IDENTIFIER()() 
is not properly recognized as a function invocation.

The change is to remove the explicit IDENTIFIER-clause rule in the function 
invocation grammar and instead recognize
IDENTIFIER() within visitFunctionCall.

### Why are the changes needed?

Function invocation support for IDENTIFIER is incomplete otherwise

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added new test cases to identifier-clause.sql.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #42888 from srielau/SPARK-45132.

Lead-authored-by: srielau 
Co-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f0b2e6da52113802f64f7879f207064d3bdbc7b0)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 43 --
 .../analyzer-results/identifier-clause.sql.out | 28 --
 .../sql-tests/inputs/identifier-clause.sql |  3 +-
 .../sql-tests/results/identifier-clause.sql.out| 27 +-
 5 files changed, 77 insertions(+), 26 deletions(-)

diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 85dbc499fbde..04128216be07 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -951,7 +951,6 @@ primaryExpression
 | qualifiedName DOT ASTERISK   
#star
 | LEFT_PAREN namedExpression (COMMA namedExpression)+ RIGHT_PAREN  
#rowConstructor
 | LEFT_PAREN query RIGHT_PAREN 
#subqueryExpression
-| IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN  
#identifierClause
 | functionName LEFT_PAREN (setQuantifier? argument+=functionArgument
(COMMA argument+=functionArgument)*)? RIGHT_PAREN
(FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)?
@@ -1176,6 +1175,7 @@ qualifiedNameList
 
 functionName
 : IDENTIFIER_KW LEFT_PAREN expression RIGHT_PAREN
+| identFunc=IDENTIFIER_KW   // IDENTIFIER itself is also a valid function 
name.
 | qualifiedName
 | FILTER
 | LEFT
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 83938632e534..b80ea8fddcfe 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2223,13 +2223,6 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
 }
   }
 
-  /**
-   * Create an expression for the IDENTIFIER() clause.
-   */
-  override def visitIdentifierClause(ctx: IdentifierClauseContext): Expression 
= withOrigin(ctx) {
-ExpressionWithUnresolvedIdentifier(expression(ctx.expression), 
UnresolvedAttribute(_))
-  }
-
   /**
* Create a (windowed) Function expression.
*/
@@ -2251,19 +2244,31 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
 val filter = Option(ctx.where).map(expression(_))
 val ignoreNulls =
   Option(ctx.nullsOption).map(_.getType == 
SqlBaseParser.IGNORE).getOrElse(false)
-val funcCtx = ctx.functionName
-val func = withFuncIdentClause(
-  funcCtx,
-  ident => UnresolvedFunction(ident, arguments, isDistinct, filter, 
ignoreNulls)
-)
 
-// Check if the function is evaluated in a windowed context.
-ctx.windowSpec match {
-  case spec: WindowRefContext =>
-UnresolvedWindowExpression(func, visitWindowRef(spec))
-  case spec: WindowDefContext =>
-WindowExpression(func, visitWindowDef(spec))
-  case _ => func
+// Is this an IDENTIFIER clause instead of a function call?
+if (ctx.functionName.identFunc != null &&
+  

[spark] branch branch-3.4 updated: [SPARK-45433][SQL][3.4] Fix CSV/JSON schema inference when timestamps do not match specified timestampFormat

2023-10-12 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new f985d716e164 [SPARK-45433][SQL][3.4] Fix CSV/JSON schema inference 
when timestamps do not match specified timestampFormat
f985d716e164 is described below

commit f985d716e164885575ec7f36a7782694411da024
Author: Jia Fan 
AuthorDate: Thu Oct 12 17:09:48 2023 +0500

[SPARK-45433][SQL][3.4] Fix CSV/JSON schema inference when timestamps do 
not match specified timestampFormat

### What changes were proposed in this pull request?
This is a backport PR of #43243. It fixes the schema inference bug that
occurs when timestamps do not match the specified timestampFormat. Please
check #43243 for details.
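
As a hedged sketch of the affected code path (the file path, data, and format
string are illustrative, and `spark` is assumed to come from a spark-shell
session), CSV schema inference with an explicit timestampFormat can be
exercised like this:

```scala
// Schema inference with a user-specified timestampFormat; how timestamp-like
// values that do not match the pattern are inferred is what this backport
// corrects.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .csv("/tmp/events.csv")
df.printSchema()
```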

### Why are the changes needed?
Fixes the schema inference bug in branch-3.4.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a new test.

### Was this patch authored or co-authored using generative AI tooling?

Closes #43343 from Hisoka-X/backport-SPARK-45433-inference-schema.

Authored-by: Jia Fan 
Signed-off-by: Max Gekk 
---
 .../org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala |  8 ++--
 .../org/apache/spark/sql/catalyst/json/JsonInferSchema.scala   |  7 +--
 .../apache/spark/sql/catalyst/csv/CSVInferSchemaSuite.scala| 10 ++
 .../apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala  |  8 
 4 files changed, 29 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index 51586a0065e9..dd8ac3985f19 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, 
TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 
 class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -202,8 +203,11 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
 // We can only parse the value as TimestampNTZType if it does not have 
zone-offset or
 // time-zone component and can be parsed with the timestamp formatter.
 // Otherwise, it is likely to be a timestamp with timezone.
-if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
-  SQLConf.get.timestampType
+val timestampType = SQLConf.get.timestampType
+if ((SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY ||
+timestampType == TimestampNTZType) &&
+timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
+  timestampType
 } else {
   tryParseTimestamp(field)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 5385afe8c935..7e4767750fd3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -148,11 +149,13 @@ private[sql] class JsonInferSchema(options: JSONOptions) 
extends Serializable {
   val bigDecimal = decimalParser(field)
 DecimalType(bigDecimal.precision, bigDecimal.scale)
 }
+val timestampType = SQLConf.get.timestampType
 if (options.prefersDecimal && decimalTry.isDefined) {
   decimalTry.get
-} else if (options.inferTimestamp &&
+} else if (options.inferTimestamp && 
(SQLConf.get.legacyTimeParserPolicy ==
+  LegacyBehaviorPolicy.LEGACY || timestampType == TimestampNTZType) &&
 timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
-  SQLConf.get.timestampType
+  timestampType
 } else if (options.inferTimestamp &&
 timestampFormatter.parseOptional(field).isDefined) {
   TimestampType
diff --git 

[spark] branch master updated: [SPARK-43664][CONNECT][PS] Raise exception for `ps.sql` with Pandas-on-Spark object on Spark Connect

2023-10-12 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 470aaf32a43e [SPARK-43664][CONNECT][PS] Raise exception for `ps.sql` 
with Pandas-on-Spark object on Spark Connect
470aaf32a43e is described below

commit 470aaf32a43e3f778e28050df3b81ffd16cd7ff2
Author: Haejoon Lee 
AuthorDate: Thu Oct 12 19:56:53 2023 +0900

[SPARK-43664][CONNECT][PS] Raise exception for `ps.sql` with 
Pandas-on-Spark object on Spark Connect

### What changes were proposed in this pull request?

This PR proposes to raise a proper exception for `ps.sql` when a
Pandas-on-Spark DataFrame is passed on Spark Connect.

### Why are the changes needed?

To improve the error message.

### Does this PR introduce _any_ user-facing change?

No API change, but the error message is improved.
**Before**
```python
>>> psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
>>> ps.sql("SELECT {col}, {col2} FROM {tbl}", col=psdf.A, col2=psdf.B, 
tbl=psdf)
Traceback (most recent call last):
...
pyspark.errors.exceptions.connect.AnalysisException: 
[TABLE_OR_VIEW_NOT_FOUND] The table or view 
`_pandas_api_32aa6c7b33ac442bab790cfb49f65ca1` cannot be found. Verify the 
spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() 
output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF 
EXISTS.; line 1 pos 17;
'Project ['A, 'B]
+- 'UnresolvedRelation [_pandas_api_32aa6c7b33ac442bab790cfb49f65ca1], [], 
false

JVM stacktrace:
org.apache.spark.sql.catalyst.ExtendedAnalysisException: 
[TABLE_OR_VIEW_NOT_FOUND] The table or view 
`_pandas_api_32aa6c7b33ac442bab790cfb49f65ca1` cannot be found. Verify the 
spelling and correctness of the schema and catalog.
...
```

**After**
```python
>>> psdf = ps.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
>>> ps.sql("SELECT {col}, {col2} FROM {tbl}", col=psdf.A, col2=psdf.B, 
tbl=psdf)
Traceback (most recent call last):
...
pyspark.errors.exceptions.base.PySparkTypeError: [UNSUPPORTED_DATA_TYPE] 
Unsupported DataType `DataFrame`.
```

### How was this patch tested?

The existing CI should pass

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43237 from itholic/SPARK-43664.

Lead-authored-by: Haejoon Lee 
Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/sql_formatter.py | 13 +
 python/pyspark/pandas/tests/connect/test_parity_sql.py |  4 ++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/sql_formatter.py 
b/python/pyspark/pandas/sql_formatter.py
index 91c4f0b7d77b..9800037016c5 100644
--- a/python/pyspark/pandas/sql_formatter.py
+++ b/python/pyspark/pandas/sql_formatter.py
@@ -30,6 +30,8 @@ from pyspark.sql import SparkSession
 from pyspark.pandas.utils import default_session
 from pyspark.pandas.frame import DataFrame
 from pyspark.pandas.series import Series
+from pyspark.errors import PySparkTypeError
+from pyspark.sql.utils import is_remote
 
 
 __all__ = ["sql"]
@@ -59,6 +61,9 @@ def sql(
 
 Also the method can bind named parameters to SQL literals from `args`.
 
+.. note::
+pandas-on-Spark DataFrame is not supported for Spark Connect.
+
 Parameters
 --
 query : str
@@ -198,6 +203,14 @@ def sql(
 session = default_session()
 formatter = PandasSQLStringFormatter(session)
 try:
+# ps.DataFrame are not supported for Spark Connect currently.
+if is_remote():
+for obj in kwargs.values():
+if isinstance(obj, ps.DataFrame):
+raise PySparkTypeError(
+error_class="UNSUPPORTED_DATA_TYPE",
+message_parameters={"data_type": type(obj).__name__},
+)
 sdf = session.sql(formatter.format(query, **kwargs), args)
 finally:
 formatter.clear()
diff --git a/python/pyspark/pandas/tests/connect/test_parity_sql.py 
b/python/pyspark/pandas/tests/connect/test_parity_sql.py
index c042de6b9007..2e503cac07a8 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_sql.py
+++ b/python/pyspark/pandas/tests/connect/test_parity_sql.py
@@ -22,11 +22,11 @@ from pyspark.testing.pandasutils import 
PandasOnSparkTestUtils
 
 
 class SQLParityTests(SQLTestsMixin, PandasOnSparkTestUtils, 
ReusedConnectTestCase):
-@unittest.skip("TODO(SPARK-43664): Fix TABLE_OR_VIEW_NOT_FOUND from 
SQLParityTests.")
+@unittest.skip("Test depends on temp view issue on 

[spark] branch master updated: [SPARK-45510][SQL] Replace `scala.collection.generic.Growable` to `scala.collection.mutable.Growable`

2023-10-12 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 385a2f5f0475 [SPARK-45510][SQL] Replace 
`scala.collection.generic.Growable` to `scala.collection.mutable.Growable`
385a2f5f0475 is described below

commit 385a2f5f0475ce63180abc7ebb7577e0214ca2fb
Author: Jia Fan 
AuthorDate: Thu Oct 12 16:48:36 2023 +0800

[SPARK-45510][SQL] Replace `scala.collection.generic.Growable` to 
`scala.collection.mutable.Growable`

### What changes were proposed in this pull request?
Since Scala 2.13.0, `scala.collection.generic.Growable` has been marked as
deprecated. This PR changes it to `scala.collection.mutable.Growable`.
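
As a minimal sketch (not taken from the Spark codebase) of the non-deprecated
Scala 2.13 API:

```scala
import scala.collection.mutable
import scala.collection.mutable.Growable

// Growable is the 2.13 home of the "elements can be added" abstraction;
// mutable collections such as ArrayBuffer implement it.
def addAll(g: Growable[Int], xs: Seq[Int]): Unit = xs.foreach(g += _)

val buf = mutable.ArrayBuffer.empty[Int]
addAll(buf, Seq(1, 2, 3))  // buf now contains 1, 2, 3
```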

### Why are the changes needed?
Removes usage of the deprecated API.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43347 from Hisoka-X/SPARK-45510-replace-growable.

Authored-by: Jia Fan 
Signed-off-by: yangjie01 
---
 .../org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
index 7bbc930ceab5..d0d4ca659057 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
-import scala.collection.generic.Growable
 import scala.collection.mutable
+import scala.collection.mutable.Growable
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult





[spark] branch master updated: [SPARK-45488][SQL] XML: Add support for value in 'rowTag' element

2023-10-12 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e69752aa66c0 [SPARK-45488][SQL] XML: Add support for value in 'rowTag' 
element
e69752aa66c0 is described below

commit e69752aa66c09df843adaebe86a63cf799961292
Author: Shujing Yang 
AuthorDate: Thu Oct 12 16:17:37 2023 +0900

[SPARK-45488][SQL] XML: Add support for value in 'rowTag' element

### What changes were proposed in this pull request?

The following XML with rowTag 'book' yields a schema with just the "_id"
column and not the value:

```
 Great Book 
```

Let's parse the value as well. The scope of this PR is to keep the `valueTag`
behavior of the rowTag element consistent with that of inner objects.
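
As a hedged sketch of how this surfaces to users (the path and data are
illustrative, `spark` is assumed to come from a spark-shell session, and the
value column name is governed by the reader's `valueTag` option):

```scala
// Reading XML with rowTag pointing at an element that carries both attributes
// and character data; with this change the inferred schema includes the value
// column alongside attribute columns such as _id.
val books = spark.read
  .format("xml")
  .option("rowTag", "book")
  .load("/tmp/books.xml")
books.printSchema()
```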

### Why are the changes needed?

The semantics for attributes and `valueTag` should be consistent

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43319 from shujingyang-db/rootlevel-valuetag.

Lead-authored-by: Shujing Yang 
Co-authored-by: Shujing Yang 
<135740748+shujingyang...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/catalyst/xml/StaxXmlParser.scala | 17 -
 .../spark/sql/catalyst/xml/XmlInferSchema.scala| 16 +
 .../xml-resources/root-level-value-none.xml|  8 +++
 .../test-data/xml-resources/root-level-value.xml   |  9 +++
 .../sql/execution/datasources/xml/XmlSuite.scala   | 75 ++
 5 files changed, 123 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index ac29e234e5f9..dcb760aca9d2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -103,7 +103,12 @@ class StaxXmlParser(
   }
   val parser = StaxXmlParserUtils.filteredReader(xml)
   val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
-  Some(convertObject(parser, schema, options, rootAttributes))
+  // A structure object is an attribute-only element
+  // if it only consists of attributes and valueTags.
+  val isRootAttributesOnly = schema.fields.forall { f =>
+f.name == options.valueTag || 
f.name.startsWith(options.attributePrefix)
+  }
+  Some(convertObject(parser, schema, options, rootAttributes, 
isRootAttributesOnly))
 } catch {
   case e: SparkUpgradeException => throw e
   case e@(_: RuntimeException | _: XMLStreamException | _: 
MalformedInputException
@@ -305,7 +310,8 @@ class StaxXmlParser(
   parser: XMLEventReader,
   schema: StructType,
   options: XmlOptions,
-  rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
+  rootAttributes: Array[Attribute] = Array.empty,
+  isRootAttributesOnly: Boolean = false): InternalRow = {
 val row = new Array[Any](schema.length)
 val nameToIndex = schema.map(_.name).zipWithIndex.toMap
 // If there are attributes, then we process them first.
@@ -371,6 +377,13 @@ class StaxXmlParser(
 badRecordException = badRecordException.orElse(Some(e))
 }
 
+case c: Characters if !c.isWhiteSpace && isRootAttributesOnly =>
+  nameToIndex.get(options.valueTag) match {
+case Some(index) =>
+  row(index) = convertTo(c.getData, schema(index).dataType, 
options)
+case None => // do nothing
+  }
+
 case _: EndElement =>
   shouldStop = StaxXmlParserUtils.checkEndElement(parser)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
index 3eabf4525b4e..8bddb8f5bd99 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
@@ -219,12 +219,28 @@ private[sql] object XmlInferSchema {
   dataTypes += inferredType
   nameToDataType += (field -> dataTypes)
 
+case c: Characters if !c.isWhiteSpace =>
+  // This can be an attribute-only object
+  val valueTagType = inferFrom(c.getData, options)
+  nameToDataType += options.valueTag -> ArrayBuffer(valueTagType)
+
 case _: EndElement =>
   shouldStop = StaxXmlParserUtils.checkEndElement(parser)
 
 case _ => // do nothing
   }
 }
+// A structure object is an