spark git commit: [SPARK-23312][SQL] add a config to turn off vectorized cache reader
Repository: spark Updated Branches: refs/heads/branch-2.3 2b07452ca -> e5e9f9a43 [SPARK-23312][SQL] add a config to turn off vectorized cache reader ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-23309 reported a performance regression about cached table in Spark 2.3. While the investigating is still going on, this PR adds a conf to turn off the vectorized cache reader, to unblock the 2.3 release. ## How was this patch tested? a new test Author: Wenchen Fan Closes #20483 from cloud-fan/cache. (cherry picked from commit b9503fcbb3f4a3ce263164d1f11a8e99b9ca5710) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5e9f9a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5e9f9a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5e9f9a4 Branch: refs/heads/branch-2.3 Commit: e5e9f9a430c827669ecfe9d5c13cc555fc89c980 Parents: 2b07452 Author: Wenchen Fan Authored: Fri Feb 2 22:43:28 2018 +0800 Committer: Wenchen Fan Committed: Fri Feb 2 22:43:51 2018 +0800 -- .../org/apache/spark/sql/internal/SQLConf.scala | 8 .../execution/columnar/InMemoryTableScanExec.scala | 2 +- .../org/apache/spark/sql/CachedTableSuite.scala | 15 +-- 3 files changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e5e9f9a4/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7394a0d..e498f55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -141,6 +141,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CACHE_VECTORIZED_READER_ENABLED = +buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader") + .doc("Enables vectorized reader for columnar caching.") + .booleanConf + .createWithDefault(true) + val COLUMN_VECTOR_OFFHEAP_ENABLED = buildConf("spark.sql.columnVector.offheap.enabled") .internal() @@ -1256,6 +1262,8 @@ class SQLConf extends Serializable with Logging { def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) + def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) + def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) def targetPostShuffleInputSize: Long = http://git-wip-us.apache.org/repos/asf/spark/blob/e5e9f9a4/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index c167f1e..e972f8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -54,7 +54,7 @@ case class InMemoryTableScanExec( override val supportsBatch: Boolean = { // In the initial implementation, for ease of review // support only primitive data types and # of fields is less than wholeStageMaxNumFields -relation.schema.fields.forall(f => f.dataType match { +conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType => true case _ => false http://git-wip-us.apache.org/repos/asf/spark/blob/e5e9f9a4/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 72fe0f4..9f27fa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -21,8 +21,6 @@ import scala.collection.mutable.HashSet import scala.concurrent.duration._ import scala.language.postfixOps -import org.scalatest.concurrent.Eventually._ - import org.apache.spark.CleanerListener import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.SubqueryExpression @@ -30,6 +28,7 @@ import org.apache.spark.sql.execution.{RDDScanExe
spark git commit: [SPARK-23312][SQL] add a config to turn off vectorized cache reader
Repository: spark Updated Branches: refs/heads/master 19c7c7ebd -> b9503fcbb [SPARK-23312][SQL] add a config to turn off vectorized cache reader ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-23309 reported a performance regression about cached table in Spark 2.3. While the investigating is still going on, this PR adds a conf to turn off the vectorized cache reader, to unblock the 2.3 release. ## How was this patch tested? a new test Author: Wenchen Fan Closes #20483 from cloud-fan/cache. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9503fcb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9503fcb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9503fcb Branch: refs/heads/master Commit: b9503fcbb3f4a3ce263164d1f11a8e99b9ca5710 Parents: 19c7c7e Author: Wenchen Fan Authored: Fri Feb 2 22:43:28 2018 +0800 Committer: Wenchen Fan Committed: Fri Feb 2 22:43:28 2018 +0800 -- .../org/apache/spark/sql/internal/SQLConf.scala | 8 .../execution/columnar/InMemoryTableScanExec.scala | 2 +- .../org/apache/spark/sql/CachedTableSuite.scala | 15 +-- 3 files changed, 22 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 90654e6..1e2501e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -141,6 +141,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CACHE_VECTORIZED_READER_ENABLED = +buildConf("spark.sql.inMemoryColumnarStorage.enableVectorizedReader") + .doc("Enables vectorized reader for columnar caching.") + .booleanConf + .createWithDefault(true) + val COLUMN_VECTOR_OFFHEAP_ENABLED = buildConf("spark.sql.columnVector.offheap.enabled") .internal() @@ -1272,6 +1278,8 @@ class SQLConf extends Serializable with Logging { def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) + def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) + def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) def targetPostShuffleInputSize: Long = http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index c167f1e..e972f8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -54,7 +54,7 @@ case class InMemoryTableScanExec( override val supportsBatch: Boolean = { // In the initial implementation, for ease of review // support only primitive data types and # of fields is less than wholeStageMaxNumFields -relation.schema.fields.forall(f => f.dataType match { +conf.cacheVectorizedReaderEnabled && relation.schema.fields.forall(f => f.dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType => true case _ => false http://git-wip-us.apache.org/repos/asf/spark/blob/b9503fcb/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 72fe0f4..9f27fa0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -21,8 +21,6 @@ import scala.collection.mutable.HashSet import scala.concurrent.duration._ import scala.language.postfixOps -import org.scalatest.concurrent.Eventually._ - import org.apache.spark.CleanerListener import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.SubqueryExpression @@ -30,6 +28,7 @@ import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan} import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.exc