[spark] branch master updated (13732922cca -> b828ff0bdba)

2023-08-10 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 13732922cca [SPARK-44763][SQL] Fix a bug of promoting string as double 
in binary arithmetic with interval
 add b828ff0bdba [SPARK-44771][INFRA] Remove 'sudo' in 'pip install' 
suggestions of dev scripts

No new revisions were added by this update.

Summary of changes:
 dev/create-release/releaseutils.py | 4 ++--
 dev/github_jira_sync.py| 2 +-
 dev/merge_spark_pr.py  | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)
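For context, a minimal hypothetical sketch of the kind of hint these dev scripts print when an optional dependency is missing (the wording and module name are illustrative, not copied from the patch); the suggestion now recommends a plain `pip install` rather than `sudo pip install`:

```python
# Hypothetical illustration only: print an install hint without 'sudo' when an
# optional dependency cannot be imported.
try:
    import jira  # noqa: F401
except ImportError:
    print("Could not find jira-python library. Run 'pip install jira' to install.")
```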


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[GitHub] [spark-website] srowen closed pull request #471: Added IOMETE to powered by Spark docs

2023-08-10 Thread via GitHub


srowen closed pull request #471: Added IOMETE to powered by Spark docs
URL: https://github.com/apache/spark-website/pull/471


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark-website] branch asf-site updated: Added IOMETE to powered by Spark docs

2023-08-10 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 61c79d6c34 Added IOMETE to powered by Spark docs
61c79d6c34 is described below

commit 61c79d6c34c151586a2bb02be1d0c4d86627ce31
Author: Fuad Musayev 
AuthorDate: Thu Aug 10 21:27:35 2023 -0500

Added IOMETE to powered by Spark docs

Added IOMETE Data Lakehouse platform to Powered by Spark docs.

Author: Fuad Musayev 

Closes #471 from fmusayev/powered-by-iomete.
---
 powered-by.md| 1 +
 site/powered-by.html | 1 +
 2 files changed, 2 insertions(+)

diff --git a/powered-by.md b/powered-by.md
index 048108882b..8b2cfa4df1 100644
--- a/powered-by.md
+++ b/powered-by.md
@@ -131,6 +131,7 @@ and external data sources, driving holistic and actionable insights.
 - <a href="http://www.infoobjects.com">InfoObjects</a>
   - Award winning Big Data consulting company with focus on Spark and Hadoop
 - <a href="http://en.inspur.com">Inspur</a>
+- <a href="https://iomete.com">IOMETE</a> - IOMETE offers a modern Cloud-Prem Data Lakehouse platform, extending cloud-like experience to on-premise and private clouds. Utilizing Apache Spark as the query engine, we enable running Spark Jobs and ML applications on AWS, Azure, GCP, or On-Prem. Discover more at <a href="https://iomete.com">IOMETE</a>.
 - <a href="http://www.sehir.edu.tr/en/">Istanbul Sehir University</a>
 - <a href="http://www.kenshoo.com/">Kenshoo</a>
   - Digital marketing solutions and predictive media optimization
diff --git a/site/powered-by.html b/site/powered-by.html
index de8eb55ce2..aa07b10347 100644
--- a/site/powered-by.html
+++ b/site/powered-by.html
@@ -319,6 +319,7 @@ environments or on bare-metal infrastructures.
 
   
   <li><a href="http://en.inspur.com">Inspur</a></li>
+  <li><a href="https://iomete.com">IOMETE</a> - IOMETE offers a modern Cloud-Prem Data Lakehouse platform, extending cloud-like experience to on-premise and private clouds. Utilizing Apache Spark as the query engine, we enable running Spark Jobs and ML applications on AWS, Azure, GCP, or On-Prem. Discover more at <a href="https://iomete.com">IOMETE</a>.</li>
   <li><a href="http://www.sehir.edu.tr/en/">Istanbul Sehir University</a></li>
   <li><a href="http://www.kenshoo.com/">Kenshoo</a></li>
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (88de5663cc8 -> 13732922cca)

2023-08-10 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 88de5663cc8 Revert "[SPARK-44760][INFRA] Fix list index out of range 
for JIRA resolution in merge_spark_pr"
 add 13732922cca [SPARK-44763][SQL] Fix a bug of promoting string as double 
in binary arithmetic with interval

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/analysis/TypeCoercion.scala  |  9 +++--
 .../spark/sql/catalyst/analysis/TypeCoercionSuite.scala | 17 +
 .../sql-tests/analyzer-results/interval.sql.out | 14 +++---
 .../test/resources/sql-tests/results/interval.sql.out   | 14 +++---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala |  6 ++
 5 files changed, 44 insertions(+), 16 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: Revert "[SPARK-44760][INFRA] Fix list index out of range for JIRA resolution in merge_spark_pr"

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 88de5663cc8 Revert "[SPARK-44760][INFRA] Fix list index out of range 
for JIRA resolution in merge_spark_pr"
88de5663cc8 is described below

commit 88de5663cc8d93b1f50dce52e0519005aa1f1c60
Author: Hyukjin Kwon 
AuthorDate: Fri Aug 11 11:14:12 2023 +0900

Revert "[SPARK-44760][INFRA] Fix list index out of range for JIRA 
resolution in merge_spark_pr"

This reverts commit 3164ff51dd249670b8505a5ea8d361cf53c9db94.
---
 dev/merge_spark_pr.py | 37 +++--
 1 file changed, 15 insertions(+), 22 deletions(-)

diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index 88792e77161..41b00b463f1 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -237,6 +237,15 @@ def cherry_pick(pr_num, merge_hash, default_branch):
 return pick_ref
 
 
+def fix_version_from_branch(branch, versions):
+# Note: Assumes this is a sorted (newest->oldest) list of un-released 
versions
+if branch == "master":
+return versions[0]
+else:
+branch_ver = branch.replace("branch-", "")
+return list(filter(lambda x: x.name.startswith(branch_ver), 
versions))[-1]
+
+
 def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
 asf_jira = jira.client.JIRA(
 {"server": JIRA_API_BASE}, basic_auth=(JIRA_USERNAME, JIRA_PASSWORD)
@@ -271,30 +280,14 @@ def resolve_jira_issue(merge_branches, comment, 
default_jira_id=""):
 )
 
 versions = asf_jira.project_versions("SPARK")
-# Consider only x.y.z, unreleased, unarchived versions
-versions = [
-x
-for x in versions
-if not x.raw["released"] and not x.raw["archived"] and 
re.match(r"\d+\.\d+\.\d+", x.name)
-]
 versions = sorted(versions, key=lambda x: x.name, reverse=True)
+versions = list(filter(lambda x: x.raw["released"] is False, versions))
+# Consider only x.y.z versions
+versions = list(filter(lambda x: re.match(r"\d+\.\d+\.\d+", x.name), 
versions))
 
-default_fix_versions = []
-for b in merge_branches:
-if b == "master":
-default_fix_versions.append(versions[0])
-else:
-found = False
-for v in versions:
-if v.name.startswith(b.replace("branch-", "")):
-default_fix_versions.append(v)
-found = True
-if not found:
-print(
-"Target version for %s is not found on JIRA, it may be 
archived or "
-"not created. Skipping it." % b
-)
-
+default_fix_versions = list(
+map(lambda x: fix_version_from_branch(x, versions).name, 
merge_branches)
+)
 for v in default_fix_versions:
 # Handles the case where we have forked a release branch but not yet 
made the release.
 # In this case, if the PR is committed to the master branch and the 
release branch, we


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43872][PS] Support `(DataFrame|Series).plot` with pandas 2.0.0 and above

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1c3f618ee38 [SPARK-43872][PS] Support `(DataFrame|Series).plot` with 
pandas 2.0.0 and above
1c3f618ee38 is described below

commit 1c3f618ee388e0830c74117b872144303f40cebf
Author: itholic 
AuthorDate: Fri Aug 11 11:10:14 2023 +0900

[SPARK-43872][PS] Support `(DataFrame|Series).plot` with pandas 2.0.0 and 
above

### What changes were proposed in this pull request?

This PR proposes to remove parameter `sort_columns` from 
`(DataFrame|Series).plot` to support pandas 2.0.0.

It also re-enables the following plot tests:
- test_area_plot
- test_area_plot_stacked_false
- test_area_plot_y
- test_bar_plot
- test_bar_with_x_y
- test_barh_plot_with_x_y
- test_barh_plot
- test_line_plot
- test_pie_plot
- test_scatter_plot
- test_hist_plot
- test_kde_plot

### Why are the changes needed?

To support pandas 2.0.0 and match its behavior.

### Does this PR introduce _any_ user-facing change?

`sort_columns` will no longer be available.
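As a hypothetical usage sketch (assuming a running Spark session and the default plotting backend), existing calls keep working; they simply drop the removed keyword:

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"x": [1, 2, 3], "y": [3, 1, 2]})
# No sort_columns= keyword anymore; everything else is unchanged.
fig = psdf.plot.line(x="x", y="y")
```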

### How was this patch tested?

Closes #42390 from itholic/remove_sort_columns.

Lead-authored-by: itholic 
Co-authored-by: Haejoon Lee <44108233+itho...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon 
---
 .../source/migration_guide/pyspark_upgrade.rst |  1 +
 python/pyspark/pandas/plot/matplotlib.py   | 13 -
 .../tests/plot/test_frame_plot_matplotlib.py   | 56 --
 3 files changed, 1 insertion(+), 69 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 98630133e0c..36d073d4a70 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -35,6 +35,7 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``include_start`` and ``include_end`` parameters from 
``DataFrame.between_time`` have been removed from pandas API on Spark, use 
``inclusive`` instead.
 * In Spark 4.0, ``include_start`` and ``include_end`` parameters from 
``Series.between_time`` have been removed from pandas API on Spark, use 
``inclusive`` instead.
 * In Spark 4.0, the various datetime attributes of ``DatetimeIndex`` (``day``, 
``month``, ``year`` etc.) are now ``int32`` instead of ``int64`` from pandas 
API on Spark.
+* In Spark 4.0, ``sort_columns`` parameter from ``DataFrame.plot`` and 
``Series.plot`` has been removed from pandas API on Spark.
 
 
 Upgrading from PySpark 3.3 to 3.4
diff --git a/python/pyspark/pandas/plot/matplotlib.py 
b/python/pyspark/pandas/plot/matplotlib.py
index 39e862bbae8..36cfc759f83 100644
--- a/python/pyspark/pandas/plot/matplotlib.py
+++ b/python/pyspark/pandas/plot/matplotlib.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #
 
-import warnings
 from distutils.version import LooseVersion
 
 import matplotlib as mat
@@ -750,7 +749,6 @@ def plot_frame(
 yerr=None,
 xerr=None,
 secondary_y=False,
-sort_columns=False,
 **kwds,
 ):
 """
@@ -836,11 +834,6 @@ def plot_frame(
 mark_right : boolean, default True
 When using a secondary_y axis, automatically mark the column
 labels with "(right)" in the legend
-sort_columns: bool, default is False
-When True, will sort values on plots.
-
-.. deprecated:: 3.4.0
-
 **kwds : keywords
 Options to pass to matplotlib plotting method
 
@@ -856,11 +849,6 @@ def plot_frame(
   for bar plot layout by `position` keyword.
   From 0 (left/bottom-end) to 1 (right/top-end). Default is 0.5 (center)
 """
-warnings.warn(
-"Argument `sort_columns` will be removed in 4.0.0.",
-FutureWarning,
-)
-
 return _plot(
 data,
 kind=kind,
@@ -891,7 +879,6 @@ def plot_frame(
 sharey=sharey,
 secondary_y=secondary_y,
 layout=layout,
-sort_columns=sort_columns,
 **kwds,
 )
 
diff --git a/python/pyspark/pandas/tests/plot/test_frame_plot_matplotlib.py 
b/python/pyspark/pandas/tests/plot/test_frame_plot_matplotlib.py
index a47968597b4..365d34b1f55 100644
--- a/python/pyspark/pandas/tests/plot/test_frame_plot_matplotlib.py
+++ b/python/pyspark/pandas/tests/plot/test_frame_plot_matplotlib.py
@@ -18,7 +18,6 @@
 import base64
 from io import BytesIO
 import unittest
-from distutils.version import LooseVersion
 
 import pandas as pd
 import numpy as np
@@ -79,11 +78,6 @@ class DataFramePlotMatplotlibTestsMixin:
 plt.close(ax.figure)
 return b64_data
 
-@unittest.skipIf(
-LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-"TODO(SPARK-43641): Enable 

[spark] branch master updated: [SPARK-44705][PYTHON] Make PythonRunner single-threaded

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8aaff558394 [SPARK-44705][PYTHON] Make PythonRunner single-threaded
8aaff558394 is described below

commit 8aaff55839493e80e3ce376f928c04aa8f31d18c
Author: Utkarsh 
AuthorDate: Fri Aug 11 10:34:05 2023 +0900

[SPARK-44705][PYTHON] Make PythonRunner single-threaded

### What changes were proposed in this pull request?
PythonRunner, a utility that executes Python UDFs in Spark, uses two 
threads in a producer-consumer model today. This multi-threading model is 
problematic and confusing as Spark's execution model within a task is commonly 
understood to be single-threaded.
More importantly, this departure into a two-threaded execution model resulted in a series of customer issues involving [race conditions](https://issues.apache.org/jira/browse/SPARK-33277) and [deadlocks](https://issues.apache.org/jira/browse/SPARK-38677) between threads, as the code was hard to reason about. There have been multiple attempts to rein in these issues, viz., [fix 1](https://issues.apache.org/jira/browse/SPARK-22535), [fix 2](https://github.com/apache/spark/pull/30177), [fix 3 [...]

 Current Execution Model in Spark for Python UDFs
For queries containing Python UDFs, the main Java task thread spins up a 
new writer thread to pipe data from the child Spark plan into the Python worker 
evaluating the UDF. The writer thread runs in a tight loop: evaluates the child 
Spark plan, and feeds the resulting output to the Python worker. The main task 
thread simultaneously consumes the Python UDF’s output and evaluates the parent 
Spark plan to produce the final result.
The I/O to/from the Python worker uses blocking Java Sockets necessitating 
the use of two threads, one responsible for input to the Python worker and the 
other for output. Without two threads, it is easy to run into a deadlock. For 
example, the task can block forever waiting for the output from the Python 
worker. The output will never arrive until the input is supplied to the Python 
worker, which is not possible as the task thread is blocked while waiting on 
output.

 Proposed Fix

The proposed fix is to move to the standard single-threaded execution model 
within a task, i.e., to do away with the writer thread. In addition to 
mitigating the crashes, the fix reduces the complexity of the existing code by 
doing away with many safety checks in place to track deadlocks in the 
double-threaded execution model.

In the new model, the main task thread alternates between consuming/feeding 
data to the Python worker using asynchronous I/O through Java’s 
[SocketChannel](https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html).
 See the `read()` method in the code below for approximately how this is 
achieved.

```
case class PythonUDFRunner {

  private var nextRow: Row = _
  private var endOfStream = false
  private var childHasNext = true
  private var buffer: ByteBuffer = _

  def hasNext(): Boolean = nextRow != null || {
    if (!endOfStream) {
      read(buffer)
      nextRow = deserialize(buffer)
      hasNext
    } else {
      false
    }
  }

  def next(): Row = {
    if (hasNext) {
      val outputRow = nextRow
      nextRow = null
      outputRow
    } else {
      null
    }
  }

  def read(buf: ByteBuffer): Unit = {
    var n = 0
    while (n == 0) {
      // Alternate between reading/writing to the Python worker using async I/O.
      if (pythonWorker.isReadable) {
        n = pythonWorker.read(buf)
      }
      if (pythonWorker.isWritable) {
        consumeChildPlanAndWriteDataToPythonWorker()
      }
    }
  }

  def consumeChildPlanAndWriteDataToPythonWorker(): Unit = {
    // Tracks whether the connection to the Python worker can be written to.
    var socketAcceptsInput = true
    while (socketAcceptsInput && (childHasNext || buffer.hasRemaining)) {
      if (!buffer.hasRemaining && childHasNext) {
        // Consume data from the child and buffer it.
        writeToBuffer(childPlan.next(), buffer)
        childHasNext = childPlan.hasNext()
        if (!childHasNext) {
          // Exhausted the child plan's output. Write a keyword to the Python
          // worker signaling the end of data input.
          writeToBuffer(endOfStream)
        }
      }
      // Try to write as much buffered data as possible to the Python worker.
      while (buffer.hasRemaining && socketAcceptsInput) {
        val n = writeToPythonWorker(buffer)
        // `writeToPythonWorker()` returns 

[spark] branch branch-3.5 updated: [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 98cd980f57e [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute
98cd980f57e is described below

commit 98cd980f57e9e78e5a18288cc28f3f4653a02ebe
Author: Juliusz Sompolski 
AuthorDate: Fri Aug 11 10:28:25 2023 +0900

[SPARK-44765][CONNECT] Simplify retries of ReleaseExecute

### What changes were proposed in this pull request?

Simplify retries of ReleaseExecute in ExecutePlanResponseReattachableIterator.
Instead of chaining asynchronous calls, use the common retry loop logic from the asynchronous onError handler after the first error.
This allows reusing the common `retry` function instead of having to duplicate the
```
case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < 
retryPolicy.maxRetries =>
  Thread.sleep(
(retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
  .pow(retryPolicy.backoffMultiplier, 
currentRetryNum)).toMillis)
```
logic.
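For illustration, here is a minimal, hypothetical sketch of that shared retry-with-exponential-backoff loop, written in Python in the spirit of the Python client (names and default values are illustrative, not the client's actual API):

```python
import time

def retry(fn, can_retry, max_retries=15,
          initial_backoff=0.05, max_backoff=60.0, backoff_multiplier=4.0):
    """Hypothetical sketch: call fn(), retrying retryable errors with
    exponentially growing sleeps, capped at max_backoff seconds."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as e:
            if attempt >= max_retries or not can_retry(e):
                raise
            time.sleep(min(max_backoff,
                           initial_backoff * backoff_multiplier ** attempt))
```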

This also brings these retries closer to how they are handled in the Python client.

### Why are the changes needed?

Code simplification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Checked retries by printing `new Exception().printStackTrace` from the 
handler:
```
java.lang.Exception
at 
org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242)
at 
org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169)
at 
org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306)
at 
org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241)
at 
io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
at 
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
at 
io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
java.lang.Exception
at 
org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242)
at 
org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169)
at 
org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306)
at 
org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241)
at 
io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
at 
io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
at 
io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
at 
io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
...
```

Closes #42438 from juliuszsompolski/SPARK-44765.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 

[spark] branch master updated (42eb4223628 -> 9bde882fcb3)

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 42eb4223628 [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager 
bug fix
 add 9bde882fcb3 [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute

No new revisions were added by this update.

Summary of changes:
 .../ExecutePlanResponseReattachableIterator.scala  | 36 +-
 1 file changed, 21 insertions(+), 15 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager bug fix

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 1a3dce36e2a [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager 
bug fix
1a3dce36e2a is described below

commit 1a3dce36e2ad8d0e0bb2e1123864764077320466
Author: Wei Liu 
AuthorDate: Fri Aug 11 10:23:56 2023 +0900

[SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager bug fix

When calling `spark.streams.get(q.id)` on a stopped query `q`, it should return None in the Python client and null in the Scala client. But right now it throws a null pointer exception. This PR fixes this issue.

Bug fix

No

Added unit tests

Closes #42437 from WweiL/streaming-query-manager-get-bug-fix.

Authored-by: Wei Liu 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 42eb4223628653db71950f161a745432d1b45502)
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/streaming/ClientStreamingQuerySuite.scala |  2 ++
 .../spark/sql/connect/planner/SparkConnectPlanner.scala |  5 +++--
 python/pyspark/sql/tests/streaming/test_streaming.py| 13 +
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index f9e6e686495..ab92431bc11 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -268,6 +268,8 @@ class ClientStreamingQuerySuite extends QueryTest with 
SQLHelper with Logging {
 
 q.stop()
 assert(!q1.isActive)
+
+assert(spark.streams.get(q.id) == null)
   }
 
   test("streaming query listener") {
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index d59d01b4ce3..49bac17a4f4 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3059,8 +3059,9 @@ class SparkConnectPlanner(val sessionHolder: 
SessionHolder) extends Logging {
 .asJava)
 
   case StreamingQueryManagerCommand.CommandCase.GET_QUERY =>
-val query = session.streams.get(command.getGetQuery)
-respBuilder.setQuery(buildStreamingQueryInstance(query))
+Option(session.streams.get(command.getGetQuery)).foreach { q =>
+  respBuilder.setQuery(buildStreamingQueryInstance(q))
+}
 
   case StreamingQueryManagerCommand.CommandCase.AWAIT_ANY_TERMINATION =>
 if (command.getAwaitAnyTermination.hasTimeoutMs) {
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py 
b/python/pyspark/sql/tests/streaming/test_streaming.py
index 52fa19a8642..0eea86dc737 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -315,6 +315,19 @@ class StreamingTestsMixin:
 contains = msg in e.desc
 self.assertTrue(contains, "Exception tree doesn't contain the expected 
message: %s" % msg)
 
+def test_query_manager_get(self):
+df = self.spark.readStream.format("rate").load()
+for q in self.spark.streams.active:
+q.stop()
+q = df.writeStream.format("noop").start()
+
+self.assertTrue(q.isActive)
+self.assertTrue(q.id == self.spark.streams.get(q.id).id)
+
+q.stop()
+
+self.assertIsNone(self.spark.streams.get(q.id))
+
 def test_query_manager_await_termination(self):
 df = 
self.spark.readStream.format("text").load("python/test_support/sql/streaming")
 for q in self.spark.streams.active:


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (8fe50fcdb6a -> 42eb4223628)

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 8fe50fcdb6a [SPARK-44766][PYTHON] Cache the pandas converter for reuse 
for Python UDTFs
 add 42eb4223628 [SPARK-43032][FOLLOWUP][SS][CONNECT] StreamingQueryManager 
bug fix

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/streaming/ClientStreamingQuerySuite.scala |  2 ++
 .../spark/sql/connect/planner/SparkConnectPlanner.scala |  5 +++--
 python/pyspark/sql/tests/streaming/test_streaming.py| 13 +
 3 files changed, 18 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new ce0f215534b [SPARK-44766][PYTHON] Cache the pandas converter for reuse 
for Python UDTFs
ce0f215534b is described below

commit ce0f215534bf722821fd7821c9b7adfba2c16a37
Author: allisonwang-db 
AuthorDate: Fri Aug 11 10:23:01 2023 +0900

[SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs

### What changes were proposed in this pull request?

This PR caches the pandas converter for reuse when serializing the results 
from arrow-optimized Python UDTFs.

### Why are the changes needed?

To improve the performance

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #42439 from allisonwang-db/spark-44766-cache-converter.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 8fe50fcdb6a34b06c07c235f497b77cc5e245877)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/pandas/serializers.py | 20 +---
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index d1a3babb1fd..2cc3db15c9c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -499,6 +499,7 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 # Enables explicit casting for mismatched return types of Arrow 
Python UDTFs.
 arrow_cast=True,
 )
+self._converter_map = dict()
 
 def _create_batch(self, series):
 """
@@ -538,6 +539,17 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
 return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in 
range(len(arrs))])
 
+def _get_or_create_converter_from_pandas(self, dt):
+if dt not in self._converter_map:
+conv = _create_converter_from_pandas(
+dt,
+timezone=self._timezone,
+error_on_duplicated_field_names=False,
+ignore_unexpected_complex_type_values=True,
+)
+self._converter_map[dt] = conv
+return self._converter_map[dt]
+
 def _create_array(self, series, arrow_type, spark_type=None, 
arrow_cast=False):
 """
 Override the `_create_array` method in the superclass to create an 
Arrow Array
@@ -569,13 +581,7 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
 if arrow_type is not None:
 dt = spark_type or from_arrow_type(arrow_type, 
prefer_timestamp_ntz=True)
-# TODO(SPARK-43579): cache the converter for reuse
-conv = _create_converter_from_pandas(
-dt,
-timezone=self._timezone,
-error_on_duplicated_field_names=False,
-ignore_unexpected_complex_type_values=True,
-)
+conv = self._get_or_create_converter_from_pandas(dt)
 series = conv(series)
 
 if hasattr(series.array, "__arrow_array__"):


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8fe50fcdb6a [SPARK-44766][PYTHON] Cache the pandas converter for reuse 
for Python UDTFs
8fe50fcdb6a is described below

commit 8fe50fcdb6a34b06c07c235f497b77cc5e245877
Author: allisonwang-db 
AuthorDate: Fri Aug 11 10:23:01 2023 +0900

[SPARK-44766][PYTHON] Cache the pandas converter for reuse for Python UDTFs

### What changes were proposed in this pull request?

This PR caches the pandas converter for reuse when serializing the results 
from arrow-optimized Python UDTFs.
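As an aside, a minimal sketch (not Spark's actual classes; names are illustrative) of the per-type memoization pattern the diff below introduces: build the converter once per data type and reuse it across batches.

```python
# Illustrative sketch of a per-type converter cache (hypothetical names).
class ConverterCache:
    def __init__(self, factory):
        self._factory = factory        # e.g. a converter-builder function
        self._converter_map = {}

    def get(self, data_type):
        # Build the converter lazily, once per data type, then reuse it.
        if data_type not in self._converter_map:
            self._converter_map[data_type] = self._factory(data_type)
        return self._converter_map[data_type]
```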

### Why are the changes needed?

To improve the performance

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #42439 from allisonwang-db/spark-44766-cache-converter.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/pandas/serializers.py | 20 +---
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index d1a3babb1fd..2cc3db15c9c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -499,6 +499,7 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 # Enables explicit casting for mismatched return types of Arrow 
Python UDTFs.
 arrow_cast=True,
 )
+self._converter_map = dict()
 
 def _create_batch(self, series):
 """
@@ -538,6 +539,17 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
 return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in 
range(len(arrs))])
 
+def _get_or_create_converter_from_pandas(self, dt):
+if dt not in self._converter_map:
+conv = _create_converter_from_pandas(
+dt,
+timezone=self._timezone,
+error_on_duplicated_field_names=False,
+ignore_unexpected_complex_type_values=True,
+)
+self._converter_map[dt] = conv
+return self._converter_map[dt]
+
 def _create_array(self, series, arrow_type, spark_type=None, 
arrow_cast=False):
 """
 Override the `_create_array` method in the superclass to create an 
Arrow Array
@@ -569,13 +581,7 @@ class 
ArrowStreamPandasUDTFSerializer(ArrowStreamPandasUDFSerializer):
 
 if arrow_type is not None:
 dt = spark_type or from_arrow_type(arrow_type, 
prefer_timestamp_ntz=True)
-# TODO(SPARK-43579): cache the converter for reuse
-conv = _create_converter_from_pandas(
-dt,
-timezone=self._timezone,
-error_on_duplicated_field_names=False,
-ignore_unexpected_complex_type_values=True,
-)
+conv = self._get_or_create_converter_from_pandas(dt)
 series = conv(series)
 
 if hasattr(series.array, "__arrow_array__"):


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[GitHub] [spark-website] fmusayev opened a new pull request, #471: Added IOMETE to powered by Spark docs

2023-08-10 Thread via GitHub


fmusayev opened a new pull request, #471:
URL: https://github.com/apache/spark-website/pull/471

   Added IOMETE Data Lakehouse platform to Powered by Spark docs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: Revert "[SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers"

2023-08-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new fc58395f08b Revert "[SPARK-44461][SS][PYTHON][CONNECT] Verify Python 
Version for spark connect streaming workers"
fc58395f08b is described below

commit fc58395f08ba81e84e60b9f0260f257a6f8b4fc1
Author: Dongjoon Hyun 
AuthorDate: Thu Aug 10 15:17:53 2023 -0700

Revert "[SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark 
connect streaming workers"

This reverts commit 6b55d618d36bdd296b3883916328d26863e94b8a.
---
 .../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +-
 python/pyspark/sql/connect/streaming/query.py  | 3 +--
 python/pyspark/sql/connect/streaming/readwriter.py | 3 +--
 python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py | 3 ---
 python/pyspark/sql/connect/streaming/worker/listener_worker.py | 3 ---
 5 files changed, 3 insertions(+), 11 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index cddda6fb7a7..1a75965eb92 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -83,7 +83,7 @@ private[spark] class StreamingPythonRunner(
 val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, 
bufferSize)
 val dataOut = new DataOutputStream(stream)
 
-PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+// TODO(SPARK-44461): verify python version
 
 // Send sessionId
 PythonRDD.writeUTF(sessionId, dataOut)
diff --git a/python/pyspark/sql/connect/streaming/query.py 
b/python/pyspark/sql/connect/streaming/query.py
index 021d27e939d..59e98e7bc30 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -23,7 +23,6 @@ from pyspark.errors import StreamingQueryException, 
PySparkValueError
 import pyspark.sql.connect.proto as pb2
 from pyspark.serializers import CloudPickleSerializer
 from pyspark.sql.connect import proto
-from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.streaming import StreamingQueryListener
 from pyspark.sql.streaming.query import (
 StreamingQuery as PySparkStreamingQuery,
@@ -238,7 +237,7 @@ class StreamingQueryManager:
 cmd = pb2.StreamingQueryManagerCommand()
 expr = proto.PythonUDF()
 expr.command = CloudPickleSerializer().dumps(listener)
-expr.python_ver = get_python_ver()
+expr.python_ver = "%d.%d" % sys.version_info[:2]
 cmd.add_listener.python_listener_payload.CopyFrom(expr)
 cmd.add_listener.id = listener._id
 self._execute_streaming_query_manager_cmd(cmd)
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py 
b/python/pyspark/sql/connect/streaming/readwriter.py
index 89097fcf43a..c8cd408404f 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -31,7 +31,6 @@ from pyspark.sql.streaming.readwriter import (
 DataStreamReader as PySparkDataStreamReader,
 DataStreamWriter as PySparkDataStreamWriter,
 )
-from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.types import Row, StructType
 from pyspark.errors import PySparkTypeError, PySparkValueError
 
@@ -500,7 +499,7 @@ class DataStreamWriter:
 self._write_proto.foreach_batch.python_function.command = 
CloudPickleSerializer().dumps(
 func
 )
-self._write_proto.foreach_batch.python_function.python_ver = 
get_python_ver()
+self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % 
sys.version_info[:2]
 return self
 
 foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__
diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py 
b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
index cf61463cd68..48a9848de40 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
@@ -31,15 +31,12 @@ from pyspark.serializers import (
 from pyspark import worker
 from pyspark.sql import SparkSession
 from typing import IO
-from pyspark.worker_util import check_python_version
 
 pickle_ser = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()
 
 
 def main(infile: IO, outfile: IO) -> None:
-check_python_version(infile)
-
 connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
 session_id = utf8_deserializer.loads(infile)
 
diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py 

[spark] branch SPARK-27997 deleted (was 64df418dac0)

2023-08-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch SPARK-27997
in repository https://gitbox.apache.org/repos/asf/spark.git


 was 64df418dac0 Rename

This change permanently discards the following revisions:

 discard 64df418dac0 Rename


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch SPARK-27997 created (now 64df418dac0)

2023-08-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch SPARK-27997
in repository https://gitbox.apache.org/repos/asf/spark.git


  at 64df418dac0 Rename

This branch includes the following new commits:

 new 64df418dac0 Rename

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] 01/01: Rename

2023-08-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch SPARK-27997
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 64df418dac0bf13cde151fc1f802ff2804dcbe3f
Author: Dongjoon Hyun 
AuthorDate: Thu Aug 10 09:14:16 2023 -0700

Rename
---
 .../org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 8537965857d..245cc924fbd 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -57,14 +57,14 @@ private[spark] object SparkKubernetesClientFactory extends 
Logging {
   .orElse(defaultServiceAccountToken)
 val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
 val oauthTokenProviderConf = 
s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_PROVIDER_CONF_SUFFIX"
-val oauthTokenProviderInstance = 
sparkConf.getOption(oauthTokenProviderConf)
+val oauthTokenProvider = sparkConf.getOption(oauthTokenProviderConf)
   .map(Utils.classForName(_)
 .getDeclaredConstructor()
 .newInstance()
 .asInstanceOf[OAuthTokenProvider])
 
 require(
-  Seq(oauthTokenFile, oauthTokenValue, 
oauthTokenProviderInstance).count(_.isDefined) <= 1,
+  Seq(oauthTokenFile, oauthTokenValue, 
oauthTokenProvider).count(_.isDefined) <= 1,
   s"OAuth token should be specified via only one of $oauthTokenFileConf, 
$oauthTokenConf " +
 s"or $oauthTokenProviderConf."
 )
@@ -101,7 +101,7 @@ private[spark] object SparkKubernetesClientFactory extends 
Logging {
   .withRequestTimeout(clientType.requestTimeout(sparkConf))
   .withConnectionTimeout(clientType.connectionTimeout(sparkConf))
   .withTrustCerts(sparkConf.get(KUBERNETES_TRUST_CERTIFICATES))
-  .withOption(oauthTokenProviderInstance) {
+  .withOption(oauthTokenProvider) {
 (provider, configBuilder) => 
configBuilder.withOauthTokenProvider(provider)
   }.withOption(oauthTokenValue) {
 (token, configBuilder) => configBuilder.withOauthToken(token)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43781][SQL] Fix IllegalStateException when cogrouping two datasets derived from the same source

2023-08-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5db87787d5c [SPARK-43781][SQL] Fix IllegalStateException when 
cogrouping two datasets derived from the same source
5db87787d5c is described below

commit 5db87787d5cc1cefb51ec77e49bac7afaa46d300
Author: Jia Fan 
AuthorDate: Thu Aug 10 23:19:05 2023 +0800

[SPARK-43781][SQL] Fix IllegalStateException when cogrouping two datasets 
derived from the same source

### What changes were proposed in this pull request?
When cogrouping two datasets derived from the same source, e.g.:
```scala
val inputType = StructType(Array(StructField("id", LongType, false),
  StructField("type", StringType, false)))
val keyType = StructType(Array(StructField("id", LongType, false)))

val inputRows = new java.util.ArrayList[Row]()
inputRows.add(Row(1L, "foo"))
inputRows.add(Row(1L, "bar"))
inputRows.add(Row(2L, "foo"))
val input = spark.createDataFrame(inputRows, inputType)
val fooGroups = input.filter("type = 'foo'")
  .groupBy("id").as(RowEncoder(keyType), RowEncoder(inputType))
val barGroups = input.filter("type = 'bar'")
  .groupBy("id").as(RowEncoder(keyType), RowEncoder(inputType))

val result = fooGroups.cogroup(barGroups) { case (row, iterator, iterator1) =>
  iterator.toSeq ++ iterator1.toSeq
}(RowEncoder(inputType)).collect()
```
The error will be reported:
```
21:03:27.651 ERROR org.apache.spark.executor.Executor: Exception in task 
1.0 in stage 0.0 (TID 1)
java.lang.IllegalStateException: Couldn't find id#19L in [id#0L,type#1]
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
at 
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
...
```
The reason is that `DeduplicateRelations` rewrites the `LocalRelation` but can't rewrite `left(right)Group` and `left(right)Attr` in `CoGroup`. In fact, `Join` faces the same situation, but `Join` regenerates its plan when invoking itself to avoid it. Please refer to https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1089

This PR lets `DeduplicateRelations` handle the `CoGroup` case.

### Why are the changes needed?
Fix IllegalStateException when cogrouping two datasets derived from the 
same source

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new test

Closes #41554 from Hisoka-X/SPARK-43781_cogrouping_two_datasets.

Authored-by: Jia Fan 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/analysis/DeduplicateRelations.scala   | 39 --
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 26 +++
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index a8f2765b1c4..56ce3765836 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, 
AttributeSet, NamedExpression, OuterReference, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeMap, AttributeReference, AttributeSet, Expression, NamedExpression, 
OuterReference, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
@@ -228,7 +228,42 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
   if (attrMap.isEmpty) {
 planWithNewChildren
   } else {
-planWithNewChildren.rewriteAttrs(attrMap)
+def rewriteAttrs[T <: Expression](
+exprs: Seq[T],
+attrMap: Map[Attribute, Attribute]): Seq[T] = {
+  exprs.map { expr =>
+
expr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) {
+  case a: AttributeReference => attrMap.getOrElse(a, a)
+}.asInstanceOf[T]
+  }
+}
+
+planWithNewChildren match {
+  // TODO (SPARK-44754): we should handle all special cases here.
+  case c: CoGroup =>
+// SPARK-43781: CoGroup 

[spark] branch master updated: [SPARK-44760][INFRA] Fix list index out of range for JIRA resolution in merge_spark_pr

2023-08-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3164ff51dd2 [SPARK-44760][INFRA] Fix list index out of range for JIRA 
resolution in merge_spark_pr
3164ff51dd2 is described below

commit 3164ff51dd249670b8505a5ea8d361cf53c9db94
Author: Kent Yao 
AuthorDate: Thu Aug 10 08:08:32 2023 -0700

[SPARK-44760][INFRA] Fix list index out of range for JIRA resolution in 
merge_spark_pr

### What changes were proposed in this pull request?

This PR fixes a list-index-out-of-range error in the merge_spark_pr script.

The error occurs when the branch we merge into does not have a JIRA project version.

```python
>>> versions = sorted(versions, key=lambda x: x.name, reverse=True)
>>> versions
[, , , , , 
]
>>> list(filter(lambda x: x.name.startswith(""), versions))[-1]

>>> list(filter(lambda x: x.name.startswith("3.2"), versions))[-1]
Traceback (most recent call last):
  File "", line 1, in 
IndexError: list index out of range
```

Unlike other archived branches or versions, v3.2.5 is missing from our JIRA. This crashes the merge script during JIRA resolution if branch-3.2 is chosen.

### Why are the changes needed?

happy merging

### Does this PR introduce _any_ user-facing change?

no, infra only change

### How was this patch tested?

manually tested

```python
>>> default_fix_versions = []
>>> for b in merge_branches:
... if b == "master":
... default_fix_versions.append(versions[0])
... else:
... found = False
... for v in versions:
... if v.name.startswith(b.replace("branch-", "")):
... default_fix_versions.append(v)
... found = True
... if not found:
... print(
... "Target version for %s is not found on JIRA, it may be 
archived or "
... "not created. Skipping it." % b)
...
Target version for branch-3.2 is not found on JIRA, it may be archived or 
not created. Skipping it.
>>> default_fix_versions
[, , , , , 
]
>>> quit
```

Closes #42429 from yaooqinn/SPARK-44760.

Authored-by: Kent Yao 
Signed-off-by: Dongjoon Hyun 
---
 dev/merge_spark_pr.py | 37 ++---
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py
index 41b00b463f1..88792e77161 100755
--- a/dev/merge_spark_pr.py
+++ b/dev/merge_spark_pr.py
@@ -237,15 +237,6 @@ def cherry_pick(pr_num, merge_hash, default_branch):
 return pick_ref
 
 
-def fix_version_from_branch(branch, versions):
-# Note: Assumes this is a sorted (newest->oldest) list of un-released 
versions
-if branch == "master":
-return versions[0]
-else:
-branch_ver = branch.replace("branch-", "")
-return list(filter(lambda x: x.name.startswith(branch_ver), 
versions))[-1]
-
-
 def resolve_jira_issue(merge_branches, comment, default_jira_id=""):
 asf_jira = jira.client.JIRA(
 {"server": JIRA_API_BASE}, basic_auth=(JIRA_USERNAME, JIRA_PASSWORD)
@@ -280,14 +271,30 @@ def resolve_jira_issue(merge_branches, comment, 
default_jira_id=""):
 )
 
 versions = asf_jira.project_versions("SPARK")
+# Consider only x.y.z, unreleased, unarchived versions
+versions = [
+x
+for x in versions
+if not x.raw["released"] and not x.raw["archived"] and 
re.match(r"\d+\.\d+\.\d+", x.name)
+]
 versions = sorted(versions, key=lambda x: x.name, reverse=True)
-versions = list(filter(lambda x: x.raw["released"] is False, versions))
-# Consider only x.y.z versions
-versions = list(filter(lambda x: re.match(r"\d+\.\d+\.\d+", x.name), 
versions))
 
-default_fix_versions = list(
-map(lambda x: fix_version_from_branch(x, versions).name, 
merge_branches)
-)
+default_fix_versions = []
+for b in merge_branches:
+if b == "master":
+default_fix_versions.append(versions[0])
+else:
+found = False
+for v in versions:
+if v.name.startswith(b.replace("branch-", "")):
+default_fix_versions.append(v)
+found = True
+if not found:
+print(
+"Target version for %s is not found on JIRA, it may be 
archived or "
+"not created. Skipping it." % b
+)
+
 for v in default_fix_versions:
 # Handles the case where we have forked a release branch but not yet 
made the release.
 # In this case, if the PR is committed to the master branch and the 
release branch, 

[spark] branch master updated: [SPARK-44741][CORE] Support regex-based MetricFilter in `StatsdSink`

2023-08-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 36b5429abc7 [SPARK-44741][CORE] Support regex-based MetricFilter in 
`StatsdSink`
36b5429abc7 is described below

commit 36b5429abc73f59459d1636ebef58a5a679af6c0
Author: Rameshkrishnan Muthusamy 
AuthorDate: Thu Aug 10 07:07:24 2023 -0700

[SPARK-44741][CORE] Support regex-based MetricFilter in `StatsdSink`

### What changes were proposed in this pull request?

- Adds an additional option to StatsdSink that allows passing a regex as a filter for reported metrics
- Adds a unit test case to validate that the filter is applied in StatsdSink
- Updates the metrics config template to document the new StatsdSink option

### Why are the changes needed?

Today, Spark metrics instances send a wide range of metrics that are useful for debugging and performance analysis. There are cases where consumers would like to switch between these deeper metrics and only the specific custom metrics required for system maintenance. The new StatsdSink option exposes the reporter's existing filtering capability as a sink configuration. GraphiteSink already supports a similar feature.
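Not Spark code, but a small Python sketch of the filtering semantics the option implements (the Scala sink uses `findFirstMatchIn`, i.e. a substring-style regex search over the metric name):

```python
import re

def build_metric_filter(pattern=None):
    """Hypothetical illustration: with no pattern, report everything;
    otherwise report only metric names the regex matches anywhere."""
    if pattern is None:
        return lambda name: True
    compiled = re.compile(pattern)
    return lambda name: compiled.search(name) is not None

keep = build_metric_filter(r"jvm\.heap")
print([n for n in ["jvm.heap.used", "executor.cpuTime"] if keep(n)])
# ['jvm.heap.used']
```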

### Does this PR introduce _any_ user-facing change?

Yes. This PR exposes an additional config that users can optionally provide to filter the metrics being reported. The field is optional and defaults to the existing behaviour, where the sink reports all metrics.

### How was this patch tested?

Unit tests has been added in the PR to validate the change

Closes #42416 from ramesh-muthusamy/SPARK-44741.

Authored-by: Rameshkrishnan Muthusamy 
Signed-off-by: Dongjoon Hyun 
---
 conf/metrics.properties.template   |  2 ++
 .../org/apache/spark/metrics/sink/StatsdSink.scala | 14 --
 .../spark/metrics/sink/StatsdSinkSuite.scala   | 30 ++
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index f52d33fd642..aa8e0e438e6 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -139,6 +139,7 @@
 #   period10Poll period
 #   unit  seconds   Units of poll period
 #   prefixEMPTY STRING  Prefix to prepend to metric name
+#   regex NONE  Optional filter to send only metrics matching this 
regex string
 
 ## Examples
 # Enable JmxSink for all instances by class name
@@ -150,6 +151,7 @@
 # Enable StatsdSink for all instances by class name
 #*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
 #*.sink.statsd.prefix=spark
+#*.sink.statsd.regex=
 
 # Polling period for the ConsoleSink
 #*.sink.console.period=10
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala 
b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
index c6e7bcccd4c..c506b86b456 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
@@ -20,7 +20,7 @@ package org.apache.spark.metrics.sink
 import java.util.{Locale, Properties}
 import java.util.concurrent.TimeUnit
 
-import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.MetricsSystem
@@ -31,6 +31,7 @@ private[spark] object StatsdSink {
   val STATSD_KEY_PERIOD = "period"
   val STATSD_KEY_UNIT = "unit"
   val STATSD_KEY_PREFIX = "prefix"
+  val STATSD_KEY_REGEX = "regex"
 
   val STATSD_DEFAULT_HOST = "127.0.0.1"
   val STATSD_DEFAULT_PORT = "8125"
@@ -53,9 +54,18 @@ private[spark] class StatsdSink(
 
   val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)
 
+  val filter = Option(property.getProperty(STATSD_KEY_REGEX)) match {
+case Some(pattern) => new MetricFilter() {
+  override def matches(name: String, metric: Metric): Boolean = {
+pattern.r.findFirstMatchIn(name).isDefined
+  }
+}
+case None => MetricFilter.ALL
+  }
+
   MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
-  val reporter = new StatsdReporter(registry, host, port, prefix)
+  val reporter = new StatsdReporter(registry, host, port, prefix, filter)
 
   override def start(): Unit = {
 reporter.start(pollPeriod, pollUnit)
diff --git 
a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala 
b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
index ff883633d5e..28bf40e8c93 100644
--- a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
+++ 

[spark] branch master updated: [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample

2023-08-10 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bd14d6412a3 [SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency 
for resample
bd14d6412a3 is described below

commit bd14d6412a3124eecce1493fcad436280915ba71
Author: itholic 
AuthorDate: Thu Aug 10 18:20:24 2023 +0800

[SPARK-43660][CONNECT][PS][FOLLOWUP] Remove JVM dependency for resample

### What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/41877 to remove the JVM dependency.

### Why are the changes needed?

To remove JVM dependency from Pandas API on Spark with Spark Connect.

### Does this PR introduce _any_ user-facing change?

No, it's internal handling.

### How was this patch tested?

The existing UT.
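
For illustration (not part of the PR), a minimal sketch of the unified `F.make_interval` call that
behaves the same on classic Spark and Spark Connect, which is what allows `get_make_interval` to
drop the JVM-only `PythonSQLUtils` path; the session setup and column aliases are assumptions:

```python
# Minimal sketch, assuming a local SparkSession; column aliases are illustrative.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# make_interval is resolved purely through the Python DataFrame API, so the
# same code path works with or without direct access to the JVM.
df = spark.range(3).select(
    F.col("id"),
    F.make_interval(hours=F.lit(2)).alias("two_hours"),
    F.make_interval(mins=F.col("id")).alias("id_minutes"),
)
df.show()
```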

Closes #42410 from itholic/resample_followup.

Authored-by: itholic 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/pandas/resample.py | 27 +--
 1 file changed, 9 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/pandas/resample.py 
b/python/pyspark/pandas/resample.py
index 30f8c9d3169..2d10802b651 100644
--- a/python/pyspark/pandas/resample.py
+++ b/python/pyspark/pandas/resample.py
@@ -41,7 +41,6 @@ else:
 
 _builtin_table = SelectionMixin._builtin_table  # type: 
ignore[attr-defined]
 
-from pyspark import SparkContext
 from pyspark.sql import Column, functions as F
 from pyspark.sql.types import (
 NumericType,
@@ -67,7 +66,6 @@ from pyspark.pandas.utils import (
 scol_for,
 verify_temp_column_name,
 )
-from pyspark.sql.utils import is_remote
 from pyspark.pandas.spark.functions import timestampdiff
 
 
@@ -145,22 +143,15 @@ class Resampler(Generic[FrameLike], metaclass=ABCMeta):
 def get_make_interval(  # type: ignore[return]
 self, unit: str, col: Union[Column, int, float]
 ) -> Column:
-if is_remote():
-from pyspark.sql.connect.functions import lit, make_interval
-
-col = col if not isinstance(col, (int, float)) else lit(col)  # 
type: ignore[assignment]
-if unit == "MONTH":
-return make_interval(months=col)  # type: ignore
-if unit == "HOUR":
-return make_interval(hours=col)  # type: ignore
-if unit == "MINUTE":
-return make_interval(mins=col)  # type: ignore
-if unit == "SECOND":
-return make_interval(secs=col)  # type: ignore
-else:
-sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
-col = col._jc if isinstance(col, Column) else F.lit(col)._jc
-return sql_utils.makeInterval(unit, col)
+col = col if not isinstance(col, (int, float)) else F.lit(col)
+if unit == "MONTH":
+return F.make_interval(months=col)
+if unit == "HOUR":
+return F.make_interval(hours=col)
+if unit == "MINUTE":
+return F.make_interval(mins=col)
+if unit == "SECOND":
+return F.make_interval(secs=col)
 
 def _bin_timestamp(self, origin: pd.Timestamp, ts_scol: Column) -> Column:
 key_type = self._resamplekey_type


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43245][SPARK-43705][PS] Type match for `DatetimeIndex`/`TimedeltaIndex` with pandas 2

2023-08-10 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2c9b25a90b5 [SPARK-43245][SPARK-43705][PS] Type match for 
`DatetimeIndex`/`TimedeltaIndex` with pandas 2
2c9b25a90b5 is described below

commit 2c9b25a90b57ca2095881f9de4f13bf820f9dac9
Author: itholic 
AuthorDate: Thu Aug 10 17:42:21 2023 +0900

[SPARK-43245][SPARK-43705][PS] Type match for 
`DatetimeIndex`/`TimedeltaIndex` with pandas 2

### What changes were proposed in this pull request?

This PR proposes to match the type for `DatetimeIndex`/`TimedeltaIndex` 
with [pandas 2](https://pandas.pydata.org/docs/dev/whatsnew/v2.0.0.html).

### Why are the changes needed?

To match the behavior with pandas 2 and above.

### Does this PR introduce _any_ user-facing change?

Yes, the return type of several `DatetimeIndex` & `TimedeltaIndex` APIs is changed from `int64` to
`int32`, e.g.

```diff
>>> s = ps.from_pandas(pd.date_range('2016-12-31', '2017-01-08', freq='D').to_series())
>>> s.dt.dayofweek
2016-12-31    5
2017-01-01    6
2017-01-02    0
2017-01-03    1
2017-01-04    2
2017-01-05    3
2017-01-06    4
2017-01-07    5
2017-01-08    6
-  dtype: int64
+  dtype: int32
```

### How was this patch tested?

Enabling the existing doctests & UTs.
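
For reference, a minimal sketch (mirroring the example in the description above; the exact dtype
assumes this change is applied and depends on the installed pandas and Spark versions) of checking
the new return type from pandas API on Spark:

```python
# Minimal sketch, assuming this change is applied; the date range mirrors
# the example shown in the description.
import pandas as pd
import pyspark.pandas as ps

psser = ps.from_pandas(
    pd.date_range("2016-12-31", "2017-01-08", freq="D").to_series()
)
# Datetime attributes such as dayofweek are now int32 (previously int64).
print(psser.dt.dayofweek.dtype)  # int32
```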

Closes #42271 from itholic/pandas_datetime_api.

Authored-by: itholic 
Signed-off-by: Hyukjin Kwon 
---
 .../source/migration_guide/pyspark_upgrade.rst |  1 +
 python/pyspark/pandas/datetimes.py | 30 +++---
 python/pyspark/pandas/indexes/timedelta.py |  3 ++-
 .../pyspark/pandas/tests/indexes/test_datetime.py  | 28 +++-
 .../pyspark/pandas/tests/indexes/test_timedelta.py |  4 ---
 5 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index da49719579a..98630133e0c 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -34,6 +34,7 @@ Upgrading from PySpark 3.5 to 4.0
 * In Spark 4.0, ``closed`` parameter from ``ps.date_range`` has been removed 
from pandas API on Spark.
 * In Spark 4.0, ``include_start`` and ``include_end`` parameters from 
``DataFrame.between_time`` have been removed from pandas API on Spark, use 
``inclusive`` instead.
 * In Spark 4.0, ``include_start`` and ``include_end`` parameters from 
``Series.between_time`` have been removed from pandas API on Spark, use 
``inclusive`` instead.
+* In Spark 4.0, the various datetime attributes of ``DatetimeIndex`` (``day``, 
``month``, ``year`` etc.) are now ``int32`` instead of ``int64`` from pandas 
API on Spark.
 
 
 Upgrading from PySpark 3.3 to 3.4
diff --git a/python/pyspark/pandas/datetimes.py 
b/python/pyspark/pandas/datetimes.py
index 752f6f46282..b0649cf5761 100644
--- a/python/pyspark/pandas/datetimes.py
+++ b/python/pyspark/pandas/datetimes.py
@@ -27,7 +27,7 @@ from pandas.tseries.offsets import DateOffset
 
 import pyspark.pandas as ps
 import pyspark.sql.functions as F
-from pyspark.sql.types import DateType, TimestampType, TimestampNTZType, 
LongType
+from pyspark.sql.types import DateType, TimestampType, TimestampNTZType, 
LongType, IntegerType
 
 
 class DatetimeMethods:
@@ -64,42 +64,42 @@ class DatetimeMethods:
 """
 The year of the datetime.
 """
-return self._data.spark.transform(lambda c: F.year(c).cast(LongType()))
+return self._data.spark.transform(lambda c: 
F.year(c).cast(IntegerType()))
 
 @property
 def month(self) -> "ps.Series":
 """
 The month of the timestamp as January = 1 December = 12.
 """
-return self._data.spark.transform(lambda c: 
F.month(c).cast(LongType()))
+return self._data.spark.transform(lambda c: 
F.month(c).cast(IntegerType()))
 
 @property
 def day(self) -> "ps.Series":
 """
 The days of the datetime.
 """
-return self._data.spark.transform(lambda c: 
F.dayofmonth(c).cast(LongType()))
+return self._data.spark.transform(lambda c: 
F.dayofmonth(c).cast(IntegerType()))
 
 @property
 def hour(self) -> "ps.Series":
 """
 The hours of the datetime.
 """
-return self._data.spark.transform(lambda c: F.hour(c).cast(LongType()))
+return self._data.spark.transform(lambda c: 
F.hour(c).cast(IntegerType()))
 
 @property
 def minute(self) -> "ps.Series":
 """
 The minutes of the datetime.
 """
-return self._data.spark.transform(lambda c: 
F.minute(c).cast(LongType()))
+return 

[spark] branch branch-3.5 updated: [SPARK-44691][SQL][CONNECT] Move Subclasses of AnalysisException to sql/api

2023-08-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 1db23d263a1 [SPARK-44691][SQL][CONNECT] Move Subclasses of 
AnalysisException to sql/api
1db23d263a1 is described below

commit 1db23d263a16a3a42c8e9280e29f99b25461d8a7
Author: Yihong He 
AuthorDate: Thu Aug 10 14:56:50 2023 +0800

[SPARK-44691][SQL][CONNECT] Move Subclasses of AnalysisException to sql/api

### What changes were proposed in this pull request?

- Move subclasses of AnalysisException to sql/api
- Refactor some dependent utilities

### Why are the changes needed?

- Support throwing these exceptions for better compatibility with the existing control-flow behaviors

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing tests

Closes #42363 from heyihong/SPARK-44691.

Authored-by: Yihong He 
Signed-off-by: Wenchen Fan 
(cherry picked from commit d5f1f17278a772689a394e2d10751e5f6655fdbc)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/connector/catalog/Identifier.java|   0
 .../sql/connector/catalog/IdentifierImpl.java  |  10 +-
 .../sql/catalyst/analysis/NonEmptyException.scala  |   4 +-
 .../catalyst/analysis/alreadyExistException.scala} | 105 +---
 .../catalyst/analysis/noSuchItemsExceptions.scala} |  96 +--
 .../spark/sql/catalyst/util/QuotingUtils.scala |  24 +++
 .../spark/sql/catalyst/util/StringUtils.scala  |   6 -
 .../spark/sql/errors/DataTypeErrorsBase.scala  |   4 +-
 .../catalyst/analysis/AlreadyExistException.scala  | 163 --
 .../catalyst/analysis/NoSuchItemException.scala| 185 -
 .../apache/spark/sql/catalyst/util/package.scala   |   4 +-
 .../sql/connector/catalog/CatalogV2Implicits.scala |  10 +-
 12 files changed, 48 insertions(+), 563 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
 b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
similarity index 100%
rename from 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
rename to 
sql/api/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
 
b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
similarity index 91%
rename from 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
rename to 
sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
index c7aceecabac..61894b18fe6 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
+++ 
b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.connector.catalog;
 
+import org.apache.arrow.util.Preconditions;
+
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.StringJoiner;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.catalyst.util.package$;
+import org.apache.spark.sql.catalyst.util.QuotingUtils;
 
 /**
  *  An {@link Identifier} implementation.
@@ -56,9 +56,9 @@ class IdentifierImpl implements Identifier {
   public String toString() {
 StringJoiner joiner = new StringJoiner(".");
 for (String p : namespace) {
-  joiner.add(package$.MODULE$.quoteIfNeeded(p));
+  joiner.add(QuotingUtils.quoteIfNeeded(p));
 }
-joiner.add(package$.MODULE$.quoteIfNeeded(name));
+joiner.add(QuotingUtils.quoteIfNeeded(name));
 return joiner.toString();
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
similarity index 91%
rename from 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
rename to 
sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
index f3ff28f74fc..ecd57672b61 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.catalyst.util.QuotingUtils.quoted
 
 
 /**
@@ -31,6 +31,6 @@ case class NonEmptyNamespaceException(
   extends AnalysisException(message, cause = cause) {
 
   

[spark] branch master updated: [SPARK-44691][SQL][CONNECT] Move Subclasses of AnalysisException to sql/api

2023-08-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d5f1f17278a [SPARK-44691][SQL][CONNECT] Move Subclasses of 
AnalysisException to sql/api
d5f1f17278a is described below

commit d5f1f17278a772689a394e2d10751e5f6655fdbc
Author: Yihong He 
AuthorDate: Thu Aug 10 14:56:50 2023 +0800

[SPARK-44691][SQL][CONNECT] Move Subclasses of AnalysisException to sql/api

### What changes were proposed in this pull request?

- Move subclasses of AnalysisException to sql/api
- Refactor some dependent utilities

### Why are the changes needed?

- Support throwing these exceptions for better compatibility with the existing control-flow behaviors

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing tests

Closes #42363 from heyihong/SPARK-44691.

Authored-by: Yihong He 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/connector/catalog/Identifier.java|   0
 .../sql/connector/catalog/IdentifierImpl.java  |  10 +-
 .../sql/catalyst/analysis/NonEmptyException.scala  |   4 +-
 .../catalyst/analysis/alreadyExistException.scala} | 105 +---
 .../catalyst/analysis/noSuchItemsExceptions.scala} |  96 +--
 .../spark/sql/catalyst/util/QuotingUtils.scala |  24 +++
 .../spark/sql/catalyst/util/StringUtils.scala  |   6 -
 .../spark/sql/errors/DataTypeErrorsBase.scala  |   4 +-
 .../catalyst/analysis/AlreadyExistException.scala  | 163 --
 .../catalyst/analysis/NoSuchItemException.scala| 185 -
 .../apache/spark/sql/catalyst/util/package.scala   |   4 +-
 .../sql/connector/catalog/CatalogV2Implicits.scala |  10 +-
 12 files changed, 48 insertions(+), 563 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
 b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
similarity index 100%
rename from 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
rename to 
sql/api/src/main/java/org/apache/spark/sql/connector/catalog/Identifier.java
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
 
b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
similarity index 91%
rename from 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
rename to 
sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
index c7aceecabac..61894b18fe6 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
+++ 
b/sql/api/src/main/java/org/apache/spark/sql/connector/catalog/IdentifierImpl.java
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.connector.catalog;
 
+import org.apache.arrow.util.Preconditions;
+
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.StringJoiner;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.catalyst.util.package$;
+import org.apache.spark.sql.catalyst.util.QuotingUtils;
 
 /**
  *  An {@link Identifier} implementation.
@@ -56,9 +56,9 @@ class IdentifierImpl implements Identifier {
   public String toString() {
 StringJoiner joiner = new StringJoiner(".");
 for (String p : namespace) {
-  joiner.add(package$.MODULE$.quoteIfNeeded(p));
+  joiner.add(QuotingUtils.quoteIfNeeded(p));
 }
-joiner.add(package$.MODULE$.quoteIfNeeded(name));
+joiner.add(QuotingUtils.quoteIfNeeded(name));
 return joiner.toString();
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
similarity index 91%
rename from 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
rename to 
sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
index f3ff28f74fc..ecd57672b61 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.catalyst.util.QuotingUtils.quoted
 
 
 /**
@@ -31,6 +31,6 @@ case class NonEmptyNamespaceException(
   extends AnalysisException(message, cause = cause) {
 
   def this(namespace: Array[String]) = {
-this(s"Namespace '${namespace.quoted}' is non empty.")
+