(spark) branch master updated: [MINOR][SQL] Convert `UnresolvedException` to an internal error

2023-12-12 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 071c684dee44 [MINOR][SQL] Convert `UnresolvedException` to an internal 
error
071c684dee44 is described below

commit 071c684dee44665691ddab916021d4920a9ac51b
Author: Max Gekk 
AuthorDate: Tue Dec 12 18:06:48 2023 +0300

[MINOR][SQL] Convert `UnresolvedException` to an internal error

### What changes were proposed in this pull request?
In the PR, I propose to change the parent class of `UnresolvedException` 
from `AnalysisException` to `SparkException` with the error class 
`INTERNAL_ERROR`. If an user observes the error, this is definitely a bug in 
the compiler.

### Why are the changes needed?
To unify all Spark exceptions, and assign an error class. So, this should 
improve user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
No, users shouldn't face to the error in regular cases.

### How was this patch tested?
By existing GAs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44311 from MaxGekk/error-class-UnresolvedException.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala| 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 97912fb5d592..e1dec5955a7f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -37,7 +37,10 @@ import org.apache.spark.util.ArrayImplicits._
  * resolved.
  */
 class UnresolvedException(function: String)
-  extends AnalysisException(s"Invalid call to $function on unresolved object")
+  extends SparkException(
+errorClass = "INTERNAL_ERROR",
+messageParameters = Map("message" -> s"Invalid call to $function on 
unresolved object"),
+cause = null)
 
 /** Parent trait for unresolved node types */
 trait UnresolvedNode extends LogicalPlan {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager

2023-12-12 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a122b8acc2f4 [SPARK-46075][CONNECT] Improvements to 
SparkConnectSessionManager
a122b8acc2f4 is described below

commit a122b8acc2f47c58e8891a5f1464a588f77750e7
Author: Juliusz Sompolski 
AuthorDate: Tue Dec 12 09:40:56 2023 -0800

[SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager

### What changes were proposed in this pull request?

This is factored out from https://github.com/apache/spark/pull/43913 and is 
a continuation to https://github.com/apache/spark/pull/43546 when 
SparkConnectSessionManager was introduced.

We want to remove the use a Guava cache as session cache, and have our 
custom logic with more control. This refactors the Session Manager and adds 
more tests.

We introduce a mechanism that mirrors SparkConnectExecutionManager instead.

### Why are the changes needed?

With guava cache, only a single "inactivity timeout" can be specified for 
the whole cache. This can't be for example overriden per session. The actual 
invalidation also happens not in it's own thread inside guava, but it's 
work-stealing lazily piggy backed to other operations on the cache, making it 
opaque when session removal will actually happen.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

SparkConnectSessionManagerSuite added.

### Was this patch authored or co-authored using generative AI tooling?

Github Copilot was assisting in some boilerplate auto-completion.

Generated-by: Github Copilot

Closes #43985 from juliuszsompolski/SPARK-46075.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/sql/connect/config/Connect.scala  |  11 +-
 .../spark/sql/connect/service/ExecuteHolder.scala  |  39 ++--
 .../spark/sql/connect/service/SessionHolder.scala  |  80 ++--
 .../service/SparkConnectExecutionManager.scala |  48 +++--
 .../service/SparkConnectSessionManager.scala   | 224 -
 .../spark/sql/connect/SparkConnectServerTest.scala |   2 +-
 .../service/SparkConnectSessionManagerSuite.scala  | 137 +
 7 files changed, 429 insertions(+), 112 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index f7aa98af2fa3..ab4f06d508a0 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -78,7 +78,8 @@ object Connect {
   val CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT =
 buildStaticConf("spark.connect.session.manager.defaultSessionTimeout")
   .internal()
-  .doc("Timeout after which sessions without any new incoming RPC will be 
removed.")
+  .doc("Timeout after which sessions without any new incoming RPC will be 
removed. " +
+"Setting it to -1 indicates that sessions should be kept forever.")
   .version("4.0.0")
   .timeConf(TimeUnit.MILLISECONDS)
   .createWithDefaultString("60m")
@@ -93,6 +94,14 @@ object Connect {
   .intConf
   .createWithDefaultString("1000")
 
+  val CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL =
+buildStaticConf("spark.connect.session.manager.maintenanceInterval")
+  .internal()
+  .doc("Interval at which session manager will search for expired sessions 
to remove.")
+  .version("4.0.0")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("30s")
+
   val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT =
 buildStaticConf("spark.connect.execute.manager.detachedTimeout")
   .internal()
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
index 9e97ded5bf8a..f03f81326064 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
@@ -95,17 +95,17 @@ private[connect] class ExecuteHolder(
   private val runner: ExecuteThreadRunner = new ExecuteThreadRunner(this)
 
   /** System.currentTimeMillis when this ExecuteHolder was created. */
-  val creationTime = System.currentTimeMillis()
+  val creationTimeMs = System.currentTimeMillis()
 
   /**
* None if there is currently an attached RPC (grpcResponseSenders not empty 
or during initial
* ExecutePlan handler). Otherwise, the System.currentTimeMillis whe

(spark) branch master updated: [SPARK-46353][CORE] Refactor to improve `RegisterWorker` unit test coverage

2023-12-12 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2215cef40043 [SPARK-46353][CORE] Refactor to improve `RegisterWorker` 
unit test coverage
2215cef40043 is described below

commit 2215cef40043a3205446f8daecafed8f2360a742
Author: Dongjoon Hyun 
AuthorDate: Tue Dec 12 09:57:43 2023 -0800

[SPARK-46353][CORE] Refactor to improve `RegisterWorker` unit test coverage

### What changes were proposed in this pull request?

This PR aims to improve the unit test coverage for `RegisterWorker` message 
handling.

- Add `handleRegisterWorker` helper method which is testable easily.
- Add new unit tests for three conditional branches.

### Why are the changes needed?

It's easily to test and improve. We can add more tests in this way in the 
future.

### Does this PR introduce _any_ user-facing change?

No. This is a refactoring on the main code and only additions to the test 
methods.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44284 from dongjoon-hyun/SPARK-46353.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/deploy/master/Master.scala| 75 +-
 .../apache/spark/deploy/master/MasterSuite.scala   | 59 -
 2 files changed, 102 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index a550f44fc0a4..c8679c185ad7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -37,7 +37,7 @@ import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
-import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, 
ResourceUtils}
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile, 
ResourceRequirement, ResourceUtils}
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, 
Utils}
@@ -75,7 +75,8 @@ private[deploy] class Master(
   private val waitingApps = new ArrayBuffer[ApplicationInfo]
   val apps = new HashSet[ApplicationInfo]
 
-  private val idToWorker = new HashMap[String, WorkerInfo]
+  // Visible for testing
+  private[master] val idToWorker = new HashMap[String, WorkerInfo]
   private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
 
   private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
@@ -106,7 +107,7 @@ private[deploy] class Master(
 
   private[master] var state = RecoveryState.STANDBY
 
-  private var persistenceEngine: PersistenceEngine = _
+  private[master] var persistenceEngine: PersistenceEngine = _
 
   private var leaderElectionAgent: LeaderElectionAgent = _
 
@@ -281,33 +282,8 @@ private[deploy] class Master(
 case RegisterWorker(
   id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl,
   masterAddress, resources) =>
-  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-workerHost, workerPort, cores, Utils.megabytesToString(memory)))
-  if (state == RecoveryState.STANDBY) {
-workerRef.send(MasterInStandby)
-  } else if (idToWorker.contains(id)) {
-if (idToWorker(id).state == WorkerState.UNKNOWN) {
-  logInfo("Worker has been re-registered: " + id)
-  idToWorker(id).state = WorkerState.ALIVE
-}
-workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, 
true))
-  } else {
-val workerResources =
-  resources.map(r => r._1 -> WorkerResourceInfo(r._1, 
r._2.addresses.toImmutableArraySeq))
-val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-  workerRef, workerWebUiUrl, workerResources)
-if (registerWorker(worker)) {
-  persistenceEngine.addWorker(worker)
-  workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, 
false))
-  schedule()
-} else {
-  val workerAddress = worker.endpoint.address
-  logWarning("Worker registration failed. Attempted to re-register 
worker at same " +
-"address: " + workerAddress)
-  workerRef.send(RegisterWorkerFailed("Attempted to re-register worker 
at same address: "
-+ workerAddress))
-}
-  }
+  handleRegisterWorker(id, workerHost, workerPort, workerRef, cores, 

(spark) branch master updated: [SPARK-46378][SQL] Still remove Sort after converting Aggregate to Project

2023-12-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c1ba963e64a2 [SPARK-46378][SQL] Still remove Sort after converting 
Aggregate to Project
c1ba963e64a2 is described below

commit c1ba963e64a22dea28e17b1ed954e6d03d38da1e
Author: Wenchen Fan 
AuthorDate: Tue Dec 12 10:04:40 2023 -0800

[SPARK-46378][SQL] Still remove Sort after converting Aggregate to Project

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/33397 to avoid 
sub-optimal plans. After converting `Aggregate` to `Project`, there is 
information lost: `Aggregate` doesn't care about the data order of inputs, but 
`Project` cares. `EliminateSorts` can remove `Sort` below `Aggregate`, but it 
doesn't work anymore if we convert `Aggregate` to `Project`.

This PR fixes this issue by tagging the `Project` to be order-irrelevant if 
it's converted from `Aggregate`. Then `EliminateSorts` optimizes the tagged 
`Project`.

### Why are the changes needed?

avoid sub-optimal plans

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44310 from cloud-fan/sort.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../scala/org/apache/spark/sql/catalyst/dsl/package.scala|  2 ++
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala  |  6 +-
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala   |  3 +++
 .../spark/sql/catalyst/optimizer/EliminateSortsSuite.scala   | 12 
 4 files changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 30d4c2dbb409..eb3047700215 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -395,6 +395,8 @@ package object dsl {
 
   def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, 
logicalPlan)
 
+  def localLimit(limitExpr: Expression): LogicalPlan = 
LocalLimit(limitExpr, logicalPlan)
+
   def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, 
logicalPlan)
 
   def join(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 960f5e532c08..a4b25cbd1d2e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -769,7 +769,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
   LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, 
join)))
 // Push down limit 1 through Aggregate and turn Aggregate into Project if 
it is group only.
 case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
-  Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child)))
+  val project = Project(a.aggregateExpressions, LocalLimit(le, a.child))
+  project.setTagValue(Project.dataOrderIrrelevantTag, ())
+  Limit(le, project)
 case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if 
a.groupOnly =>
   Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, 
a.child
 // Merge offset value and limit value into LocalLimit and pushes down 
LocalLimit through Offset.
@@ -1583,6 +1585,8 @@ object EliminateSorts extends Rule[LogicalPlan] {
 right = recursiveRemoveSort(originRight, true))
 case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
   g.copy(child = recursiveRemoveSort(originChild, true))
+case p: Project if p.getTagValue(Project.dataOrderIrrelevantTag).isDefined 
=>
+  p.copy(child = recursiveRemoveSort(p.child, true))
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 497f485b67fe..65f4151c0c96 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -101,6 +101,9 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalPlan)
 
 object Project {
   val hiddenOutputTag: TreeNodeTag[Seq[Attribute]] = 
TreeNodeTag[Seq[Attribute]]

(spark) branch master updated: [SPARK-46253][PYTHON] Plan Python data source read using MapInArrow

2023-12-12 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 01d61a5fc963 [SPARK-46253][PYTHON] Plan Python data source read using 
MapInArrow
01d61a5fc963 is described below

commit 01d61a5fc963013fcf55bbfb384e06d1c5ec7e3d
Author: allisonwang-db 
AuthorDate: Tue Dec 12 14:05:29 2023 -0800

[SPARK-46253][PYTHON] Plan Python data source read using MapInArrow

### What changes were proposed in this pull request?

This PR changes how we plan Python data source read. Instead of using a 
regular Python UDTF, we can use an arrow UDF and plan the data source read 
using the MapInArrow operator.

### Why are the changes needed?

To improve the performance

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44170 from allisonwang-db/spark-46253-arrow-read.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/errors/error_classes.py |  10 ++
 python/pyspark/sql/tests/test_python_datasource.py | 105 +++-
 python/pyspark/sql/worker/plan_data_source_read.py | 132 +++--
 .../plans/logical/pythonLogicalOperators.scala |   6 +-
 .../datasources/PlanPythonDataSourceScan.scala |  44 ---
 .../python/UserDefinedPythonDataSource.scala   |  21 +++-
 .../execution/python/PythonDataSourceSuite.scala   |  23 ++--
 7 files changed, 287 insertions(+), 54 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index d2d7f3148f4c..ffe5d692001c 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -752,6 +752,16 @@ ERROR_CLASSES_JSON = """
 "Unable to create the Python data source  because the '' 
method hasn't been implemented."
 ]
   },
+  "PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE" : {
+"message" : [
+"The data type of the returned value ('') from the Python data 
source '' is not supported. Supported types: ."
+]
+  },
+  "PYTHON_DATA_SOURCE_READ_RETURN_SCHEMA_MISMATCH" : {
+"message" : [
+  "The number of columns in the result does not match the required schema. 
Expected column count: , Actual column count: . Please make 
sure the values returned by the 'read' method have the same number of columns 
as required by the output schema."
+]
+  },
   "PYTHON_DATA_SOURCE_TYPE_MISMATCH" : {
 "message" : [
   "Expected , but got ."
diff --git a/python/pyspark/sql/tests/test_python_datasource.py 
b/python/pyspark/sql/tests/test_python_datasource.py
index 8c7074c72a64..74ef6a874589 100644
--- a/python/pyspark/sql/tests/test_python_datasource.py
+++ b/python/pyspark/sql/tests/test_python_datasource.py
@@ -16,9 +16,11 @@
 #
 import os
 import unittest
+from typing import Callable, Union
 
+from pyspark.errors import PythonException
 from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
-from pyspark.sql.types import Row
+from pyspark.sql.types import Row, StructType
 from pyspark.testing import assertDataFrameEqual
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 from pyspark.testing.utils import SPARK_HOME
@@ -78,6 +80,107 @@ class BasePythonDataSourceTestsMixin:
 df = self.spark.read.format("TestDataSource").load()
 assertDataFrameEqual(df, [Row(c=0, d=1)])
 
+def register_data_source(
+self,
+read_func: Callable,
+partition_func: Callable = None,
+output: Union[str, StructType] = "i int, j int",
+name: str = "test",
+):
+class TestDataSourceReader(DataSourceReader):
+def __init__(self, schema):
+self.schema = schema
+
+def partitions(self):
+if partition_func is not None:
+return partition_func()
+else:
+raise NotImplementedError
+
+def read(self, partition):
+return read_func(self.schema, partition)
+
+class TestDataSource(DataSource):
+@classmethod
+def name(cls):
+return name
+
+def schema(self):
+return output
+
+def reader(self, schema) -> "DataSourceReader":
+return TestDataSourceReader(schema)
+
+self.spark.dataSource.register(TestDataSource)
+
+def test_data_source_read_output_tuple(self):
+self.register_data_source(read_func=lambda schema, partition: 
iter([(0, 1)]))
+df = self.spark.read.format("test").load()
+assertDataFrameEqual(df, [Row(0, 1)])
+
+def test_data_source_read_output_list(self):
+ 

(spark) branch master updated: [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories

2023-12-12 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dcbebce9eacb [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to 
support custom target directories
dcbebce9eacb is described below

commit dcbebce9eacb201cc8dfac918318be04ada842a8
Author: vicennial 
AuthorDate: Tue Dec 12 14:10:41 2023 -0800

[SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom 
target directories

### What changes were proposed in this pull request?
Adds new client APIs to the Spark Connect Scala Client:
- `def addArtifact(bytes: Array[Byte], target: String): Unit`
- `def addArtifact(source: String, target: String): Unit`

### Why are the changes needed?
Currently, without the use of a REPL/Class finder, there is no API to 
support adding artifacts (file-based and in-memory)
 with a custom target directory structure to the remote Spark Connect 
session.

### Does this PR introduce _any_ user-facing change?
Yes.

Users can do the following for classfiles and jars:
```scala
addArtifact("/Users/dummyUser/files/foo/bar.class", 
"sub/directory/foo.class")
addArtifact(bytesBar, "bar.class")
```

This would preserve the directory structure in the remote server. In this 
case, the file would be stored under the directory:
`$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/bar.class`
`$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/sub/directory/foo.class`
### How was this patch tested?

Unit test

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44109 from vicennial/SPARK-46202.

Authored-by: vicennial 
Signed-off-by: Herman van Hovell 
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  41 
 .../spark/sql/connect/client/ArtifactSuite.scala   |  50 ++
 .../spark/sql/connect/client/ArtifactManager.scala | 108 +
 .../sql/connect/client/SparkConnectClient.scala|  37 +++
 4 files changed, 219 insertions(+), 17 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index daa172e215ad..81c2ca11a7fb 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -590,6 +590,47 @@ class SparkSession private[sql] (
   @Experimental
   def addArtifact(uri: URI): Unit = client.addArtifact(uri)
 
+  /**
+   * Add a single in-memory artifact to the session while preserving the 
directory structure
+   * specified by `target` under the session's working directory of that 
particular file
+   * extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact(bytesBar, "foo/bar.class")
+   *  addArtifact(bytesFlat, "flat.class")
+   *  // Directory structure of the session's working directory for class 
files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def addArtifact(bytes: Array[Byte], target: String): Unit = 
client.addArtifact(bytes, target)
+
+  /**
+   * Add a single artifact to the session while preserving the directory 
structure specified by
+   * `target` under the session's working directory of that particular file 
extension.
+   *
+   * Supported target file extensions are .jar and .class.
+   *
+   * ==Example==
+   * {{{
+   *  addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class")
+   *  addArtifact("/Users/dummyUser/files/flat.class", "flat.class")
+   *  // Directory structure of the session's working directory for class 
files would look like:
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class
+   *  // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class
+   * }}}
+   *
+   * @since 4.0.0
+   */
+  @Experimental
+  def addArtifact(source: String, target: String): Unit = 
client.addArtifact(source, target)
+
   /**
* Add one or more artifacts to the session.
*
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index f945313d2427..0c8ef8e599fb 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -284,4 +284,54 @@ class ArtifactSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
 }
 
  

(spark) branch master updated: [SPARK-46370][SQL] Fix bug when querying from table after changing column defaults

2023-12-12 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b035bb177c08 [SPARK-46370][SQL] Fix bug when querying from table after 
changing column defaults
b035bb177c08 is described below

commit b035bb177c0875cfb7edb6d8672d4d2ac2813d1b
Author: Daniel Tenedorio 
AuthorDate: Tue Dec 12 14:44:27 2023 -0800

[SPARK-46370][SQL] Fix bug when querying from table after changing column 
defaults

### What changes were proposed in this pull request?

This PR fixes a bug when querying from table after changing defaults:

```
drop table if exists t;
create table t(i int, s string default 'def') using parquet;
insert into t select 1, default;
alter table t alter column s drop default;
insert into t select 2, default;
select * from t;  -- Removing this line changes the following results!
alter table t alter column s set default 'mno';
insert into t select 3, default;
select * from t;
```

The bug is related to the relation cache, and the fix involves adding a 
manual refresh to the cache to make sure we use the right table schema.

### Why are the changes needed?

This PR fixes a correctness bug.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44302 from dtenedor/fix-default-bug.

Authored-by: Daniel Tenedorio 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/execution/command/ddl.scala   |  3 +++
 .../org/apache/spark/sql/sources/InsertSuite.scala | 24 ++
 2 files changed, 27 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7e001803592f..dc1c5b3fd580 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -374,6 +374,9 @@ case class AlterTableChangeColumnCommand(
   // TODO: support change column name/dataType/metadata/position.
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
+// This command may change column default values, so we need to refresh 
the table relation cache
+// here so that DML commands can resolve these default values correctly.
+catalog.refreshTable(tableName)
 val table = catalog.getTableRawMetadata(tableName)
 val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 94535bc84a4c..76073a108a3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2608,6 +2608,30 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
 }
   }
 
+  test("SPARK-46370: Querying a table should not invalidate the column 
defaults") {
+withTable("t") {
+  // Create a table and insert some rows into it, changing the default 
value of a column
+  // throughout.
+  spark.sql("CREATE TABLE t(i INT, s STRING DEFAULT 'def') USING CSV")
+  spark.sql("INSERT INTO t SELECT 1, DEFAULT")
+  spark.sql("ALTER TABLE t ALTER COLUMN s DROP DEFAULT")
+  spark.sql("INSERT INTO t SELECT 2, DEFAULT")
+  // Run a query to trigger the table relation cache.
+  val results = spark.table("t").collect()
+  assert(results.length == 2)
+  // Change the column default value and insert another row. Then query 
the table's contents
+  // and the results should be correct.
+  spark.sql("ALTER TABLE t ALTER COLUMN s SET DEFAULT 'mno'")
+  spark.sql("INSERT INTO t SELECT 3, DEFAULT").collect()
+  checkAnswer(
+spark.table("t"),
+Seq(
+  Row(1, "def"),
+  Row(2, null),
+  Row(3, "mno")))
+}
+  }
+
   test("UNSUPPORTED_OVERWRITE.TABLE: Can't overwrite a table that is also 
being read from") {
 val tableName = "t1"
 withTable(tableName) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46132][CORE] Support key password for JKS keys for RPC SSL

2023-12-12 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 83434af22284 [SPARK-46132][CORE] Support key password for JKS keys for 
RPC SSL
83434af22284 is described below

commit 83434af22284482d4e805d8be625f733b074928b
Author: Hasnain Lakhani 
AuthorDate: Tue Dec 12 23:35:13 2023 -0600

[SPARK-46132][CORE] Support key password for JKS keys for RPC SSL

### What changes were proposed in this pull request?

Add support for a separate key password in addition to the key store 
password for JKS keys. This is needed for keys which may have a key password in 
addition to a key store password. We already had this support for the UI SSL 
support, so for compatibility we should have it here.

This wasn't done earlier as I wasn't sure how to implement it but the 
discussion in https://github.com/apache/spark/pull/43998#discussion_r1406993411 
suggested the right way.

### Why are the changes needed?

These are needed to support users who may have such keys configured.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added some unit tests

```
build/sbt
> project network-common
> testOnly
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44264 from hasnain-db/separate-pw.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../org/apache/spark/network/ssl/SSLFactory.java   |  11 ++-
 .../apache/spark/network/ssl/SSLFactorySuite.java  | 104 +
 2 files changed, 111 insertions(+), 4 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java
 
b/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java
index 0ae83eb5fd68..82951d213011 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java
@@ -89,7 +89,7 @@ public class SSLFactory {
 
   private void initJdkSslContext(final Builder b)
   throws IOException, GeneralSecurityException {
-this.keyManagers = keyManagers(b.keyStore, b.keyStorePassword);
+this.keyManagers = keyManagers(b.keyStore, b.keyPassword, 
b.keyStorePassword);
 this.trustManagers = trustStoreManagers(
   b.trustStore, b.trustStorePassword,
   b.trustStoreReloadingEnabled, b.trustStoreReloadIntervalMs
@@ -391,13 +391,16 @@ public class SSLFactory {
 }
   }
 
-  private static KeyManager[] keyManagers(File keyStore, String 
keyStorePassword)
+  private static KeyManager[] keyManagers(
+File keyStore, String keyPassword, String keyStorePassword)
   throws NoSuchAlgorithmException, CertificateException,
   KeyStoreException, IOException, UnrecoverableKeyException {
 KeyManagerFactory factory = KeyManagerFactory.getInstance(
   KeyManagerFactory.getDefaultAlgorithm());
-char[] passwordCharacters = keyStorePassword != null? 
keyStorePassword.toCharArray() : null;
-factory.init(loadKeyStore(keyStore, passwordCharacters), 
passwordCharacters);
+char[] keyStorePasswordChars = keyStorePassword != null? 
keyStorePassword.toCharArray() : null;
+char[] keyPasswordChars = keyPassword != null?
+  keyPassword.toCharArray() : keyStorePasswordChars;
+factory.init(loadKeyStore(keyStore, keyStorePasswordChars), 
keyPasswordChars);
 return factory.getKeyManagers();
   }
 
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/ssl/SSLFactorySuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/ssl/SSLFactorySuite.java
new file mode 100644
index ..922d0f22c25c
--- /dev/null
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/ssl/SSLFactorySuite.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.ssl;
+
+import java.io.F

(spark) branch master updated (83434af22284 -> 4f65413031d8)

2023-12-12 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 83434af22284 [SPARK-46132][CORE] Support key password for JKS keys for 
RPC SSL
 add 4f65413031d8 [SPARK-46379][PS][TESTS] Reorganize 
`FrameInterpolateTests`

No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py|  6 +-
 .../test_parity_interpolate.py}|  8 +--
 .../test_parity_interpolate_error.py}  |  8 +--
 .../tests/connect/test_parity_frame_interpolate.py | 39 -
 .../test_interpolate.py}   | 49 +++-
 .../test_interpolate_error.py} | 67 +++---
 6 files changed, 42 insertions(+), 135 deletions(-)
 copy python/pyspark/pandas/tests/connect/{window/test_parity_rolling_count.py 
=> frame/test_parity_interpolate.py} (84%)
 copy python/pyspark/pandas/tests/connect/{series/test_parity_interpolate.py => 
frame/test_parity_interpolate_error.py} (83%)
 delete mode 100644 
python/pyspark/pandas/tests/connect/test_parity_frame_interpolate.py
 copy python/pyspark/pandas/tests/{test_frame_interpolate.py => 
frame/test_interpolate.py} (74%)
 rename python/pyspark/pandas/tests/{test_frame_interpolate.py => 
frame/test_interpolate_error.py} (53%)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org