This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 4c0f9d8 [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession 4c0f9d8 is described below commit 4c0f9d8b44f63a3d1faaeece8b1d6b47c3bfe75f Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Wed Sep 9 12:23:05 2020 +0900 [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession ### What changes were proposed in this pull request? If no active SparkSession is available, let `FileSourceScanExec.needsUnsafeRowConversion` look at the default SQL config of the ParquetSource vectorized reader instead of failing the query execution. ### Why are the changes needed? Fix a bug where, if no active SparkSession is available, a file-based data source scan for Parquet Source throws an exception. ### Does this PR introduce _any_ user-facing change? Yes, this change fixes the bug. ### How was this patch tested? Unit test. Closes #29667 from viirya/SPARK-32813. 
Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit de0dc52a842bf4374c1ae4f9546dd95b3f35c4f1) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../spark/sql/execution/DataSourceScanExec.scala | 2 +- .../spark/sql/execution/SQLExecutionSuite.scala | 40 +++++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 447e0a6..0fcb0dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -175,7 +175,7 @@ case class FileSourceScanExec( private lazy val needsUnsafeRowConversion: Boolean = { if (relation.fileFormat.isInstanceOf[ParquetSource]) { - SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled + sqlContext.conf.parquetVectorizedReaderEnabled } else { false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala index 8bf7fe6..81e6920 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala @@ -17,11 +17,17 @@ package org.apache.spark.sql.execution +import java.util.concurrent.Executors + import scala.collection.parallel.immutable.ParRange +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types._ +import 
org.apache.spark.util.ThreadUtils class SQLExecutionSuite extends SparkFunSuite { @@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite { spark.stop() } + + test("SPARK-32813: Table scan should work in different thread") { + val executor1 = Executors.newSingleThreadExecutor() + val executor2 = Executors.newSingleThreadExecutor() + var session: SparkSession = null + SparkSession.cleanupAnyExistingSession() + + withTempDir { tempDir => + try { + val tablePath = tempDir.toString + "/table" + val df = ThreadUtils.awaitResult(Future { + session = SparkSession.builder().appName("test").master("local[*]").getOrCreate() + + session.createDataFrame( + session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil), + StructType(Seq( + StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false)))) + .write.parquet(tablePath) + + session.read.parquet(tablePath) + }(ExecutionContext.fromExecutorService(executor1)), 1.minute) + + ThreadUtils.awaitResult(Future { + assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3))) + }(ExecutionContext.fromExecutorService(executor2)), 1.minute) + } finally { + executor1.shutdown() + executor2.shutdown() + session.stop() + } + } + } } object SQLExecutionSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org