Repository: spark
Updated Branches:
  refs/heads/master 2c3cc7641 -> 5522151eb


[SPARK-2594][SQL] Support CACHE TABLE <name> AS SELECT ...

This feature allows users to cache the result of a SELECT query as a table.
Example: ```CACHE TABLE testCacheTable AS SELECT * FROM TEST_TABLE```
Spark treats this type of SQL statement as a command and caches the table lazily, 
just as ```SQLContext.cacheTable``` and ```CACHE TABLE <name>``` do.
It can be executed from both SQLContext and HiveContext.

Recreated the pull request after rebasing onto master, and addressed all the 
comments raised in the previous pull requests.
https://github.com/apache/spark/pull/2381
https://github.com/apache/spark/pull/2390

Author : ravipesala ravindra.pesalahuawei.com

Author: ravipesala <ravindra.pes...@huawei.com>

Closes #2397 from ravipesala/SPARK-2594 and squashes the following commits:

a5f0beb [ravipesala] Simplified the code as per Admin comment.
8059cd2 [ravipesala] Changed the behaviour from eager caching to lazy caching.
d6e469d [ravipesala] Code review comments by Admin are handled.
c18aa38 [ravipesala] Merge remote-tracking branch 
'remotes/ravipesala/Add-Cache-table-as' into SPARK-2594
394d5ca [ravipesala] Changed style
fb1759b [ravipesala] Updated as per Admin comments
8c9993c [ravipesala] Changed the style
d8b37b2 [ravipesala] Updated as per the comments by Admin
bc0bffc [ravipesala] Merge remote-tracking branch 
'ravipesala/Add-Cache-table-as' into Add-Cache-table-as
e3265d0 [ravipesala] Updated the code as per the comments by Admin in pull 
request.
724b9db [ravipesala] Changed style
aaf5b59 [ravipesala] Added comment
dc33895 [ravipesala] Updated parser to support add cache table command
b5276b2 [ravipesala] Updated parser to support add cache table command
eebc0c1 [ravipesala] Add CACHE TABLE <name> AS SELECT ...
6758f80 [ravipesala] Changed style
7459ce3 [ravipesala] Added comment
13c8e27 [ravipesala] Updated parser to support add cache table command
4e858d8 [ravipesala] Updated parser to support add cache table command
b803fc8 [ravipesala] Add CACHE TABLE <name> AS SELECT ...


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5522151e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5522151e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5522151e

Branch: refs/heads/master
Commit: 5522151eb14f4208798901f5c090868edd8e8dde
Parents: 2c3cc76
Author: ravipesala <ravindra.pes...@huawei.com>
Authored: Fri Sep 19 15:31:57 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Sep 19 15:31:57 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   | 14 +++++++--
 .../sql/catalyst/plans/logical/commands.scala   |  5 ++++
 .../spark/sql/execution/SparkStrategies.scala   |  2 ++
 .../apache/spark/sql/execution/commands.scala   | 18 ++++++++++++
 .../org/apache/spark/sql/CachedTableSuite.scala | 13 +++++++++
 .../org/apache/spark/sql/hive/HiveQl.scala      | 30 +++++++++++++-------
 6 files changed, 69 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5522151e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ca69531..862f787 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -151,7 +151,7 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
         EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => 
Distinct(Union(q1, q2)) }
       )
-    | insert | cache
+    | insert | cache | unCache
   )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -183,9 +183,17 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
     }
 
   protected lazy val cache: Parser[LogicalPlan] =
-    (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
-      case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+    CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ {
+      case tableName ~ None => 
+        CacheCommand(tableName, true)
+      case tableName ~ Some(plan) =>
+        CacheTableAsSelectCommand(tableName, plan)
     }
+    
+  protected lazy val unCache: Parser[LogicalPlan] =
+    UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
+      case tableName => CacheCommand(tableName, false)
+    }    
 
   protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, 
