Repository: spark
Updated Branches:
  refs/heads/master ac586bbb0 -> 9ad0f6ea8


[SPARK-25269][SQL] SQL interface support specify StorageLevel when cache table

## What changes were proposed in this pull request?

The SQL interface now supports specifying a `StorageLevel` when caching a table. The syntax is:
```sql
CACHE TABLE tableName OPTIONS('storageLevel' 'DISK_ONLY');
```
All supported `StorageLevel` values are listed here:
https://github.com/apache/spark/blob/eefdf9f9dd8afde49ad7d4e230e2735eb817ab0a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala#L172-L183

## How was this patch tested?

Unit tests and manual tests.

Manual test configuration:
```
--executor-memory 15G --executor-cores 5 --num-executors 50
```
Data:
Input Size / Records: 1037.7 GB / 11732805788

Result:
![image](https://user-images.githubusercontent.com/5399861/47213362-56a1c980-d3cd-11e8-82e7-28d7abc5923e.png)

Closes #22263 from wangyum/SPARK-25269.

Authored-by: Yuming Wang <yumw...@ebay.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ad0f6ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ad0f6ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ad0f6ea

Branch: refs/heads/master
Commit: 9ad0f6ea89435391ec16e436bc4c4d5bf6b68493
Parents: ac586bb
Author: Yuming Wang <yumw...@ebay.com>
Authored: Fri Oct 19 09:15:55 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Fri Oct 19 09:15:55 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  3 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  3 +-
 .../spark/sql/execution/command/cache.scala     | 23 +++++++-
 .../org/apache/spark/sql/CachedTableSuite.scala | 60 ++++++++++++++++++++
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 5 files changed, 86 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0569986..e2d34d1 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -162,7 +162,8 @@ statement
         tableIdentifier partitionSpec? describeColName?                
#describeTable
     | REFRESH TABLE tableIdentifier                                    
#refreshTable
     | REFRESH (STRING | .*?)                                           
#refreshResource
-    | CACHE LAZY? TABLE tableIdentifier (AS? query)?                   
#cacheTable
+    | CACHE LAZY? TABLE tableIdentifier
+        (OPTIONS options=tablePropertyList)? (AS? query)?              
#cacheTable
     | UNCACHE TABLE (IF EXISTS)? tableIdentifier                       
#uncacheTable
     | CLEAR CACHE                                                      
#clearCache
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 4ed14d3..364efea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -282,7 +282,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
       throw new ParseException(s"It is not allowed to add database prefix 
`$database` to " +
         s"the table name in CACHE TABLE AS SELECT", ctx)
     }
