spark git commit: [SPARK-23312][SQL] add a config to turn off vectorized cache reader

2018-02-02 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 2b07452ca -> e5e9f9a43


[SPARK-23312][SQL] add a config to turn off vectorized cache reader

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-23309 reported a performance 
regression with cached tables in Spark 2.3. While the investigation is still 
ongoing, this PR adds a conf to turn off the vectorized cache reader, to 
unblock the 2.3 release.
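
For readers wanting to try the workaround, a minimal sketch of flipping the new flag at runtime (the session setup and toy query are illustrative, not part of this patch; only the config key comes from the diff below):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")            // illustrative local session
      .appName("SPARK-23312-demo")   // hypothetical app name
      .getOrCreate()

    // Fall back to the row-based in-memory reader while SPARK-23309 is investigated.
    spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "false")

    // Equivalent SQL form:
    spark.sql("SET spark.sql.inMemoryColumnarStorage.enableVectorizedReader=false")

    val df = spark.range(1000).cache()
    df.count()   // the cached scan now takes the non-vectorized code path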

## How was this patch tested?

A new test.

Author: Wenchen Fan 

Closes #20483 from cloud-fan/cache.

(cherry picked from commit b9503fcbb3f4a3ce263164d1f11a8e99b9ca5710)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5e9f9a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5e9f9a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5e9f9a4

Branch: refs/heads/branch-2.3
Commit: e5e9f9a430c827669ecfe9d5c13cc555fc89c980
Parents: 2b07452
Author: Wenchen Fan 
Authored: Fri Feb 2 22:43:28 2018 +0800
Committer: Wenchen Fan 
Committed: Fri Feb 2 22:43:51 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala      |  8 ++++++++
 .../execution/columnar/InMemoryTableScanExec.scala   |  2 +-
 .../org/apache/spark/sql/CachedTableSuite.scala      | 15 +++++++++++++--
 3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e5e9f9a4/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7394a0d..e498f55 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -141,6 +141,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val CACHE_VECTORIZED_READER_ENABLED =
+    buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
+      .doc("Enables vectorized reader for columnar caching.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMN_VECTOR_OFFHEAP_ENABLED =
     buildConf("spark.sql.columnVector.offheap.enabled")
       .internal()
@@ -1256,6 +1262,8 @@ class SQLConf extends Serializable with Logging {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
+
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   def targetPostShuffleInputSize: Long =

http://git-wip-us.apache.org/repos/asf/spark/blob/e5e9f9a4/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index c167f1e..e972f8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -54,7 +54,7 @@ case class InMemoryTableScanExec(
   override val supportsBatch: Boolean = {
     // In the initial implementation, for ease of review
     // support only primitive data types and # of fields is less than wholeStageMaxNumFields
-    relation.schema.fields.forall(f => f.dataType match {
+    conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match {
       case BooleanType | ByteType | ShortType | IntegerType | LongType |
         FloatType | DoubleType => true
       case _ => false
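
To observe the gate above, one can walk the executed plan of a cached query and check supportsBatch; a rough sketch in the spirit of the new test (not the exact test added by this patch; assumes a `spark` session is in scope):

    import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

    Seq(true, false).foreach { vectorized =>
      spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", vectorized.toString)
      val df = spark.range(10).cache()
      df.queryExecution.executedPlan.foreach {
        // with an all-primitive schema, supportsBatch should track the config
        case scan: InMemoryTableScanExec => assert(scan.supportsBatch == vectorized)
        case _ =>
      }
      df.unpersist()
    }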

http://git-wip-us.apache.org/repos/asf/spark/blob/e5e9f9a4/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 72fe0f4..9f27fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.scalatest.concurrent.Eventually._
-
 import org.apache.spark.CleanerListener
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
@@ -30,6 +28,7 @@ import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}

spark git commit: [SPARK-23312][SQL] add a config to turn off vectorized cache reader

2018-02-02 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 19c7c7ebd -> b9503fcbb


[SPARK-23312][SQL] add a config to turn off vectorized cache reader

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-23309 reported a performance 
regression with cached tables in Spark 2.3. While the investigation is still 
ongoing, this PR adds a conf to turn off the vectorized cache reader, to 
unblock the 2.3 release.
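
As with other SQL confs, the flag can also be fixed at session build time instead of per-query; a small sketch (builder settings are illustrative, not from this patch):

    import org.apache.spark.sql.SparkSession

    // Hypothetical setup: start the session with the vectorized cache reader off.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", "false")
      .getOrCreate()

The same key can be passed on the command line, e.g. --conf spark.sql.inMemoryColumnarStorage.enableVectorizedReader=false with spark-submit or spark-shell.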

## How was this patch tested?

A new test.

Author: Wenchen Fan 

Closes #20483 from cloud-fan/cache.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9503fcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9503fcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9503fcb

Branch: refs/heads/master
Commit: b9503fcbb3f4a3ce263164d1f11a8e99b9ca5710
Parents: 19c7c7e
Author: Wenchen Fan 
Authored: Fri Feb 2 22:43:28 2018 +0800
Committer: Wenchen Fan 
Committed: Fri Feb 2 22:43:28 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala      |  8 ++++++++
 .../execution/columnar/InMemoryTableScanExec.scala   |  2 +-
 .../org/apache/spark/sql/CachedTableSuite.scala      | 15 +++++++++++++--
 3 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 90654e6..1e2501e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -141,6 +141,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val CACHE_VECTORIZED_READER_ENABLED =
+    buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader")
+      .doc("Enables vectorized reader for columnar caching.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMN_VECTOR_OFFHEAP_ENABLED =
     buildConf("spark.sql.columnVector.offheap.enabled")
       .internal()
@@ -1272,6 +1278,8 @@ class SQLConf extends Serializable with Logging {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
+
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   def targetPostShuffleInputSize: Long =

http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index c167f1e..e972f8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -54,7 +54,7 @@ case class InMemoryTableScanExec(
   override val supportsBatch: Boolean = {
     // In the initial implementation, for ease of review
     // support only primitive data types and # of fields is less than wholeStageMaxNumFields
-    relation.schema.fields.forall(f => f.dataType match {
+    conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match {
       case BooleanType | ByteType | ShortType | IntegerType | LongType |
         FloatType | DoubleType => true
       case _ => false

http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 72fe0f4..9f27fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
-import org.scalatest.concurrent.Eventually._
-
 import org.apache.spark.CleanerListener
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
@@ -30,6 +28,7 @@ import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exc