This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 454fe9c  [SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set
454fe9c is described below

commit 454fe9c8eb1903e1f199e4d5e13b171e519f793e
Author: jackylee-ch <lijunq...@baidu.com>
AuthorDate: Thu Mar 31 18:50:32 2022 +0800

    [SPARK-38419][SQL] Change spark.sessionstate.conf.getConf/setConf operation to spark.conf.get/set

    ### Why are the changes needed?
    In the sql module, `SparkSession.conf` is provided as the unified entry point for getting and setting `SQLConf` values; it prevents users and internal logic from modifying static SQL configs and core Spark configs. However, some code reaches getConf/setConf through `SparkSession.sessionState.conf`, which skips the checks in `RuntimeConfig`. This PR unifies those `SQLConf.getConf/setConf` call sites on `SparkSession.conf`.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing GA (GitHub Actions) runs.

    Closes #35950 from jackylee-ch/change_spark_sessionstate_conf_operation_to_spark_conf.

    Lead-authored-by: jackylee-ch <lijunq...@baidu.com>
    Co-authored-by: Jacky Lee <qcsd2...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../src/main/scala/org/apache/spark/sql/Dataset.scala      |  4 ++--
 .../main/scala/org/apache/spark/sql/RuntimeConfig.scala    |  8 ++++++++
 .../main/scala/org/apache/spark/sql/SparkSession.scala     |  4 ++--
 .../org/apache/spark/sql/execution/CacheManager.scala      |  2 +-
 .../org/apache/spark/sql/execution/command/views.scala     |  2 +-
 .../spark/sql/execution/datasources/DataSource.scala       |  3 +--
 .../org/apache/spark/sql/execution/datasources/ddl.scala   |  2 +-
 .../spark/sql/execution/streaming/StreamExecution.scala    |  2 +-
 .../spark/sql/streaming/StreamingQueryManager.scala        |  2 +-
 .../org/apache/spark/sql/FileBasedDataSourceSuite.scala    |  2 +-
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala    |  6 +++---
 .../org/apache/spark/sql/SparkSessionBuilderSuite.scala    |  8 ++++----
 .../apache/spark/sql/SparkSessionExtensionSuite.scala      |  4 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala         |  8 ++++----
 .../state/RocksDBStateStoreIntegrationSuite.scala          |  2 +-
 .../spark/sql/expressions/ExpressionInfoSuite.scala        |  2 +-
 .../org/apache/spark/sql/internal/SQLConfSuite.scala       | 16 ++++++++--------
 .../org/apache/spark/sql/sources/BucketedReadSuite.scala   |  2 +-
 .../apache/spark/sql/streaming/FileStreamSinkSuite.scala   |  2 +-
 .../spark/sql/streaming/FileStreamSourceSuite.scala        |  4 ++--
 20 files changed, 46 insertions(+), 39 deletions(-)
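For context, a minimal standalone sketch of the distinction the PR description relies on: `RuntimeConfig` vets every key before touching `SQLConf`, while `SessionState`'s `SQLConf` does not. The session setup and config keys here are illustrative; the static-config rejection is the `requireNonStaticConf` check visible in the `RuntimeConfig.scala` hunk below.

```scala
import org.apache.spark.sql.SparkSession

object ConfEntryPoints {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Public entry point: RuntimeConfig checks the key before setting it.
    spark.conf.set("spark.sql.shuffle.partitions", "10")

    // A static conf is rejected by RuntimeConfig.requireNonStaticConf with
    // "Cannot modify the value of a static config":
    // spark.conf.set("spark.sql.globalTempDatabase", "other_db")

    // The bypass this PR removes applies no such check. sessionState is
    // private[sql], so only code inside the sql package could take it:
    // spark.sessionState.conf.setConfString("spark.sql.globalTempDatabase", "other_db")

    spark.stop()
  }
}
```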
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 62dea96..5e5de1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -218,7 +218,7 @@ class Dataset[T] private[sql](
   @transient private[sql] val logicalPlan: LogicalPlan = {
     val plan = queryExecution.commandExecuted
-    if (sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
+    if (sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
       val dsIds = plan.getTagValue(Dataset.DATASET_ID_TAG).getOrElse(new HashSet[Long])
       dsIds.add(id)
       plan.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
@@ -1426,7 +1426,7 @@ class Dataset[T] private[sql](
   private def addDataFrameIdToCol(expr: NamedExpression): NamedExpression = {
     val newExpr = expr transform {
       case a: AttributeReference
-          if sparkSession.sessionState.conf.getConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
+          if sparkSession.conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED) =>
         val metadata = new MetadataBuilder()
           .withMetadata(a.metadata)
           .putLong(Dataset.DATASET_ID_KEY, id)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index 6c9b150..a3237cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -61,6 +61,14 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
   }
 
   /**
+   * Sets the given Spark runtime configuration property.
+   */
+  protected[sql] def set[T](entry: ConfigEntry[T], value: T): Unit = {
+    requireNonStaticConf(entry.key)
+    sqlConf.setConf(entry, value)
+  }
+
+  /**
    * Returns the value of Spark runtime configuration property for the given key.
    *
    * @throws java.util.NoSuchElementException if the key is not set and does not have a default
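The new overload mirrors the existing `protected[sql] get(entry)` accessors, so internal call sites stay typed end to end. A sketch of the usage it enables (this compiles only inside the `org.apache.spark.sql` package because of the `protected[sql]` visibility; the wrapper object is hypothetical):

```scala
package org.apache.spark.sql

import org.apache.spark.sql.internal.SQLConf

object TypedConfDemo {
  def demo(spark: SparkSession): Unit = {
    // The value's type must match the ConfigEntry[Boolean] at compile time,
    // and requireNonStaticConf still vets the key at runtime.
    spark.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)

    // The typed getter returns Boolean directly; no string parsing needed.
    val aqeOn: Boolean = spark.conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
    assert(aqeOn)
  }
}
```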
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 734b8e5..5b212c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1104,13 +1104,13 @@ object SparkSession extends Logging {
   private[sql] def getOrCloneSessionWithConfigsOff(
       session: SparkSession,
       configurations: Seq[ConfigEntry[Boolean]]): SparkSession = {
-    val configsEnabled = configurations.filter(session.sessionState.conf.getConf(_))
+    val configsEnabled = configurations.filter(session.conf.get[Boolean])
     if (configsEnabled.isEmpty) {
       session
     } else {
       val newSession = session.cloneSession()
       configsEnabled.foreach(conf => {
-        newSession.sessionState.conf.setConf(conf, false)
+        newSession.conf.set(conf, false)
       })
       newSession
     }
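One subtlety in the hunk above: the explicit type argument in `session.conf.get[Boolean]` selects the typed `get[T](entry: ConfigEntry[T]): T` overload rather than `get(key: String)`, and Scala eta-expands the method into a `ConfigEntry[Boolean] => Boolean` for `filter`. A sketch with the expansion written out (the wrapper object is hypothetical; both forms should behave identically):

```scala
package org.apache.spark.sql

import org.apache.spark.internal.config.ConfigEntry

object EtaExpansionNote {
  def enabled(
      session: SparkSession,
      configurations: Seq[ConfigEntry[Boolean]]): Seq[ConfigEntry[Boolean]] = {
    // As written in the patch: the method value itself is passed to filter.
    val terse = configurations.filter(session.conf.get[Boolean])
    // The equivalent spelled-out form.
    val spelledOut = configurations.filter(entry => session.conf.get(entry))
    assert(terse == spelledOut)
    terse
  }
}
```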
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 27d6bed..527f78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -332,7 +332,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
    * If CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING is enabled, just return original session.
    */
   private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
-    if (session.sessionState.conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
+    if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
       session
     } else {
       SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index eca48a6..3a2e32c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -123,7 +123,7 @@ case class CreateViewCommand(
         referredTempFunctions)
       catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
-      val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+      val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(name.table, Option(db))
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       val tableDefinition = createTemporaryViewRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 2bb3d48..6b3dfac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -267,8 +267,7 @@ case class DataSource(
           checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
           createInMemoryFileIndex(globbedPaths)
         })
-        val forceNullable =
-          sparkSession.sessionState.conf.getConf(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
+        val forceNullable = sparkSession.conf.get(SQLConf.FILE_SOURCE_SCHEMA_FORCE_NULLABLE)
         val sourceDataSchema = if (forceNullable) dataSchema.asNullable else dataSchema
         SourceInfo(
           s"FileSource[$path]",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 9a81f2a..dc5894e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -106,7 +106,7 @@ case class CreateTempViewUsing(
     }.logicalPlan
 
     if (global) {
-      val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+      val db = sparkSession.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
       val viewIdent = TableIdentifier(tableIdent.table, Option(db))
       val viewDefinition = createTemporaryViewRelation(
         viewIdent,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f9ae65c..324a833 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -417,7 +417,7 @@ abstract class StreamExecution(
   @throws[TimeoutException]
   protected def interruptAndAwaitExecutionThreadTermination(): Unit = {
     val timeout = math.max(
-      sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_TIMEOUT), 0)
+      sparkSession.conf.get(SQLConf.STREAMING_STOP_TIMEOUT), 0)
     queryExecutionThread.interrupt()
     queryExecutionThread.join(timeout)
     if (queryExecutionThread.isAlive) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 4e1c7cc..6548d5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -343,7 +343,7 @@ class StreamingQueryManager private[sql] (
       .orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ...
 
     val shouldStopActiveRun =
-      sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+      sparkSession.conf.get(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
     if (activeOption.isDefined) {
       if (shouldStopActiveRun) {
         val oldQuery = activeOption.get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index bc7a7b2..e4d6eb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -53,7 +53,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c28dde9..f7c35ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2556,17 +2556,17 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
 
       val originalValue = newSession.sessionState.conf.runSQLonFile
       try {
-        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, false)
+        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, false)
         intercept[AnalysisException] {
           newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`")
         }
 
-        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, true)
+        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, true)
         checkAnswer(
           newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`"),
           Row(1, "a"))
       } finally {
-        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
+        newSession.conf.set(SQLConf.RUN_SQL_ON_FILES, originalValue)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 0a7c684..ee8b6d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -199,11 +199,11 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach wit
       .config("spark.app.name", "test-app-SPARK-31234")
       .getOrCreate()
 
-    assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
-    assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
+    assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
     session.sql("RESET")
-    assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
-    assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
+    assert(session.conf.get("spark.app.name") === "test-app-SPARK-31234")
+    assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
   }
 
   test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 3577812..17124cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -170,7 +170,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
         MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule()))
     }
     withSession(extensions) { session =>
-      session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
+      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, true)
       assert(session.sessionState.queryStagePrepRules.contains(MyQueryStagePrepRule()))
       assert(session.sessionState.columnarRules.contains(
         MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule())))
@@ -209,7 +209,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
     }
     withSession(extensions) { session =>
-      session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
+      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
       assert(session.sessionState.columnarRules.contains(
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
       import session.sqlContext.implicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6f14b1f..98dec46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1927,7 +1927,7 @@ class DataSourceV2SQLSuite
   }
 
   test("global temp view should not be masked by v2 catalog") {
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
 
     try {
@@ -1941,7 +1941,7 @@ class DataSourceV2SQLSuite
   }
 
   test("SPARK-30104: global temp db is used as a table name under v2 catalog") {
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     val t = s"testcat.$globalTempDB"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
@@ -1952,7 +1952,7 @@ class DataSourceV2SQLSuite
   }
 
   test("SPARK-30104: v2 catalog named global_temp will be masked") {
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
 
     val e = intercept[AnalysisException] {
@@ -2132,7 +2132,7 @@ class DataSourceV2SQLSuite
     }
     intercept[AnalysisException](sql("COMMENT ON TABLE testcat.abc IS NULL"))
 
-    val globalTempDB = spark.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
+    val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
     withTempView("v") {
       sql("create global temp view v as select 1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 25d0a80..b50ac71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -107,7 +107,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
   testQuietly("SPARK-36519: store RocksDB format version in the checkpoint") {
     def getFormatVersion(query: StreamingQuery): Int = {
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution.sparkSession
-        .sessionState.conf.getConf(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
+        .conf.get(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION)
     }
 
     withSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index 7f25831..d568b7a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -199,7 +199,7 @@ class ExpressionInfoSuite extends SparkFunSuite with SharedSparkSession {
       // Examples can change settings. We clone the session to prevent tests clashing.
       val clonedSpark = spark.cloneSession()
       // Coalescing partitions can change result order, so disable it.
-      clonedSpark.sessionState.conf.setConf(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
+      clonedSpark.conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED, false)
       val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId)
       val className = info.getClassName
       if (!ignoreSet.contains(className)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index a589d4e..55d91ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -343,10 +343,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.INT96)
 
-    spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
+    spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
-    spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
+    spark.conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.INT96)
@@ -362,9 +362,9 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     val fallback = SQLConf.buildConf("spark.sql.__test__.spark_22779")
       .fallbackConf(SQLConf.PARQUET_COMPRESSION)
 
-    assert(spark.sessionState.conf.getConfString(fallback.key) ===
+    assert(spark.conf.get(fallback.key) ===
       SQLConf.PARQUET_COMPRESSION.defaultValue.get)
-    assert(spark.sessionState.conf.getConfString(fallback.key, "lzo") === "lzo")
+    assert(spark.conf.get(fallback.key, "lzo") === "lzo")
 
     val displayValue = spark.sessionState.conf.getAllDefinedConfs
       .find { case (key, _, _, _) => key == fallback.key }
@@ -372,11 +372,11 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
       .get
     assert(displayValue === fallback.defaultValueString)
 
-    spark.sessionState.conf.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
-    assert(spark.sessionState.conf.getConfString(fallback.key) === "gzip")
+    spark.conf.set(SQLConf.PARQUET_COMPRESSION, "gzip")
+    assert(spark.conf.get(fallback.key) === "gzip")
 
-    spark.sessionState.conf.setConf(fallback, "lzo")
-    assert(spark.sessionState.conf.getConfString(fallback.key) === "lzo")
+    spark.conf.set(fallback, "lzo")
+    assert(spark.conf.get(fallback.key) === "lzo")
 
     val newDisplayValue = spark.sessionState.conf.getAllDefinedConfs
       .find { case (key, _, _, _) => key == fallback.key }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 18039db..921cff2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -53,7 +53,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
+    spark.conf.set(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
   }
 
   protected override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 407d783..49cbbe1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -47,7 +47,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b907323..8a45895 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -257,7 +257,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+    spark.conf.set(SQLConf.ORC_IMPLEMENTATION, "native")
  }
 
   override def afterAll(): Unit = {
@@ -1433,7 +1433,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
     // This is to avoid running a spark job to list of files in parallel
     // by the InMemoryFileIndex.
-    spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
+    spark.conf.set(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
 
     withTempDirs { case (root, tmp) =>
       val src = new File(root, "a=1")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org