[spark] branch branch-3.0 updated: [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession

2020-09-08 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4c0f9d8  [SPARK-32813][SQL] Get default config of ParquetSource 
vectorized reader if no active SparkSession
4c0f9d8 is described below

commit 4c0f9d8b44f63a3d1faaeece8b1d6b47c3bfe75f
Author: Liang-Chi Hsieh 
AuthorDate: Wed Sep 9 12:23:05 2020 +0900

[SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if 
no active SparkSession

### What changes were proposed in this pull request?

If no active SparkSession is available, let 
`FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of 
ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fix a bug that if no active SparkSession is available, file-based data 
source scan for Parquet Source will throw exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: HyukjinKwon 
(cherry picked from commit de0dc52a842bf4374c1ae4f9546dd95b3f35c4f1)
Signed-off-by: HyukjinKwon 
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  2 +-
 .../spark/sql/execution/SQLExecutionSuite.scala| 40 +-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 447e0a6..0fcb0dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -175,7 +175,7 @@ case class FileSourceScanExec(
 
   private lazy val needsUnsafeRowConversion: Boolean = {
 if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-  
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  sqlContext.conf.parquetVectorizedReaderEnabled
 } else {
   false
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 8bf7fe6..81e6920 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -17,11 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.Executors
+
 import scala.collection.parallel.immutable.ParRange
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.ThreadUtils
 
 class SQLExecutionSuite extends SparkFunSuite {
 
@@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {
 
 spark.stop()
   }
+
+  test("SPARK-32813: Table scan should work in different thread") {
+val executor1 = Executors.newSingleThreadExecutor()
+val executor2 = Executors.newSingleThreadExecutor()
+var session: SparkSession = null
+SparkSession.cleanupAnyExistingSession()
+
+withTempDir { tempDir =>
+  try {
+val tablePath = tempDir.toString + "/table"
+val df = ThreadUtils.awaitResult(Future {
+  session = 
SparkSession.builder().appName("test").master("local[*]").getOrCreate()
+
+  session.createDataFrame(
+session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
+StructType(Seq(
+  StructField("a", ArrayType(IntegerType, containsNull = false), 
nullable = false
+.write.parquet(tablePath)
+
+  session.read.parquet(tablePath)
+}(ExecutionContext.fromExecutorService(executor1)), 1.minute)
+
+ThreadUtils.awaitResult(Future {
+  assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
+}(ExecutionContext.fromExecutorService(executor2)), 1.minute)
+  } finally {
+executor1.shutdown()
+executor2.shutdown()
+session.stop()
+  }
+}
+  }
 }
 
 object SQLExecutionSuite {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession

2020-09-08 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4c0f9d8  [SPARK-32813][SQL] Get default config of ParquetSource 
vectorized reader if no active SparkSession
4c0f9d8 is described below

commit 4c0f9d8b44f63a3d1faaeece8b1d6b47c3bfe75f
Author: Liang-Chi Hsieh 
AuthorDate: Wed Sep 9 12:23:05 2020 +0900

[SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if 
no active SparkSession

### What changes were proposed in this pull request?

If no active SparkSession is available, let 
`FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of 
ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fix a bug that if no active SparkSession is available, file-based data 
source scan for Parquet Source will throw exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: HyukjinKwon 
(cherry picked from commit de0dc52a842bf4374c1ae4f9546dd95b3f35c4f1)
Signed-off-by: HyukjinKwon 
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  2 +-
 .../spark/sql/execution/SQLExecutionSuite.scala| 40 +-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 447e0a6..0fcb0dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -175,7 +175,7 @@ case class FileSourceScanExec(
 
   private lazy val needsUnsafeRowConversion: Boolean = {
 if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-  
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  sqlContext.conf.parquetVectorizedReaderEnabled
 } else {
   false
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 8bf7fe6..81e6920 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -17,11 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.Executors
+
 import scala.collection.parallel.immutable.ParRange
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.ThreadUtils
 
 class SQLExecutionSuite extends SparkFunSuite {
 
@@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {
 
 spark.stop()
   }
+
+  test("SPARK-32813: Table scan should work in different thread") {
+val executor1 = Executors.newSingleThreadExecutor()
+val executor2 = Executors.newSingleThreadExecutor()
+var session: SparkSession = null
+SparkSession.cleanupAnyExistingSession()
+
+withTempDir { tempDir =>
+  try {
+val tablePath = tempDir.toString + "/table"
+val df = ThreadUtils.awaitResult(Future {
+  session = 
SparkSession.builder().appName("test").master("local[*]").getOrCreate()
+
+  session.createDataFrame(
+session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
+StructType(Seq(
+  StructField("a", ArrayType(IntegerType, containsNull = false), 
nullable = false
+.write.parquet(tablePath)
+
+  session.read.parquet(tablePath)
+}(ExecutionContext.fromExecutorService(executor1)), 1.minute)
+
+ThreadUtils.awaitResult(Future {
+  assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
+}(ExecutionContext.fromExecutorService(executor2)), 1.minute)
+  } finally {
+executor1.shutdown()
+executor2.shutdown()
+session.stop()
+  }
+}
+  }
 }
 
 object SQLExecutionSuite {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession

2020-09-08 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4c0f9d8  [SPARK-32813][SQL] Get default config of ParquetSource 
vectorized reader if no active SparkSession
4c0f9d8 is described below

commit 4c0f9d8b44f63a3d1faaeece8b1d6b47c3bfe75f
Author: Liang-Chi Hsieh 
AuthorDate: Wed Sep 9 12:23:05 2020 +0900

[SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if 
no active SparkSession

### What changes were proposed in this pull request?

If no active SparkSession is available, let 
`FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of 
ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fix a bug that if no active SparkSession is available, file-based data 
source scan for Parquet Source will throw exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: HyukjinKwon 
(cherry picked from commit de0dc52a842bf4374c1ae4f9546dd95b3f35c4f1)
Signed-off-by: HyukjinKwon 
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  2 +-
 .../spark/sql/execution/SQLExecutionSuite.scala| 40 +-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 447e0a6..0fcb0dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -175,7 +175,7 @@ case class FileSourceScanExec(
 
   private lazy val needsUnsafeRowConversion: Boolean = {
 if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-  
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  sqlContext.conf.parquetVectorizedReaderEnabled
 } else {
   false
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 8bf7fe6..81e6920 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -17,11 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.Executors
+
 import scala.collection.parallel.immutable.ParRange
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.ThreadUtils
 
 class SQLExecutionSuite extends SparkFunSuite {
 
@@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {
 
 spark.stop()
   }
+
+  test("SPARK-32813: Table scan should work in different thread") {
+val executor1 = Executors.newSingleThreadExecutor()
+val executor2 = Executors.newSingleThreadExecutor()
+var session: SparkSession = null
+SparkSession.cleanupAnyExistingSession()
+
+withTempDir { tempDir =>
+  try {
+val tablePath = tempDir.toString + "/table"
+val df = ThreadUtils.awaitResult(Future {
+  session = 
SparkSession.builder().appName("test").master("local[*]").getOrCreate()
+
+  session.createDataFrame(
+session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
+StructType(Seq(
+  StructField("a", ArrayType(IntegerType, containsNull = false), 
nullable = false
+.write.parquet(tablePath)
+
+  session.read.parquet(tablePath)
+}(ExecutionContext.fromExecutorService(executor1)), 1.minute)
+
+ThreadUtils.awaitResult(Future {
+  assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
+}(ExecutionContext.fromExecutorService(executor2)), 1.minute)
+  } finally {
+executor1.shutdown()
+executor2.shutdown()
+session.stop()
+  }
+}
+  }
 }
 
 object SQLExecutionSuite {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession

2020-09-08 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4c0f9d8  [SPARK-32813][SQL] Get default config of ParquetSource 
vectorized reader if no active SparkSession
4c0f9d8 is described below

commit 4c0f9d8b44f63a3d1faaeece8b1d6b47c3bfe75f
Author: Liang-Chi Hsieh 
AuthorDate: Wed Sep 9 12:23:05 2020 +0900

[SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if 
no active SparkSession

### What changes were proposed in this pull request?

If no active SparkSession is available, let 
`FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of 
ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fix a bug that if no active SparkSession is available, file-based data 
source scan for Parquet Source will throw exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: HyukjinKwon 
(cherry picked from commit de0dc52a842bf4374c1ae4f9546dd95b3f35c4f1)
Signed-off-by: HyukjinKwon 
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  2 +-
 .../spark/sql/execution/SQLExecutionSuite.scala| 40 +-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 447e0a6..0fcb0dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -175,7 +175,7 @@ case class FileSourceScanExec(
 
   private lazy val needsUnsafeRowConversion: Boolean = {
 if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-  
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  sqlContext.conf.parquetVectorizedReaderEnabled
 } else {
   false
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 8bf7fe6..81e6920 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -17,11 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.Executors
+
 import scala.collection.parallel.immutable.ParRange
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.ThreadUtils
 
 class SQLExecutionSuite extends SparkFunSuite {
 
@@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {
 
 spark.stop()
   }
+
+  test("SPARK-32813: Table scan should work in different thread") {
+val executor1 = Executors.newSingleThreadExecutor()
+val executor2 = Executors.newSingleThreadExecutor()
+var session: SparkSession = null
+SparkSession.cleanupAnyExistingSession()
+
+withTempDir { tempDir =>
+  try {
+val tablePath = tempDir.toString + "/table"
+val df = ThreadUtils.awaitResult(Future {
+  session = 
SparkSession.builder().appName("test").master("local[*]").getOrCreate()
+
+  session.createDataFrame(
+session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
+StructType(Seq(
+  StructField("a", ArrayType(IntegerType, containsNull = false), 
nullable = false
+.write.parquet(tablePath)
+
+  session.read.parquet(tablePath)
+}(ExecutionContext.fromExecutorService(executor1)), 1.minute)
+
+ThreadUtils.awaitResult(Future {
+  assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
+}(ExecutionContext.fromExecutorService(executor2)), 1.minute)
+  } finally {
+executor1.shutdown()
+executor2.shutdown()
+session.stop()
+  }
+}
+  }
 }
 
 object SQLExecutionSuite {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.0 updated: [SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if no active SparkSession

2020-09-08 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4c0f9d8  [SPARK-32813][SQL] Get default config of ParquetSource 
vectorized reader if no active SparkSession
4c0f9d8 is described below

commit 4c0f9d8b44f63a3d1faaeece8b1d6b47c3bfe75f
Author: Liang-Chi Hsieh 
AuthorDate: Wed Sep 9 12:23:05 2020 +0900

[SPARK-32813][SQL] Get default config of ParquetSource vectorized reader if 
no active SparkSession

### What changes were proposed in this pull request?

If no active SparkSession is available, let 
`FileSourceScanExec.needsUnsafeRowConversion` look at default SQL config of 
ParquetSource vectorized reader instead of failing the query execution.

### Why are the changes needed?

Fix a bug that if no active SparkSession is available, file-based data 
source scan for Parquet Source will throw exception.

### Does this PR introduce _any_ user-facing change?

Yes, this change fixes the bug.

### How was this patch tested?

Unit test.

Closes #29667 from viirya/SPARK-32813.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: HyukjinKwon 
(cherry picked from commit de0dc52a842bf4374c1ae4f9546dd95b3f35c4f1)
Signed-off-by: HyukjinKwon 
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  2 +-
 .../spark/sql/execution/SQLExecutionSuite.scala| 40 +-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 447e0a6..0fcb0dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -175,7 +175,7 @@ case class FileSourceScanExec(
 
   private lazy val needsUnsafeRowConversion: Boolean = {
 if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-  
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  sqlContext.conf.parquetVectorizedReaderEnabled
 } else {
   false
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
index 8bf7fe6..81e6920 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala
@@ -17,11 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.concurrent.Executors
+
 import scala.collection.parallel.immutable.ParRange
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.ThreadUtils
 
 class SQLExecutionSuite extends SparkFunSuite {
 
@@ -119,6 +125,38 @@ class SQLExecutionSuite extends SparkFunSuite {
 
 spark.stop()
   }
+
+  test("SPARK-32813: Table scan should work in different thread") {
+val executor1 = Executors.newSingleThreadExecutor()
+val executor2 = Executors.newSingleThreadExecutor()
+var session: SparkSession = null
+SparkSession.cleanupAnyExistingSession()
+
+withTempDir { tempDir =>
+  try {
+val tablePath = tempDir.toString + "/table"
+val df = ThreadUtils.awaitResult(Future {
+  session = 
SparkSession.builder().appName("test").master("local[*]").getOrCreate()
+
+  session.createDataFrame(
+session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
+StructType(Seq(
+  StructField("a", ArrayType(IntegerType, containsNull = false), 
nullable = false
+.write.parquet(tablePath)
+
+  session.read.parquet(tablePath)
+}(ExecutionContext.fromExecutorService(executor1)), 1.minute)
+
+ThreadUtils.awaitResult(Future {
+  assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
+}(ExecutionContext.fromExecutorService(executor2)), 1.minute)
+  } finally {
+executor1.shutdown()
+executor2.shutdown()
+session.stop()
+  }
+}
+  }
 }
 
 object SQLExecutionSuite {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org