-    CacheTableCommand(tableIdent, query, ctx.LAZY != null)
+    val options = 
Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    CacheTableCommand(tableIdent, query, ctx.LAZY != null, options)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 6b00426..728604a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -17,16 +17,21 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.util.Locale
+
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.storage.StorageLevel
 
 case class CacheTableCommand(
     tableIdent: TableIdentifier,
     plan: Option[LogicalPlan],
-    isLazy: Boolean) extends RunnableCommand {
+    isLazy: Boolean,
+    options: Map[String, String]) extends RunnableCommand {
   require(plan.isEmpty || tableIdent.database.isEmpty,
     "Database name is not allowed in CACHE TABLE AS SELECT")
 
@@ -36,7 +41,21 @@ case class CacheTableCommand(
     plan.foreach { logicalPlan =>
       Dataset.ofRows(sparkSession, 
logicalPlan).createTempView(tableIdent.quotedString)
     }
-    sparkSession.catalog.cacheTable(tableIdent.quotedString)
+
+    val storageLevelKey = "storagelevel"
+    val storageLevelValue =
+      
CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
+    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != 
storageLevelKey)
+    if (withoutStorageLevel.nonEmpty) {
+      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
+    }
+
+    if (storageLevelValue.nonEmpty) {
+      sparkSession.catalog.cacheTable(
+        tableIdent.quotedString, 
StorageLevel.fromString(storageLevelValue.get))
+    } else {
+      sparkSession.catalog.cacheTable(tableIdent.quotedString)
+    }
 
     if (!isLazy) {
       // Performs eager caching

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 60c73df..6e805c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -22,6 +22,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.spark.CleanerListener
+import org.apache.spark.executor.DataReadMethod._
+import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
@@ -64,6 +66,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
     maybeBlock.nonEmpty
   }
 
+  def isExpectStorageLevel(rddId: Int, level: DataReadMethod): Boolean = {
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    val isExpectLevel = maybeBlock.forall(_.readMethod === level)
+    maybeBlock.foreach(_ => 
sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty && isExpectLevel
+  }
+
   private def getNumInMemoryRelations(ds: Dataset[_]): Int = {
     val plan = ds.queryExecution.withCachedData
     var sum = plan.collect { case _: InMemoryRelation => 1 }.sum
@@ -288,6 +297,57 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
     }
   }
 
+  private def assertStorageLevel(cacheOptions: String, level: DataReadMethod): 
Unit = {
+    sql(s"CACHE TABLE testData OPTIONS$cacheOptions")
+    assertCached(spark.table("testData"))
+    val rddId = rddIdOf("testData")
+    assert(isExpectStorageLevel(rddId, level))
+  }
+
+  test("SQL interface support storageLevel(DISK_ONLY)") {
+    assertStorageLevel("('storageLevel' 'DISK_ONLY')", Disk)
+  }
+
+  test("SQL interface support storageLevel(DISK_ONLY) with invalid options") {
+    assertStorageLevel("('storageLevel' 'DISK_ONLY', 'a' '1', 'b' '2')", Disk)
+  }
+
+  test("SQL interface support storageLevel(MEMORY_ONLY)") {
+    assertStorageLevel("('storageLevel' 'MEMORY_ONLY')", Memory)
+  }
+
+  test("SQL interface cache SELECT ... support storageLevel(DISK_ONLY)") {
+    withTempView("testCacheSelect") {
+      sql("CACHE TABLE testCacheSelect OPTIONS('storageLevel' 'DISK_ONLY') 
SELECT * FROM testData")
+      assertCached(spark.table("testCacheSelect"))
+      val rddId = rddIdOf("testCacheSelect")
+      assert(isExpectStorageLevel(rddId, Disk))
+    }
+  }
+
+  test("SQL interface support storageLevel(Invalid StorageLevel)") {
+    val message = intercept[IllegalArgumentException] {
+      sql("CACHE TABLE testData OPTIONS('storageLevel' 
'invalid_storage_level')")
+    }.getMessage
+    assert(message.contains("Invalid StorageLevel: INVALID_STORAGE_LEVEL"))
+  }
+
+  test("SQL interface support storageLevel(with LAZY)") {
+    sql("CACHE LAZY TABLE testData OPTIONS('storageLevel' 'disk_only')")
+    assertCached(spark.table("testData"))
+
+    val rddId = rddIdOf("testData")
+    assert(
+      !isMaterialized(rddId),
+      "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+    sql("SELECT COUNT(*) FROM testData").collect()
+    assert(
+      isMaterialized(rddId),
+      "Lazily cached in-memory table should have been materialized")
+    assert(isExpectStorageLevel(rddId, Disk))
+  }
+
   test("InMemoryRelation statistics") {
     sql("CACHE TABLE testData")
     spark.table("testData").queryExecution.withCachedData.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 71f15a4..634b3db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -571,7 +571,7 @@ private[hive] class TestHiveQueryExecution(
 
   override lazy val analyzed: LogicalPlan = {
     val describedTables = logical match {
-      case CacheTableCommand(tbl, _, _) => tbl.table :: Nil
+      case CacheTableCommand(tbl, _, _, _) => tbl.table :: Nil
       case _ => Nil
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to