This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 54b75582506d [SPARK-48805][SQL][ML][SS][AVRO][EXAMPLES] Replace calls to bridged APIs based on `SparkSession#sqlContext` with `SparkSession` API

54b75582506d is described below

commit 54b75582506d0e58af7f500b9d284ab7222e98f0
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Thu Jul 4 19:27:22 2024 +0800

[SPARK-48805][SQL][ML][SS][AVRO][EXAMPLES] Replace calls to bridged APIs based on `SparkSession#sqlContext` with `SparkSession` API

### What changes were proposed in this pull request?

In the internal code of Spark, there are instances where, despite having a `SparkSession` instance, the bridged APIs based on `SparkSession#sqlContext` are still used. Therefore, this PR makes some simplifications in this regard:

1. `SparkSession#sqlContext#read` -> `SparkSession#read`

```scala
/**
 * Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
 * `DataFrame`.
 * {{{
 *   sqlContext.read.parquet("/path/to/file.parquet")
 *   sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @group genericdata
 * @since 1.4.0
 */
def read: DataFrameReader = sparkSession.read
```

2. `SparkSession#sqlContext#setConf` -> `SparkSession#conf#set`

```scala
/**
 * Set the given Spark SQL configuration property.
 *
 * @group config
 * @since 1.0.0
 */
def setConf(key: String, value: String): Unit = {
  sparkSession.conf.set(key, value)
}
```

3. `SparkSession#sqlContext#getConf` -> `SparkSession#conf#get`

```scala
/**
 * Return the value of Spark SQL configuration property for the given key.
 *
 * @group config
 * @since 1.0.0
 */
def getConf(key: String): String = {
  sparkSession.conf.get(key)
}
```

4. `SparkSession#sqlContext#createDataFrame` -> `SparkSession#createDataFrame`

```scala
/**
 * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
 *
 * @group dataframes
 * @since 1.3.0
 */
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
  sparkSession.createDataFrame(rdd)
}
```

5. `SparkSession#sqlContext#sessionState` -> `SparkSession#sessionState`

```scala
private[sql] def sessionState: SessionState = sparkSession.sessionState
```

6. `SparkSession#sqlContext#sharedState` -> `SparkSession#sharedState`

```scala
private[sql] def sharedState: SharedState = sparkSession.sharedState
```

7. `SparkSession#sqlContext#streams` -> `SparkSession#streams`

```scala
/**
 * Returns a `StreamingQueryManager` that allows managing all the
 * [[org.apache.spark.sql.streaming.StreamingQuery StreamingQueries]] active on `this` context.
 *
 * @since 2.0.0
 */
def streams: StreamingQueryManager = sparkSession.streams
```

8. `SparkSession#sqlContext#uncacheTable` -> `SparkSession#catalog#uncacheTable`

```scala
/**
 * Removes the specified table from the in-memory cache.
 *
 * @group cachemgmt
 * @since 1.3.0
 */
def uncacheTable(tableName: String): Unit = {
  sparkSession.catalog.uncacheTable(tableName)
}
```

### Why are the changes needed?

Decrease the nesting levels of API calls.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Pass GitHub Actions
- Manually checked `SparkHiveExample`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47210 from LuciferYang/session.sqlContext.
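For readers skimming the list above, a small end-to-end sketch of what the simplified call sites look like may help. This is an illustrative example only, not code from this patch: the object name, app name, local master, temp directory, config key, and column names are all made up, and it exercises items 1-4 of the list (`read`, `conf.set`/`conf.get`, `createDataFrame`) against an ordinary `SparkSession`.

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch of the simplified call sites. All names and paths here are
// hypothetical; they are not taken from the patch.
object SqlContextBridgeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("sqlContext-bridge-sketch")
      .master("local[*]")
      .getOrCreate()

    // Items 2 and 3: spark.sqlContext.setConf / getConf  ->  spark.conf.set / spark.conf.get
    spark.conf.set("spark.sql.shuffle.partitions", "4")
    val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")

    // Item 4: spark.sqlContext.createDataFrame(rdd)  ->  spark.createDataFrame(rdd)
    val rdd = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2)))
    val df = spark.createDataFrame(rdd).toDF("key", "value")

    // Item 1: spark.sqlContext.read  ->  spark.read
    val outDir = java.nio.file.Files.createTempDirectory("bridge-sketch").toString
    df.write.mode("overwrite").parquet(outDir)
    val reread = spark.read.parquet(outDir)

    println(s"shuffle partitions = $shufflePartitions, rows read back = ${reread.count()}")
    spark.stop()
  }
}
```

The bridged `SQLContext` methods quoted in the commit message still delegate to the same `SparkSession` members, so external callers are unaffected; only Spark's own internal call sites now go through `SparkSession` directly.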
Authored-by: yangjie01 <yangji...@baidu.com>
Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala  | 4 ++--
 .../apache/spark/examples/sql/hive/SparkHiveExample.scala     | 4 ++--
 .../apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala   | 2 +-
 .../sql/execution/streaming/FlatMapGroupsWithStateExec.scala  | 12 ++++++------
 .../sql/execution/streaming/TransformWithStateExec.scala      | 12 ++++++------
 .../test/scala/org/apache/spark/sql/CachedTableSuite.scala    | 4 ++--
 .../apache/spark/sql/errors/QueryExecutionErrorsSuite.scala   | 2 +-
 .../apache/spark/sql/hive/HiveParquetMetastoreSuite.scala     | 2 +-
 .../org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala   | 2 +-
 .../spark/sql/hive/PartitionedTablePerfStatsSuite.scala       | 2 +-
 10 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 42c13f5e2087..ce38ada7c9e4 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -566,7 +566,7 @@ abstract class AvroSuite
       dataFileWriter.flush()
       dataFileWriter.close()
 
-      val df = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      val df = spark.read.format("avro").load(nativeWriterPath)
       assertResult(Row(field1, null, null, null))(df.selectExpr("field1.*").first())
       assertResult(Row(null, field2, null, null))(df.selectExpr("field2.*").first())
       assertResult(Row(null, null, field3, null))(df.selectExpr("field3.*").first())
@@ -575,7 +575,7 @@ abstract class AvroSuite
 
       df.write.format("avro").option("avroSchema", schema.toString).save(sparkWriterPath)
 
-      val df2 = spark.sqlContext.read.format("avro").load(nativeWriterPath)
+      val df2 = spark.read.format("avro").load(nativeWriterPath)
       assertResult(Row(field1, null, null, null))(df2.selectExpr("field1.*").first())
       assertResult(Row(null, field2, null, null))(df2.selectExpr("field2.*").first())
       assertResult(Row(null, null, field3, null))(df2.selectExpr("field3.*").first())
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
index 3be8a3862f39..3996d5074172 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala
@@ -134,8 +134,8 @@ object SparkHiveExample {
     // ... Order may vary, as spark processes the partitions in parallel.
 
     // Turn on flag for Hive Dynamic Partitioning
-    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
-    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
+    spark.conf.set("hive.exec.dynamic.partition", "true")
+    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
     // Create a Hive partitioned table using DataFrame API
     df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
     // Partitioned column `key` will be moved to the end of the schema.
diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
index db685b6ed4fa..f2bb14561472 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala
@@ -158,7 +158,7 @@ class LibSVMRelationSuite
       StructField("labelFoo", DoubleType, false),
       StructField("featuresBar", VectorType, false))
     )
-    val df = spark.sqlContext.createDataFrame(rawData, struct)
+    val df = spark.createDataFrame(rawData, struct)
 
     val writePath = Utils.createTempDir().getPath
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 2fa744a0f89b..ad1707ce7dad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -191,7 +191,7 @@ trait FlatMapGroupsWithStateExecBase
 
   override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = {
     StateSchemaCompatibilityChecker.validateAndMaybeEvolveStateSchema(getStateInfo, hadoopConf,
-      groupingAttributes.toStructType, stateManager.stateSchema, session.sqlContext.sessionState)
+      groupingAttributes.toStructType, stateManager.stateSchema, session.sessionState)
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -215,14 +215,14 @@ trait FlatMapGroupsWithStateExecBase
     if (hasInitialState) {
       // If the user provided initial state we need to have the initial state and the
       // data in the same partition so that we can still have just one commit at the end.
-      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val storeConf = new StateStoreConf(session.sessionState.conf)
       val hadoopConfBroadcast = sparkContext.broadcast(
-        new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+        new SerializableConfiguration(session.sessionState.newHadoopConf()))
       child.execute().stateStoreAwareZipPartitions(
         initialState.execute(),
         getStateInfo,
         storeNames = Seq(),
-        session.sqlContext.streams.stateStoreCoordinator) {
+        session.streams.stateStoreCoordinator) {
           // The state store aware zip partitions will provide us with two iterators,
           // child data iterator and the initial state iterator per partition.
           case (partitionId, childDataIterator, initStateIterator) =>
@@ -246,8 +246,8 @@ trait FlatMapGroupsWithStateExecBase
         groupingAttributes.toStructType,
         stateManager.stateSchema,
         NoPrefixKeyStateEncoderSpec(groupingAttributes.toStructType),
-        session.sqlContext.sessionState,
-        Some(session.sqlContext.streams.stateStoreCoordinator)
+        session.sessionState,
+        Some(session.streams.stateStoreCoordinator)
       ) { case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
         val processor = createInputProcessor(store)
         processDataWithPartition(singleIterator, store, processor)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index c5b22c89e82f..c56d7c969d9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -353,14 +353,14 @@ case class TransformWithStateExec(
     validateTimeMode()
 
     if (hasInitialState) {
-      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val storeConf = new StateStoreConf(session.sessionState.conf)
       val hadoopConfBroadcast = sparkContext.broadcast(
-        new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+        new SerializableConfiguration(session.sessionState.newHadoopConf()))
       child.execute().stateStoreAwareZipPartitions(
         initialState.execute(),
         getStateInfo,
         storeNames = Seq(),
-        session.sqlContext.streams.stateStoreCoordinator) {
+        session.streams.stateStoreCoordinator) {
           // The state store aware zip partitions will provide us with two iterators,
           // child data iterator and the initial state iterator per partition.
           case (partitionId, childDataIterator, initStateIterator) =>
@@ -393,8 +393,8 @@ case class TransformWithStateExec(
         KEY_ROW_SCHEMA,
         VALUE_ROW_SCHEMA,
         NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA),
-        session.sqlContext.sessionState,
-        Some(session.sqlContext.streams.stateStoreCoordinator),
+        session.sessionState,
+        Some(session.streams.stateStoreCoordinator),
         useColumnFamilies = true
       ) {
         case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
@@ -404,7 +404,7 @@ case class TransformWithStateExec(
       // If the query is running in batch mode, we need to create a new StateStore and instantiate
       // a temp directory on the executors in mapPartitionsWithIndex.
       val hadoopConfBroadcast = sparkContext.broadcast(
-        new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+        new SerializableConfiguration(session.sessionState.newHadoopConf()))
       child.execute().mapPartitionsWithIndex[InternalRow](
         (i: Int, iter: Iterator[InternalRow]) => {
           initNewStateStoreAndProcessData(i, hadoopConfBroadcast) { store =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 71b420bb85ea..3ac433f31288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1473,7 +1473,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
         assert(spark.catalog.isCached("view2"))
 
         val oldView = spark.table("view2")
-        spark.sqlContext.uncacheTable("view1")
+        spark.catalog.uncacheTable("view1")
         assert(storeAnalyzed ==
           spark.sharedState.cacheManager.lookupCachedData(oldView).isDefined,
           s"when storeAnalyzed = $storeAnalyzed")
@@ -1493,7 +1493,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
         assert(spark.catalog.isCached("view2"))
 
         val oldView = spark.table("view2")
-        spark.sqlContext.uncacheTable(s"$db.view1")
+        spark.catalog.uncacheTable(s"$db.view1")
        assert(storeAnalyzed ==
          spark.sharedState.cacheManager.lookupCachedData(oldView).isDefined,
          s"when storeAnalyzed = $storeAnalyzed")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 8238eabc7fef..4a748d590feb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -777,7 +777,7 @@ class QueryExecutionErrorsSuite
     checkError(
       exception = intercept[SparkIllegalArgumentException] {
         val row = spark.sparkContext.parallelize(Seq(1, 2)).map(Row(_))
-        spark.sqlContext.createDataFrame(row, StructType.fromString("StructType()"))
+        spark.createDataFrame(row, StructType.fromString("StructType()"))
       },
       errorClass = "UNSUPPORTED_DATATYPE",
       parameters = Map(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
index 035175430d92..fe029173fae3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
@@ -155,7 +155,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
     (1 to 10).map(i => Tuple1(Seq(Integer.valueOf(i), null))).toDF("a")
       .createOrReplaceTempView("jt_array")
 
-    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
+    assert(spark.conf.get(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
   }
 
   override def afterAll(): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
index fa54d4898f1d..fe991d017702 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
@@ -166,7 +166,7 @@ class HiveUDFDynamicLoadSuite extends QueryTest with SQLTestUtils with TestHiveS
 
       assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
       assert(Thread.currentThread().getContextClassLoader eq
-        spark.sqlContext.sharedState.jarClassLoader)
+        spark.sharedState.jarClassLoader)
 
       val udfExpr = udfInfo.fnCreateHiveUDFExpression()
       // force initializing - this is what we do in HiveSessionCatalog
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index de3b1ffccf00..f7e453a1dbde 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -100,7 +100,7 @@ class PartitionedTablePerfStatsSuite
   }
 
   genericTest("partitioned pruned table reports only selected files") { spec =>
-    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
+    assert(spark.conf.get(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true")
     withTable("test") {
       withTempDir { dir =>
         spec.setupTable("test", dir)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org