This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 05fd99131de [SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client
05fd99131de is described below

commit 05fd99131de01d2d346a2e4c48c4bae27af1f875
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Mon Feb 27 09:05:01 2023 -0400

    [SPARK-42586][CONNECT] Add RuntimeConfig for Scala Client

    ### What changes were proposed in this pull request?
    This PR adds the RuntimeConfig class for the Spark Connect Scala Client.

    ### Why are the changes needed?
    API Parity.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ### How was this patch tested?
    Added tests to the ClientE2ETestSuite.

    Closes #40185 from hvanhovell/SPARK-42586.

    Authored-by: Herman van Hovell <her...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit a6f28ca7eab25d6cc1e6bcea1dedc70d36c30a61)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../scala/org/apache/spark/sql/RuntimeConfig.scala | 162 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  11 ++
 .../sql/connect/client/SparkConnectClient.scala    |  16 ++
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  19 +++
 .../sql/connect/client/CompatibilitySuite.scala    |  28 +---
 5 files changed, 214 insertions(+), 22 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
new file mode 100644
index 00000000000..c16bc034488
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.connect.proto.{ConfigRequest, ConfigResponse, KeyValue}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connect.client.SparkConnectClient
+
+/**
+ * Runtime configuration interface for Spark. To access this, use `SparkSession.conf`.
+ *
+ * @since 3.4.0
+ */
+class RuntimeConfig(client: SparkConnectClient) extends Logging {
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 3.4.0
+   */
+  def set(key: String, value: String): Unit = {
+    executeConfigRequest { builder =>
+      builder.getSetBuilder.addPairsBuilder().setKey(key).setValue(value)
+    }
+  }
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 3.4.0
+   */
+  def set(key: String, value: Boolean): Unit = set(key, String.valueOf(value))
+
+  /**
+   * Sets the given Spark runtime configuration property.
+   *
+   * @since 3.4.0
+   */
+  def set(key: String, value: Long): Unit = set(key, String.valueOf(value))
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @throws java.util.NoSuchElementException
+   *   if the key is not set and does not have a default value
+   * @since 3.4.0
+   */
+  @throws[NoSuchElementException]("if the key is not set")
+  def get(key: String): String = getOption(key).getOrElse {
+    throw new NoSuchElementException(key)
+  }
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @since 3.4.0
+   */
+  def get(key: String, default: String): String = {
+    executeConfigRequestSingleValue { builder =>
+      builder.getGetWithDefaultBuilder.addPairsBuilder().setKey(key).setValue(default)
+    }
+  }
+
+  /**
+   * Returns all properties set in this conf.
+   *
+   * @since 3.4.0
+   */
+  def getAll: Map[String, String] = {
+    val response = executeConfigRequest { builder =>
+      builder.getGetAllBuilder
+    }
+    val builder = Map.newBuilder[String, String]
+    response.getPairsList.forEach { kv =>
+      require(kv.hasValue)
+      builder += ((kv.getKey, kv.getValue))
+    }
+    builder.result()
+  }
+
+  /**
+   * Returns the value of Spark runtime configuration property for the given key.
+   *
+   * @since 3.4.0
+   */
+  def getOption(key: String): Option[String] = {
+    val pair = executeConfigRequestSinglePair { builder =>
+      builder.getGetOptionBuilder.addKeys(key)
+    }
+    if (pair.hasValue) {
+      Option(pair.getValue)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Resets the configuration property for the given key.
+   *
+   * @since 3.4.0
+   */
+  def unset(key: String): Unit = {
+    executeConfigRequest { builder =>
+      builder.getUnsetBuilder.addKeys(key)
+    }
+  }
+
+  /**
+   * Indicates whether the configuration property with the given key is modifiable in the current
+   * session.
+   *
+   * @return
+   *   `true` if the configuration property is modifiable. For static SQL configurations, Spark
+   *   Core configurations, invalid (non-existent) keys, and other non-modifiable configuration
+   *   properties, the returned value is `false`.
+   * @since 3.4.0
+   */
+  def isModifiable(key: String): Boolean = {
+    val modifiable = executeConfigRequestSingleValue { builder =>
+      builder.getIsModifiableBuilder.addKeys(key)
+    }
+    java.lang.Boolean.valueOf(modifiable)
+  }
+
+  private def executeConfigRequestSingleValue(
+      f: ConfigRequest.Operation.Builder => Unit): String = {
+    val pair = executeConfigRequestSinglePair(f)
+    require(pair.hasValue, "The returned pair does not have a value set")
+    pair.getValue
+  }
+
+  private def executeConfigRequestSinglePair(
+      f: ConfigRequest.Operation.Builder => Unit): KeyValue = {
+    val response = executeConfigRequest(f)
+    require(response.getPairsCount == 1, "The response should contain exactly one pair")
+    response.getPairs(0)
+  }
+
+  private def executeConfigRequest(f: ConfigRequest.Operation.Builder => Unit): ConfigResponse = {
+    val builder = ConfigRequest.Operation.newBuilder()
+    f(builder)
+    val response = client.config(builder.build())
+    response.getWarningsList.forEach { warning =>
+      logWarning(warning)
+    }
+    response
+  }
+}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e39a6779e25..b1b1f4b0a4e 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -63,6 +63,17 @@ class SparkSession(

   def version: String = SPARK_VERSION

+  /**
+   * Runtime configuration interface for Spark.
+   *
+   * This is the interface through which the user can get and set all Spark configurations that
+   * are relevant to Spark SQL. When getting the value of a config, this defaults to the value
+   * set on the server, if any.
+   *
+   * @since 3.4.0
+   */
+  val conf: RuntimeConfig = new RuntimeConfig(client)
+
   /**
    * Executes some code block and prints to stdout the time taken to execute the block. This is
    * available in Scala only and is used primarily for interactive testing and debugging.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 12bb581880c..8b69f75b201 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -67,6 +67,22 @@ private[sql] class SparkConnectClient(
     stub.executePlan(request)
   }

+  /**
+   * Dispatches the [[proto.ConfigRequest]] to the Spark Connect server.
+   * @return
+   *   A [[proto.ConfigResponse]] from the Spark Connect server.
+   */
+  def config(operation: proto.ConfigRequest.Operation): proto.ConfigResponse = {
+    val request = proto.ConfigRequest
+      .newBuilder()
+      .setOperation(operation)
+      .setClientId(sessionId)
+      .setClientType(userAgent)
+      .setUserContext(userContext)
+      .build()
+    stub.config(request)
+  }
+
   /**
    * Builds a [[proto.AnalyzePlanRequest]] from `plan` and dispatches it to the Spark Connect
    * server.
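As a usage sketch (not part of the patch): the round trip implemented above can be exercised directly against a connected SparkConnectClient, here assumed to be in scope as `client`; the config key is illustrative.

    import org.apache.spark.connect.proto.ConfigRequest

    // Build a GetOption operation for a single key, as RuntimeConfig.getOption does.
    val operation = ConfigRequest.Operation.newBuilder()
    operation.getGetOptionBuilder.addKeys("spark.sql.ansi.enabled")

    // Dispatch it through the new SparkConnectClient.config method.
    val response = client.config(operation.build())

    // The server replies with one KeyValue pair; an unset key carries no value.
    val pair = response.getPairs(0)
    val value: Option[String] = if (pair.hasValue) Some(pair.getValue) else None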
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 33e9d0756c1..122e7d5d271 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -427,4 +427,23 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     val timeFragments = Seq("Time taken: ", " ms")
     testCapturedStdOut(spark.time(spark.sql("select 1").collect()), timeFragments: _*)
   }
+
+  test("RuntimeConfig") {
+    intercept[NoSuchElementException](spark.conf.get("foo.bar"))
+    assert(spark.conf.getOption("foo.bar").isEmpty)
+    spark.conf.set("foo.bar", value = true)
+    assert(spark.conf.getOption("foo.bar") === Option("true"))
+    spark.conf.set("foo.bar.numBaz", 100L)
+    assert(spark.conf.get("foo.bar.numBaz") === "100")
+    spark.conf.set("foo.bar.name", "donkey")
+    assert(spark.conf.get("foo.bar.name") === "donkey")
+    spark.conf.unset("foo.bar.name")
+    val allKeyValues = spark.conf.getAll
+    assert(allKeyValues("foo.bar") === "true")
+    assert(allKeyValues("foo.bar.numBaz") === "100")
+    assert(!spark.conf.isModifiable("foo.bar")) // Unregistered keys report as non-modifiable.
+    assert(spark.conf.isModifiable("spark.sql.ansi.enabled"))
+    assert(!spark.conf.isModifiable("spark.sql.globalTempDatabase"))
+    intercept[Exception](spark.conf.set("spark.sql.globalTempDatabase", "/dev/null"))
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 2c5ea027bb7..5546542898e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -80,7 +80,8 @@ class CompatibilitySuite extends ConnectFunSuite {
       IncludeByName("org.apache.spark.sql.Dataset.*"),
       IncludeByName("org.apache.spark.sql.functions.*"),
       IncludeByName("org.apache.spark.sql.RelationalGroupedDataset.*"),
-      IncludeByName("org.apache.spark.sql.SparkSession.*"))
+      IncludeByName("org.apache.spark.sql.SparkSession.*"),
+      IncludeByName("org.apache.spark.sql.RuntimeConfig.*"))
     val excludeRules = Seq(
       // Filter unsupported rules:
       // Note when muting errors for a method, checks on all overloading methods are also muted.
@@ -101,15 +102,11 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.COL_POS_KEY"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.DATASET_ID_KEY"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.curId"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupBy"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.observe"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.queryExecution"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.as"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.checkpoint"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.localCheckpoint"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.withWatermark"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.na"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.stat"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
@@ -125,24 +122,11 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreach"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreachPartition"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.persist"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.cache"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.storageLevel"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.unpersist"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.Dataset.registerTempTable"
-      ), // deprecated
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createTempView"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createOrReplaceTempView"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.createGlobalTempView"),
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.Dataset.createOrReplaceGlobalTempView"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.writeStream"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJSON"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sameSemantics"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.semanticHash"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.this"),

       // functions
@@ -170,11 +154,9 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.implicits"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sparkContext"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.version"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sharedState"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sessionState"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sqlContext"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.conf"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.listenerManager"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.experimental"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.udf"),
@@ -189,7 +171,6 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.catalog"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.executeCommand"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.readStream"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.time"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.stop"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.this"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.setActiveSession"),
@@ -198,7 +179,10 @@ class CompatibilitySuite extends ConnectFunSuite {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.clearDefaultSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getActiveSession"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.getDefaultSession"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"))
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"),
+
+      // RuntimeConfig
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.RuntimeConfig.this"))

     val problems = allProblems
       .filter { p =>
         includedRules.exists(rule => rule(p))
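As a closing usage sketch (not part of the patch), the resulting public API mirrors the ClientE2ETestSuite additions above; `spark` is assumed to be a SparkSession connected to a Spark Connect server, and the `foo.bar` keys are illustrative.

    // Setting values: each overload is stored as its string representation.
    spark.conf.set("foo.bar", value = true) // stored as "true"
    spark.conf.set("foo.bar.numBaz", 100L)  // stored as "100"

    // Reading values back.
    assert(spark.conf.get("foo.bar") == "true")
    assert(spark.conf.get("foo.bar.numBaz", "0") == "100")
    assert(spark.conf.getOption("does.not.exist").isEmpty)

    // Static SQL configurations can be read but not modified at runtime.
    assert(!spark.conf.isModifiable("spark.sql.globalTempDatabase"))

    // Remove a previously set property.
    spark.conf.unset("foo.bar.numBaz")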