(spark) branch master updated: [MINOR][SQL] Convert `UnresolvedException` to an internal error
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 071c684dee44 [MINOR][SQL] Convert `UnresolvedException` to an internal error 071c684dee44 is described below commit 071c684dee44665691ddab916021d4920a9ac51b Author: Max Gekk AuthorDate: Tue Dec 12 18:06:48 2023 +0300 [MINOR][SQL] Convert `UnresolvedException` to an internal error ### What changes were proposed in this pull request? In the PR, I propose to change the parent class of `UnresolvedException` from `AnalysisException` to `SparkException` with the error class `INTERNAL_ERROR`. If a user observes this error, it is definitely a bug in the compiler. ### Why are the changes needed? To unify all Spark exceptions and assign an error class. This should improve the user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? No, users shouldn't encounter this error in regular cases. ### How was this patch tested? By existing GAs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44311 from MaxGekk/error-class-UnresolvedException. Authored-by: Max Gekk Signed-off-by: Max Gekk --- .../scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala| 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 97912fb5d592..e1dec5955a7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -37,7 +37,10 @@ import org.apache.spark.util.ArrayImplicits._ * resolved.
*/ class UnresolvedException(function: String) - extends AnalysisException(s"Invalid call to $function on unresolved object") + extends SparkException( +errorClass = "INTERNAL_ERROR", +messageParameters = Map("message" -> s"Invalid call to $function on unresolved object"), +cause = null) /** Parent trait for unresolved node types */ trait UnresolvedNode extends LogicalPlan { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
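To illustrate what it means for an exception to carry an error class, here is a minimal Python sketch. `ErrorClassException` and `UnresolvedError` are hypothetical stand-ins for `SparkException` and `UnresolvedException`, and the `[ERROR_CLASS] message` rendering is an assumption chosen for illustration, not Spark's exact output format.

```python
from typing import Dict, Optional


class ErrorClassException(Exception):
    """Hypothetical exception carrying an error class and message parameters."""

    # Hypothetical registry mapping an error class to its message template.
    TEMPLATES: Dict[str, str] = {"INTERNAL_ERROR": "{message}"}

    def __init__(
        self,
        error_class: str,
        message_parameters: Dict[str, str],
        cause: Optional[BaseException] = None,
    ) -> None:
        self.error_class = error_class
        self.message_parameters = message_parameters
        rendered = self.TEMPLATES[error_class].format(**message_parameters)
        super().__init__(f"[{error_class}] {rendered}")
        self.__cause__ = cause  # chain the underlying error, if any


class UnresolvedError(ErrorClassException):
    """Raised when code calls a method on a plan node that is not resolved yet."""

    def __init__(self, function: str) -> None:
        super().__init__(
            error_class="INTERNAL_ERROR",
            message_parameters={
                "message": f"Invalid call to {function} on unresolved object"
            },
        )


try:
    raise UnresolvedError("dataType")
except ErrorClassException as e:
    print(e.error_class)  # INTERNAL_ERROR
    print(e)              # [INTERNAL_ERROR] Invalid call to dataType on unresolved object
```

With a shared error class, callers can branch on `error_class` instead of parsing message text.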
(spark) branch master updated: [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a122b8acc2f4 [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager a122b8acc2f4 is described below commit a122b8acc2f47c58e8891a5f1464a588f77750e7 Author: Juliusz Sompolski AuthorDate: Tue Dec 12 09:40:56 2023 -0800 [SPARK-46075][CONNECT] Improvements to SparkConnectSessionManager ### What changes were proposed in this pull request? This is factored out from https://github.com/apache/spark/pull/43913 and is a continuation of https://github.com/apache/spark/pull/43546, where SparkConnectSessionManager was introduced. We want to stop using a Guava cache as the session cache and use our own custom logic instead, which gives us more control. This refactors the Session Manager and adds more tests. We introduce a mechanism that mirrors SparkConnectExecutionManager instead. ### Why are the changes needed? With a Guava cache, only a single "inactivity timeout" can be specified for the whole cache. It can't, for example, be overridden per session. The actual invalidation also does not happen in its own thread inside Guava; instead, it is performed lazily, piggybacked (work-stealing) onto other operations on the cache, which makes it opaque when session removal will actually happen. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? SparkConnectSessionManagerSuite added. ### Was this patch authored or co-authored using generative AI tooling? GitHub Copilot assisted with some boilerplate auto-completion. Generated-by: Github Copilot Closes #43985 from juliuszsompolski/SPARK-46075.
Authored-by: Juliusz Sompolski Signed-off-by: Hyukjin Kwon --- .../apache/spark/sql/connect/config/Connect.scala | 11 +- .../spark/sql/connect/service/ExecuteHolder.scala | 39 ++-- .../spark/sql/connect/service/SessionHolder.scala | 80 ++-- .../service/SparkConnectExecutionManager.scala | 48 +++-- .../service/SparkConnectSessionManager.scala | 224 - .../spark/sql/connect/SparkConnectServerTest.scala | 2 +- .../service/SparkConnectSessionManagerSuite.scala | 137 + 7 files changed, 429 insertions(+), 112 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index f7aa98af2fa3..ab4f06d508a0 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -78,7 +78,8 @@ object Connect { val CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT = buildStaticConf("spark.connect.session.manager.defaultSessionTimeout") .internal() - .doc("Timeout after which sessions without any new incoming RPC will be removed.") + .doc("Timeout after which sessions without any new incoming RPC will be removed. 
" + +"Setting it to -1 indicates that sessions should be kept forever.") .version("4.0.0") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("60m") @@ -93,6 +94,14 @@ object Connect { .intConf .createWithDefaultString("1000") + val CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL = +buildStaticConf("spark.connect.session.manager.maintenanceInterval") + .internal() + .doc("Interval at which session manager will search for expired sessions to remove.") + .version("4.0.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") + val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT = buildStaticConf("spark.connect.execute.manager.detachedTimeout") .internal() diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 9e97ded5bf8a..f03f81326064 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -95,17 +95,17 @@ private[connect] class ExecuteHolder( private val runner: ExecuteThreadRunner = new ExecuteThreadRunner(this) /** System.currentTimeMillis when this ExecuteHolder was created. */ - val creationTime = System.currentTimeMillis() + val creationTimeMs = System.currentTimeMillis() /** * None if there is currently an attached RPC (grpcResponseSenders not empty or during initial * ExecutePlan handler). Otherwise, the System.currentTimeMillis whe
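The session-management approach described in this commit (per-session inactivity timeouts, `-1` meaning "keep forever", and an explicit periodic maintenance pass instead of Guava's lazy invalidation) can be sketched in Python as follows. This is not the Scala `SparkConnectSessionManager`: every name below is hypothetical, the injectable clock exists only to make the sketch testable, and expired sessions are simply dropped from the map rather than closed.

```python
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class Session:
    session_id: str
    last_access_ms: int
    timeout_ms: Optional[int] = None  # None: use the default; -1: never expire


class SessionManager:
    def __init__(self, default_timeout_ms: int, clock_ms: Callable[[], int]) -> None:
        self.default_timeout_ms = default_timeout_ms
        self.clock_ms = clock_ms
        self._sessions: Dict[str, Session] = {}
        self._lock = threading.Lock()

    def get_or_create(self, session_id: str) -> Session:
        """Look up a session for an incoming RPC, creating it if needed."""
        with self._lock:
            now = self.clock_ms()
            session = self._sessions.get(session_id)
            if session is None:
                session = Session(session_id, now)
                self._sessions[session_id] = session
            else:
                session.last_access_ms = now  # each RPC counts as activity
            return session

    def _is_expired(self, session: Session, now: int) -> bool:
        timeout = self.default_timeout_ms if session.timeout_ms is None else session.timeout_ms
        return timeout != -1 and now - session.last_access_ms > timeout

    def periodic_maintenance(self) -> List[str]:
        """Remove and return the ids of sessions whose inactivity timeout has elapsed."""
        with self._lock:
            now = self.clock_ms()
            expired = [sid for sid, s in self._sessions.items() if self._is_expired(s, now)]
            for sid in expired:
                del self._sessions[sid]
            return expired

    def start_maintenance(self, interval_s: float) -> threading.Event:
        """Run periodic_maintenance on a daemon thread until the returned event is set."""
        stop = threading.Event()

        def loop() -> None:
            while not stop.wait(interval_s):  # wait() returns False on timeout
                self.periodic_maintenance()

        threading.Thread(target=loop, daemon=True).start()
        return stop
```

Here the default timeout and the maintenance interval are plain arguments; in the commit they correspond to the static configs `spark.connect.session.manager.defaultSessionTimeout` and `spark.connect.session.manager.maintenanceInterval`.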
(spark) branch master updated: [SPARK-46353][CORE] Refactor to improve `RegisterWorker` unit test coverage
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2215cef40043 [SPARK-46353][CORE] Refactor to improve `RegisterWorker` unit test coverage 2215cef40043 is described below commit 2215cef40043a3205446f8daecafed8f2360a742 Author: Dongjoon Hyun AuthorDate: Tue Dec 12 09:57:43 2023 -0800 [SPARK-46353][CORE] Refactor to improve `RegisterWorker` unit test coverage ### What changes were proposed in this pull request? This PR aims to improve the unit test coverage for `RegisterWorker` message handling. - Add a `handleRegisterWorker` helper method, which is easily testable. - Add new unit tests for three conditional branches. ### Why are the changes needed? It makes the code easy to test and improve. We can add more tests in this way in the future. ### Does this PR introduce _any_ user-facing change? No. This is a refactoring of the main code plus additions to the test methods. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44284 from dongjoon-hyun/SPARK-46353.
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/deploy/master/Master.scala| 75 +- .../apache/spark/deploy/master/MasterSuite.scala | 59 - 2 files changed, 102 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a550f44fc0a4..c8679c185ad7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -37,7 +37,7 @@ import org.apache.spark.internal.config.Deploy._ import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} -import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils} +import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement, ResourceUtils} import org.apache.spark.rpc._ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer} import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} @@ -75,7 +75,8 @@ private[deploy] class Master( private val waitingApps = new ArrayBuffer[ApplicationInfo] val apps = new HashSet[ApplicationInfo] - private val idToWorker = new HashMap[String, WorkerInfo] + // Visible for testing + private[master] val idToWorker = new HashMap[String, WorkerInfo] private val addressToWorker = new HashMap[RpcAddress, WorkerInfo] private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo] @@ -106,7 +107,7 @@ private[deploy] class Master( private[master] var state = RecoveryState.STANDBY - private var persistenceEngine: PersistenceEngine = _ + private[master] var persistenceEngine: PersistenceEngine = _ private var leaderElectionAgent: LeaderElectionAgent = _ @@ -281,33 +282,8 @@ private[deploy] class Master( case RegisterWorker( id, workerHost, workerPort, workerRef, cores, 
memory, workerWebUiUrl, masterAddress, resources) => - logInfo("Registering worker %s:%d with %d cores, %s RAM".format( -workerHost, workerPort, cores, Utils.megabytesToString(memory))) - if (state == RecoveryState.STANDBY) { -workerRef.send(MasterInStandby) - } else if (idToWorker.contains(id)) { -if (idToWorker(id).state == WorkerState.UNKNOWN) { - logInfo("Worker has been re-registered: " + id) - idToWorker(id).state = WorkerState.ALIVE -} -workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true)) - } else { -val workerResources = - resources.map(r => r._1 -> WorkerResourceInfo(r._1, r._2.addresses.toImmutableArraySeq)) -val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - workerRef, workerWebUiUrl, workerResources) -if (registerWorker(worker)) { - persistenceEngine.addWorker(worker) - workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, false)) - schedule() -} else { - val workerAddress = worker.endpoint.address - logWarning("Worker registration failed. Attempted to re-register worker at same " + -"address: " + workerAddress) - workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: " -+ workerAddress)) -} - } + handleRegisterWorker(id, workerHost, workerPort, workerRef, cores,
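The three conditional branches that the new unit tests target can be modeled in a few lines of Python. This is a hypothetical, heavily simplified model, not Spark's `Master`: the reply strings only loosely mirror `MasterInStandby`, `RegisteredWorker` and `RegisterWorkerFailed`, and the real `registerWorker` performs additional bookkeeping that this sketch omits.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

Address = Tuple[str, int]


@dataclass
class WorkerInfo:
    worker_id: str
    address: Address
    state: str = "ALIVE"


class MiniMaster:
    """Hypothetical, heavily simplified model of RegisterWorker handling."""

    def __init__(self, state: str) -> None:
        self.state = state  # e.g. "STANDBY" or "ALIVE"
        self.id_to_worker: Dict[str, WorkerInfo] = {}
        self.address_to_worker: Dict[Address, WorkerInfo] = {}
        self.persisted: List[str] = []  # stands in for the persistence engine

    def handle_register_worker(self, worker_id: str, host: str, port: int) -> str:
        # Branch 1: a standby master does not accept registrations.
        if self.state == "STANDBY":
            return "MasterInStandby"
        # Branch 2: a known worker id is acknowledged again; an UNKNOWN worker is revived.
        if worker_id in self.id_to_worker:
            worker = self.id_to_worker[worker_id]
            if worker.state == "UNKNOWN":
                worker.state = "ALIVE"
            return "RegisteredWorker(duplicate=True)"
        # Branch 3: a new worker is registered unless its address is already taken.
        address = (host, port)
        if address in self.address_to_worker:
            return f"RegisterWorkerFailed({host}:{port})"
        worker = WorkerInfo(worker_id, address)
        self.id_to_worker[worker_id] = worker
        self.address_to_worker[address] = worker
        self.persisted.append(worker_id)
        return "RegisteredWorker(duplicate=False)"
```

Because this model's handler is a plain method whose outcome can be inspected directly, each branch can be exercised without an RPC environment, which is the kind of testability the refactoring is after.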
(spark) branch master updated: [SPARK-46378][SQL] Still remove Sort after converting Aggregate to Project
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c1ba963e64a2 [SPARK-46378][SQL] Still remove Sort after converting Aggregate to Project c1ba963e64a2 is described below commit c1ba963e64a22dea28e17b1ed954e6d03d38da1e Author: Wenchen Fan AuthorDate: Tue Dec 12 10:04:40 2023 -0800 [SPARK-46378][SQL] Still remove Sort after converting Aggregate to Project ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/33397 to avoid sub-optimal plans. After converting `Aggregate` to `Project`, some information is lost: `Aggregate` doesn't care about the data order of its inputs, but `Project` does. `EliminateSorts` can remove a `Sort` below an `Aggregate`, but this no longer works once the `Aggregate` has been converted to a `Project`. This PR fixes the issue by tagging the `Project` as order-irrelevant when it is converted from an `Aggregate`. `EliminateSorts` then removes sorts below the tagged `Project`. ### Why are the changes needed? To avoid sub-optimal plans. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44310 from cloud-fan/sort.
Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../scala/org/apache/spark/sql/catalyst/dsl/package.scala| 2 ++ .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 6 +- .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 3 +++ .../spark/sql/catalyst/optimizer/EliminateSortsSuite.scala | 12 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 30d4c2dbb409..eb3047700215 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -395,6 +395,8 @@ package object dsl { def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan) + def localLimit(limitExpr: Expression): LogicalPlan = LocalLimit(limitExpr, logicalPlan) + def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan) def join( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 960f5e532c08..a4b25cbd1d2e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -769,7 +769,9 @@ object LimitPushDown extends Rule[LogicalPlan] { LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, join))) // Push down limit 1 through Aggregate and turn Aggregate into Project if it is group only. 
case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly => - Limit(le, Project(a.aggregateExpressions, LocalLimit(le, a.child))) + val project = Project(a.aggregateExpressions, LocalLimit(le, a.child)) + project.setTagValue(Project.dataOrderIrrelevantTag, ()) + Limit(le, project) case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly => Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child // Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset. @@ -1583,6 +1585,8 @@ object EliminateSorts extends Rule[LogicalPlan] { right = recursiveRemoveSort(originRight, true)) case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) => g.copy(child = recursiveRemoveSort(originChild, true)) +case p: Project if p.getTagValue(Project.dataOrderIrrelevantTag).isDefined => + p.copy(child = recursiveRemoveSort(p.child, true)) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 497f485b67fe..65f4151c0c96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -101,6 +101,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) object Project { val hiddenOutputTag: TreeNodeTag[Seq[Attribute]] = TreeNodeTag[Seq[Attribute]]
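The interplay between the two rules (`LimitPushDown` tagging the `Project`, and `EliminateSorts` honoring the tag) can be illustrated with a toy plan tree in Python. This is a sketch under stated assumptions, not Catalyst: plans are single-child chains, the tag key is hypothetical, and the set of operators that sort removal may look through (`Project`, `Filter`, `LocalLimit`) is a simplification chosen for this model; the real rule also checks determinism and handles more operators.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

ORDER_IRRELEVANT_TAG = "dataOrderIrrelevant"  # hypothetical tag key
PASS_THROUGH = {"Project", "Filter", "LocalLimit"}  # assumed for this toy model


@dataclass
class Node:
    kind: str
    child: Optional["Node"] = None
    group_only: bool = False
    tags: Dict[str, object] = field(default_factory=dict)


def limit_push_down(plan: Node) -> Node:
    # Limit 1 over a group-only Aggregate becomes Limit 1 over Project over LocalLimit 1.
    agg = plan.child
    if plan.kind == "Limit" and agg is not None and agg.kind == "Aggregate" and agg.group_only:
        project = Node("Project", Node("LocalLimit", agg.child))
        project.tags[ORDER_IRRELEVANT_TAG] = ()  # remember the Aggregate ignored input order
        return Node("Limit", project)
    return plan


def remove_sorts_below(node: Optional[Node]) -> Optional[Node]:
    if node is None:
        return None
    if node.kind == "Sort":
        return remove_sorts_below(node.child)
    if node.kind in PASS_THROUGH:
        return Node(node.kind, remove_sorts_below(node.child), node.group_only, dict(node.tags))
    return node  # stop at anything that may depend on order


def eliminate_sorts(node: Optional[Node]) -> Optional[Node]:
    if node is None:
        return None
    order_irrelevant = node.kind == "Aggregate" or (
        node.kind == "Project" and ORDER_IRRELEVANT_TAG in node.tags)
    child = remove_sorts_below(node.child) if order_irrelevant else eliminate_sorts(node.child)
    return Node(node.kind, child, node.group_only, dict(node.tags))


def kinds(node: Optional[Node]) -> List[str]:
    out: List[str] = []
    while node is not None:
        out.append(node.kind)
        node = node.child
    return out
```

Without the tag, an identical `Project` keeps its `Sort`, because nothing in the tree records that the original `Aggregate` did not care about input order.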
(spark) branch master updated: [SPARK-46253][PYTHON] Plan Python data source read using MapInArrow
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 01d61a5fc963 [SPARK-46253][PYTHON] Plan Python data source read using MapInArrow 01d61a5fc963 is described below commit 01d61a5fc963013fcf55bbfb384e06d1c5ec7e3d Author: allisonwang-db AuthorDate: Tue Dec 12 14:05:29 2023 -0800 [SPARK-46253][PYTHON] Plan Python data source read using MapInArrow ### What changes were proposed in this pull request? This PR changes how we plan Python data source reads. Instead of using a regular Python UDTF, we use an Arrow UDF and plan the data source read using the MapInArrow operator. ### Why are the changes needed? To improve performance. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #44170 from allisonwang-db/spark-46253-arrow-read. Authored-by: allisonwang-db Signed-off-by: Hyukjin Kwon --- python/pyspark/errors/error_classes.py | 10 ++ python/pyspark/sql/tests/test_python_datasource.py | 105 +++- python/pyspark/sql/worker/plan_data_source_read.py | 132 +++-- .../plans/logical/pythonLogicalOperators.scala | 6 +- .../datasources/PlanPythonDataSourceScan.scala | 44 --- .../python/UserDefinedPythonDataSource.scala | 21 +++- .../execution/python/PythonDataSourceSuite.scala | 23 ++-- 7 files changed, 287 insertions(+), 54 deletions(-) diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py index d2d7f3148f4c..ffe5d692001c 100644 --- a/python/pyspark/errors/error_classes.py +++ b/python/pyspark/errors/error_classes.py @@ -752,6 +752,16 @@ ERROR_CLASSES_JSON = """ "Unable to create the Python data source because the '' method hasn't been implemented."
] }, + "PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE" : { +"message" : [ +"The data type of the returned value ('') from the Python data source '' is not supported. Supported types: ." +] + }, + "PYTHON_DATA_SOURCE_READ_RETURN_SCHEMA_MISMATCH" : { +"message" : [ + "The number of columns in the result does not match the required schema. Expected column count: , Actual column count: . Please make sure the values returned by the 'read' method have the same number of columns as required by the output schema." +] + }, "PYTHON_DATA_SOURCE_TYPE_MISMATCH" : { "message" : [ "Expected , but got ." diff --git a/python/pyspark/sql/tests/test_python_datasource.py b/python/pyspark/sql/tests/test_python_datasource.py index 8c7074c72a64..74ef6a874589 100644 --- a/python/pyspark/sql/tests/test_python_datasource.py +++ b/python/pyspark/sql/tests/test_python_datasource.py @@ -16,9 +16,11 @@ # import os import unittest +from typing import Callable, Union +from pyspark.errors import PythonException from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition -from pyspark.sql.types import Row +from pyspark.sql.types import Row, StructType from pyspark.testing import assertDataFrameEqual from pyspark.testing.sqlutils import ReusedSQLTestCase from pyspark.testing.utils import SPARK_HOME @@ -78,6 +80,107 @@ class BasePythonDataSourceTestsMixin: df = self.spark.read.format("TestDataSource").load() assertDataFrameEqual(df, [Row(c=0, d=1)]) +def register_data_source( +self, +read_func: Callable, +partition_func: Callable = None, +output: Union[str, StructType] = "i int, j int", +name: str = "test", +): +class TestDataSourceReader(DataSourceReader): +def __init__(self, schema): +self.schema = schema + +def partitions(self): +if partition_func is not None: +return partition_func() +else: +raise NotImplementedError + +def read(self, partition): +return read_func(self.schema, partition) + +class TestDataSource(DataSource): +@classmethod +def name(cls): +return name + +def 
schema(self): +return output + +def reader(self, schema) -> "DataSourceReader": +return TestDataSourceReader(schema) + +self.spark.dataSource.register(TestDataSource) + +def test_data_source_read_output_tuple(self): +self.register_data_source(read_func=lambda schema, partition: iter([(0, 1)])) +df = self.spark.read.format("test").load() +assertDataFrameEqual(df, [Row(0, 1)]) + +def test_data_source_read_output_list(self): +
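Planning the read through MapInArrow means the row-oriented values yielded by a data source's `read` method must be validated and regrouped into column-oriented batches. The Python sketch below shows that idea under explicit assumptions: plain dicts of lists stand in for Arrow record batches, only tuples and lists are accepted (a PySpark `Row` is a `tuple` subclass, so it passes), and `DataSourceReadError` and `rows_to_batches` are hypothetical names. The error class strings reuse the two new names from the diff, but the messages are simplified.

```python
from typing import Any, Dict, Iterable, Iterator, List, Sequence


class DataSourceReadError(Exception):
    def __init__(self, error_class: str, detail: str) -> None:
        super().__init__(f"[{error_class}] {detail}")
        self.error_class = error_class


def _to_columns(batch: List[Sequence[Any]], column_names: Sequence[str]) -> Dict[str, List[Any]]:
    # Transpose a list of rows into one list per column.
    return {name: [row[i] for row in batch] for i, name in enumerate(column_names)}


def rows_to_batches(rows: Iterable[Any], column_names: Sequence[str],
                    batch_size: int = 2) -> Iterator[Dict[str, List[Any]]]:
    """Validate rows from read() and group them into column-oriented batches."""
    batch: List[Sequence[Any]] = []
    for row in rows:
        if not isinstance(row, (tuple, list)):
            raise DataSourceReadError(
                "PYTHON_DATA_SOURCE_READ_INVALID_RETURN_TYPE",
                f"unsupported type {type(row).__name__}; expected tuple or list")
        if len(row) != len(column_names):
            raise DataSourceReadError(
                "PYTHON_DATA_SOURCE_READ_RETURN_SCHEMA_MISMATCH",
                f"expected {len(column_names)} columns, got {len(row)}")
        batch.append(row)
        if len(batch) == batch_size:
            yield _to_columns(batch, column_names)
            batch = []
    if batch:  # flush the final, possibly smaller, batch
        yield _to_columns(batch, column_names)
```

Columnar batches are what let the data be shipped through Arrow rather than row by row, which is where the performance motivation for this commit comes from.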
(spark) branch master updated: [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new dcbebce9eacb [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories dcbebce9eacb is described below commit dcbebce9eacb201cc8dfac918318be04ada842a8 Author: vicennial AuthorDate: Tue Dec 12 14:10:41 2023 -0800 [SPARK-46202][CONNECT] Expose new ArtifactManager APIs to support custom target directories ### What changes were proposed in this pull request? Adds new client APIs to the Spark Connect Scala Client: - `def addArtifact(bytes: Array[Byte], target: String): Unit` - `def addArtifact(source: String, target: String): Unit` ### Why are the changes needed? Currently, without the use of a REPL/Class finder, there is no API to support adding artifacts (file-based and in-memory) with a custom target directory structure to the remote Spark Connect session. ### Does this PR introduce _any_ user-facing change? Yes. Users can do the following for class files and jars:
```scala
addArtifact("/Users/dummyUser/files/foo/bar.class", "sub/directory/foo.class")
addArtifact(bytesBar, "bar.class")
```
This would preserve the directory structure on the remote server. In this case, the files would be stored at `$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/bar.class` and `$ROOT_SESSION_ARTIFACT_DIRECTORY/classes/sub/directory/foo.class`. ### How was this patch tested? Unit test ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44109 from vicennial/SPARK-46202.
Authored-by: vicennial Signed-off-by: Herman van Hovell --- .../scala/org/apache/spark/sql/SparkSession.scala | 41 .../spark/sql/connect/client/ArtifactSuite.scala | 50 ++ .../spark/sql/connect/client/ArtifactManager.scala | 108 + .../sql/connect/client/SparkConnectClient.scala| 37 +++ 4 files changed, 219 insertions(+), 17 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index daa172e215ad..81c2ca11a7fb 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -590,6 +590,47 @@ class SparkSession private[sql] ( @Experimental def addArtifact(uri: URI): Unit = client.addArtifact(uri) + /** + * Add a single in-memory artifact to the session while preserving the directory structure + * specified by `target` under the session's working directory of that particular file + * extension. + * + * Supported target file extensions are .jar and .class. + * + * ==Example== + * {{{ + * addArtifact(bytesBar, "foo/bar.class") + * addArtifact(bytesFlat, "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + * + * @since 4.0.0 + */ + @Experimental + def addArtifact(bytes: Array[Byte], target: String): Unit = client.addArtifact(bytes, target) + + /** + * Add a single artifact to the session while preserving the directory structure specified by + * `target` under the session's working directory of that particular file extension. + * + * Supported target file extensions are .jar and .class. 
+ * + * ==Example== + * {{{ + * addArtifact("/Users/dummyUser/files/foo/bar.class", "foo/bar.class") + * addArtifact("/Users/dummyUser/files/flat.class", "flat.class") + * // Directory structure of the session's working directory for class files would look like: + * // ${WORKING_DIR_FOR_CLASS_FILES}/flat.class + * // ${WORKING_DIR_FOR_CLASS_FILES}/foo/bar.class + * }}} + * + * @since 4.0.0 + */ + @Experimental + def addArtifact(source: String, target: String): Unit = client.addArtifact(source, target) + /** * Add one or more artifacts to the session. * diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala index f945313d2427..0c8ef8e599fb 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala @@ -284,4 +284,54 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { }
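The mapping from a relative `target` to its final location can be sketched in Python. The commit confirms the `classes/` subdirectory for `.class` files; the `jars/` subdirectory for `.jar` files and the rejection of absolute or `..`-escaping targets are assumptions of this sketch, and `resolve_artifact_path` is a hypothetical helper, not part of the Spark Connect client.

```python
import posixpath

# .class -> classes/ follows the commit message; .jar -> jars/ is an assumption.
SUBDIR_BY_EXTENSION = {".class": "classes", ".jar": "jars"}


def resolve_artifact_path(root: str, target: str) -> str:
    """Return where an artifact with relative path `target` would be stored (sketch)."""
    ext = posixpath.splitext(target)[1]
    if ext not in SUBDIR_BY_EXTENSION:
        raise ValueError(f"unsupported artifact extension {ext!r}; expected .jar or .class")
    normalized = posixpath.normpath(target)
    # Hypothetical safety check: keep the artifact inside its subdirectory.
    if posixpath.isabs(normalized) or normalized == ".." or normalized.startswith("../"):
        raise ValueError(f"target must stay inside the session directory: {target!r}")
    return posixpath.join(root, SUBDIR_BY_EXTENSION[ext], normalized)


print(resolve_artifact_path("/root", "sub/directory/foo.class"))
# /root/classes/sub/directory/foo.class
```

Keeping the directory structure of `target` intact is what allows class files in nested packages to be found at the expected relative paths on the server.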
(spark) branch master updated: [SPARK-46370][SQL] Fix bug when querying from table after changing column defaults
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b035bb177c08 [SPARK-46370][SQL] Fix bug when querying from table after changing column defaults b035bb177c08 is described below commit b035bb177c0875cfb7edb6d8672d4d2ac2813d1b Author: Daniel Tenedorio AuthorDate: Tue Dec 12 14:44:27 2023 -0800 [SPARK-46370][SQL] Fix bug when querying from table after changing column defaults ### What changes were proposed in this pull request? This PR fixes a bug when querying a table after changing column defaults:
```
drop table if exists t;
create table t(i int, s string default 'def') using parquet;
insert into t select 1, default;
alter table t alter column s drop default;
insert into t select 2, default;
select * from t; -- Removing this line changes the following results!
alter table t alter column s set default 'mno';
insert into t select 3, default;
select * from t;
```
The bug is related to the relation cache, and the fix adds a manual refresh of the cache to make sure we use the right table schema. ### Why are the changes needed? This PR fixes a correctness bug. ### Does this PR introduce _any_ user-facing change? Yes, see above. ### How was this patch tested? This PR adds test coverage. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44302 from dtenedor/fix-default-bug.
Authored-by: Daniel Tenedorio Signed-off-by: Wenchen Fan --- .../apache/spark/sql/execution/command/ddl.scala | 3 +++ .../org/apache/spark/sql/sources/InsertSuite.scala | 24 ++ 2 files changed, 27 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 7e001803592f..dc1c5b3fd580 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -374,6 +374,9 @@ case class AlterTableChangeColumnCommand( // TODO: support change column name/dataType/metadata/position. override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog +// This command may change column default values, so we need to refresh the table relation cache +// here so that DML commands can resolve these default values correctly. +catalog.refreshTable(tableName) val table = catalog.getTableRawMetadata(tableName) val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 94535bc84a4c..76073a108a3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -2608,6 +2608,30 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } } + test("SPARK-46370: Querying a table should not invalidate the column defaults") { +withTable("t") { + // Create a table and insert some rows into it, changing the default value of a column + // throughout. 
+ spark.sql("CREATE TABLE t(i INT, s STRING DEFAULT 'def') USING CSV") + spark.sql("INSERT INTO t SELECT 1, DEFAULT") + spark.sql("ALTER TABLE t ALTER COLUMN s DROP DEFAULT") + spark.sql("INSERT INTO t SELECT 2, DEFAULT") + // Run a query to trigger the table relation cache. + val results = spark.table("t").collect() + assert(results.length == 2) + // Change the column default value and insert another row. Then query the table's contents + // and the results should be correct. + spark.sql("ALTER TABLE t ALTER COLUMN s SET DEFAULT 'mno'") + spark.sql("INSERT INTO t SELECT 3, DEFAULT").collect() + checkAnswer( +spark.table("t"), +Seq( + Row(1, "def"), + Row(2, null), + Row(3, "mno"))) +} + } + test("UNSUPPORTED_OVERWRITE.TABLE: Can't overwrite a table that is also being read from") { val tableName = "t1" withTable(tableName) {
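Why a stale relation cache yields wrong defaults, and why refreshing it in `AlterTableChangeColumnCommand` helps, can be shown with a deliberately simplified Python toy model. It assumes (for illustration only) that a query caches a snapshot of the table's column defaults and that `INSERT ... DEFAULT` resolves through that snapshot whenever one exists; the real relation cache and the way DML consults it are more involved, and `ToyCatalog` and `replay` are hypothetical names.

```python
from typing import Dict, List, Optional

Defaults = Dict[str, Optional[str]]  # column name -> default value (None = no default)


class ToyCatalog:
    def __init__(self, refresh_on_alter: bool) -> None:
        self.refresh_on_alter = refresh_on_alter
        self.metadata: Dict[str, Defaults] = {}
        self.relation_cache: Dict[str, Defaults] = {}

    def create_table(self, table: str, defaults: Defaults) -> None:
        self.metadata[table] = dict(defaults)

    def query(self, table: str) -> Defaults:
        # Running a query resolves the table and caches a snapshot of its metadata.
        return self.relation_cache.setdefault(table, dict(self.metadata[table]))

    def default_for_insert(self, table: str, column: str) -> Optional[str]:
        # In this model, INSERT ... DEFAULT resolves through the cache when one exists.
        return self.relation_cache.get(table, self.metadata[table])[column]

    def alter_column_default(self, table: str, column: str, default: Optional[str]) -> None:
        if self.refresh_on_alter:
            self.relation_cache.pop(table, None)  # analogous to catalog.refreshTable(tableName)
        self.metadata[table][column] = default


def replay(refresh_on_alter: bool) -> List[Optional[str]]:
    """Replay the SQL reproduction above and return the defaults used by each insert."""
    cat = ToyCatalog(refresh_on_alter)
    cat.create_table("t", {"s": "def"})
    inserted = [cat.default_for_insert("t", "s")]
    cat.alter_column_default("t", "s", None)
    inserted.append(cat.default_for_insert("t", "s"))
    cat.query("t")  # the SELECT that populates the cache
    cat.alter_column_default("t", "s", "mno")
    inserted.append(cat.default_for_insert("t", "s"))
    return inserted


print(replay(False))  # ['def', None, None]
print(replay(True))   # ['def', None, 'mno']
```

In this model, removing the `query` call also makes `replay(False)` return the correct values, matching the observation in the reproduction that the intermediate `SELECT` changes the results.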
(spark) branch master updated: [SPARK-46132][CORE] Support key password for JKS keys for RPC SSL
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 83434af22284 [SPARK-46132][CORE] Support key password for JKS keys for RPC SSL 83434af22284 is described below commit 83434af22284482d4e805d8be625f733b074928b Author: Hasnain Lakhani AuthorDate: Tue Dec 12 23:35:13 2023 -0600 [SPARK-46132][CORE] Support key password for JKS keys for RPC SSL ### What changes were proposed in this pull request? Add support for a separate key password, in addition to the key store password, for JKS keys. This is needed for keys that have a key password in addition to a key store password. We already support this for UI SSL, so for compatibility we should support it here too. This wasn't done earlier because I wasn't sure how to implement it, but the discussion in https://github.com/apache/spark/pull/43998#discussion_r1406993411 suggested the right approach. ### Why are the changes needed? This is needed to support users who may have such keys configured. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added some unit tests
```
build/sbt
> project network-common
> testOnly
```
### Was this patch authored or co-authored using generative AI tooling? No Closes #44264 from hasnain-db/separate-pw.
Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan --- .../org/apache/spark/network/ssl/SSLFactory.java | 11 ++- .../apache/spark/network/ssl/SSLFactorySuite.java | 104 + 2 files changed, 111 insertions(+), 4 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java b/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java index 0ae83eb5fd68..82951d213011 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/ssl/SSLFactory.java @@ -89,7 +89,7 @@ public class SSLFactory { private void initJdkSslContext(final Builder b) throws IOException, GeneralSecurityException { -this.keyManagers = keyManagers(b.keyStore, b.keyStorePassword); +this.keyManagers = keyManagers(b.keyStore, b.keyPassword, b.keyStorePassword); this.trustManagers = trustStoreManagers( b.trustStore, b.trustStorePassword, b.trustStoreReloadingEnabled, b.trustStoreReloadIntervalMs @@ -391,13 +391,16 @@ public class SSLFactory { } } - private static KeyManager[] keyManagers(File keyStore, String keyStorePassword) + private static KeyManager[] keyManagers( +File keyStore, String keyPassword, String keyStorePassword) throws NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException { KeyManagerFactory factory = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm()); -char[] passwordCharacters = keyStorePassword != null? keyStorePassword.toCharArray() : null; -factory.init(loadKeyStore(keyStore, passwordCharacters), passwordCharacters); +char[] keyStorePasswordChars = keyStorePassword != null? keyStorePassword.toCharArray() : null; +char[] keyPasswordChars = keyPassword != null?
+ keyPassword.toCharArray() : keyStorePasswordChars; +factory.init(loadKeyStore(keyStore, keyStorePasswordChars), keyPasswordChars); return factory.getKeyManagers(); } diff --git a/common/network-common/src/test/java/org/apache/spark/network/ssl/SSLFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/ssl/SSLFactorySuite.java new file mode 100644 index ..922d0f22c25c --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/ssl/SSLFactorySuite.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network.ssl; + +import java.io.F
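The heart of the `SSLFactory` change is the password fallback in `keyManagers`: the key store is still opened with the key store password, but `KeyManagerFactory.init` now receives the key password (the password used to recover keys from the store), falling back to the key store password when no key password is configured. The following is a minimal, self-contained Java sketch of that fallback, assuming an already-loaded `java.security.KeyStore`; the class `KeyPasswordFallback` and its methods are hypothetical illustrations, not Spark API.

```java
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;

/**
 * Hypothetical helper (not part of Spark) mirroring the fallback added to
 * SSLFactory.keyManagers: prefer the key password, else reuse the key store password.
 */
final class KeyPasswordFallback {

  private KeyPasswordFallback() {}

  /** Returns the password used to recover keys; null only if both inputs are null. */
  static char[] resolveKeyPassword(String keyPassword, String keyStorePassword) {
    if (keyPassword != null) {
      return keyPassword.toCharArray();
    }
    return keyStorePassword != null ? keyStorePassword.toCharArray() : null;
  }

  /** Builds key managers from a key store that has already been loaded. */
  static KeyManager[] keyManagers(KeyStore keyStore, String keyPassword, String keyStorePassword)
      throws NoSuchAlgorithmException, KeyStoreException, UnrecoverableKeyException {
    KeyManagerFactory factory =
        KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    // The second argument to init() recovers the key entries; it is not
    // necessarily the password that protects the key store file itself.
    factory.init(keyStore, resolveKeyPassword(keyPassword, keyStorePassword));
    return factory.getKeyManagers();
  }

  public static void main(String[] args) throws Exception {
    // Empty in-memory key store of the platform default type. A real setup would
    // load a JKS file via keyStore.load(inputStream, keyStorePasswordChars).
    KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
    keyStore.load(null, null);

    System.out.println(new String(resolveKeyPassword("keyPw", "storePw"))); // prints keyPw
    System.out.println(new String(resolveKeyPassword(null, "storePw")));    // prints storePw
    System.out.println("key managers: " + keyManagers(keyStore, null, "storePw").length);
  }
}
```

Passing `null` as the key password reproduces the behavior before this commit, where a single password both opened the key store and recovered its keys.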
(spark) branch master updated (83434af22284 -> 4f65413031d8)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

from 83434af22284 [SPARK-46132][CORE] Support key password for JKS keys for RPC SSL
 add 4f65413031d8 [SPARK-46379][PS][TESTS] Reorganize `FrameInterpolateTests`

No new revisions were added by this update.

Summary of changes:
 dev/sparktestsupport/modules.py | 6 +-
 .../test_parity_interpolate.py} | 8 +--
 .../test_parity_interpolate_error.py} | 8 +--
 .../tests/connect/test_parity_frame_interpolate.py | 39 -
 .../test_interpolate.py} | 49 +++-
 .../test_interpolate_error.py} | 67 +++---
 6 files changed, 42 insertions(+), 135 deletions(-)
 copy python/pyspark/pandas/tests/connect/{window/test_parity_rolling_count.py => frame/test_parity_interpolate.py} (84%)
 copy python/pyspark/pandas/tests/connect/{series/test_parity_interpolate.py => frame/test_parity_interpolate_error.py} (83%)
 delete mode 100644 python/pyspark/pandas/tests/connect/test_parity_frame_interpolate.py
 copy python/pyspark/pandas/tests/{test_frame_interpolate.py => frame/test_interpolate.py} (74%)
 rename python/pyspark/pandas/tests/{test_frame_interpolate.py => frame/test_interpolate_error.py} (53%)