",")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5522151e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index a01809c..8366639 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -75,3 +75,8 @@ case class DescribeCommand(
     AttributeReference("data_type", StringType, nullable = false)(),
     AttributeReference("comment", StringType, nullable = false)())
 }
+
+/**
+ * Returned for the "CACHE TABLE tableName AS SELECT .." command.
+ */
+case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) 
extends Command

http://git-wip-us.apache.org/repos/asf/spark/blob/5522151e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7943d6e..45687d9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -305,6 +305,8 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         Seq(execution.ExplainCommand(logicalPlan, plan.output, 
extended)(context))
       case logical.CacheCommand(tableName, cache) =>
         Seq(execution.CacheCommand(tableName, cache)(context))
+      case logical.CacheTableAsSelectCommand(tableName, plan) =>
+        Seq(execution.CacheTableAsSelectCommand(tableName, plan))
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5522151e/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 94543fc..c2f48a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -166,3 +166,21 @@ case class DescribeCommand(child: SparkPlan, output: 
Seq[Attribute])(
       child.output.map(field => Row(field.name, field.dataType.toString, null))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheTableAsSelectCommand(tableName: String, logicalPlan: 
LogicalPlan)
+  extends LeafNode with Command {
+  
+  override protected[sql] lazy val sideEffectResult = {
+    import sqlContext._
+    logicalPlan.registerTempTable(tableName)
+    cacheTable(tableName) 
+    Seq.empty[Row]
+  }
+
+  override def output: Seq[Attribute] = Seq.empty  
+  
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5522151e/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index befef46..5915928 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -119,4 +119,17 @@ class CachedTableSuite extends QueryTest {
     }
     assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not 
be cached")
   }
+  
+  test("CACHE TABLE tableName AS SELECT Star Table") {
+    TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+    TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
+    assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' 
should be cached")
+    TestSQLContext.uncacheTable("testCacheTable")
+  }
+  
+  test("'CACHE TABLE tableName AS SELECT ..'") {
+    TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+    assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' 
should be cached")
+    TestSQLContext.uncacheTable("testCacheTable")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5522151e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 21ecf17..0aa6292 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -229,7 +229,12 @@ private[hive] object HiveQl {
             SetCommand(Some(key), Some(value))
         }
       } else if (sql.trim.toLowerCase.startsWith("cache table")) {
-        CacheCommand(sql.trim.drop(12).trim, true)
+        sql.trim.drop(12).trim.split(" ").toSeq match {
+          case Seq(tableName) => 
+            CacheCommand(tableName, true)
+          case Seq(tableName, _, select @ _*) => 
+            CacheTableAsSelectCommand(tableName, createPlan(select.mkString(" 
").trim))
+        }
       } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
         CacheCommand(sql.trim.drop(14).trim, false)
       } else if (sql.trim.toLowerCase.startsWith("add jar")) {
@@ -243,15 +248,7 @@ private[hive] object HiveQl {
       } else if (sql.trim.startsWith("!")) {
         ShellCommand(sql.drop(1))
       } else {
-        val tree = getAst(sql)
-        if (nativeCommands contains tree.getText) {
-          NativeCommand(sql)
-        } else {
-          nodeToPlan(tree) match {
-            case NativePlaceholder => NativeCommand(sql)
-            case other => other
-          }
-        }
+        createPlan(sql)
       }
     } catch {
       case e: Exception => throw new ParseException(sql, e)
@@ -262,6 +259,19 @@ private[hive] object HiveQl {
         """.stripMargin)
     }
   }
+  
+  /** Creates LogicalPlan for a given HiveQL string. */
+  def createPlan(sql: String) = {
+    val tree = getAst(sql)
+    if (nativeCommands contains tree.getText) {
+      NativeCommand(sql)
+    } else {
+      nodeToPlan(tree) match {
+        case NativePlaceholder => NativeCommand(sql)
+        case other => other
+      }
+    }
+  }
 
   def parseDdl(ddl: String): Seq[Attribute] = {
     val tree =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to