Repository: spark
Updated Branches:
  refs/heads/master ea465af12 -> 23468e7e9


[SPARK-2220][SQL] Fixes remaining Hive commands

This PR adds support for the `ADD FILE` Hive command and removes 
`ShellCommand` and `SourceCommand`. The rationale for these changes is described in [this SPARK-2220 
comment](https://issues.apache.org/jira/browse/SPARK-2220?focusedCommentId=14191841&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14191841).

Author: Cheng Lian <lian.cs....@gmail.com>

Closes #3038 from liancheng/hive-commands and squashes the following commits:

6db61e0 [Cheng Lian] Fixes remaining Hive commands


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23468e7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23468e7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23468e7e

Branch: refs/heads/master
Commit: 23468e7e96bf047ba53806352558b9d661567b23
Parents: ea465af
Author: Cheng Lian <lian.cs....@gmail.com>
Authored: Fri Oct 31 11:34:51 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Oct 31 11:34:51 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/SparkSQLParser.scala     | 14 +------------
 .../sql/catalyst/plans/logical/commands.scala   | 22 +++++++++-----------
 .../apache/spark/sql/hive/HiveStrategies.scala  |  2 ++
 .../spark/sql/hive/execution/commands.scala     | 16 ++++++++++++++
 .../sql/hive/execution/HiveQuerySuite.scala     | 19 ++++++++++++++---
 5 files changed, 45 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23468e7e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
index 12e8346..f5c19ee 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SparkSQLParser.scala
@@ -137,7 +137,6 @@ private[sql] class SparkSQLParser(fallback: String => 
LogicalPlan) extends Abstr
   protected val LAZY    = Keyword("LAZY")
   protected val SET     = Keyword("SET")
   protected val TABLE   = Keyword("TABLE")
-  protected val SOURCE  = Keyword("SOURCE")
   protected val UNCACHE = Keyword("UNCACHE")
 
   protected implicit def asParser(k: Keyword): Parser[String] =
@@ -152,8 +151,7 @@ private[sql] class SparkSQLParser(fallback: String => 
LogicalPlan) extends Abstr
 
   override val lexical = new SqlLexical(reservedWords)
 
-  override protected lazy val start: Parser[LogicalPlan] =
-    cache | uncache | set | shell | source | others
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | 
set | others
 
   private lazy val cache: Parser[LogicalPlan] =
     CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
@@ -171,16 +169,6 @@ private[sql] class SparkSQLParser(fallback: String => 
LogicalPlan) extends Abstr
       case input => SetCommandParser(input)
     }
 
-  private lazy val shell: Parser[LogicalPlan] =
-    "!" ~> restInput ^^ {
-      case input => ShellCommand(input.trim)
-    }
-
-  private lazy val source: Parser[LogicalPlan] =
-    SOURCE ~> restInput ^^ {
-      case input => SourceCommand(input.trim)
-    }
-
   private lazy val others: Parser[LogicalPlan] =
     wholeInput ^^ {
       case input => fallback(input)

http://git-wip-us.apache.org/repos/asf/spark/blob/23468e7e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index b8ba2ee..1d513d7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, BoundReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.types.StringType
 
 /**
@@ -41,6 +41,15 @@ case class NativeCommand(cmd: String) extends Command {
 /**
  * Commands of the form "SET [key [= value] ]".
  */
+case class DFSCommand(kv: Option[(String, Option[String])]) extends Command {
+  override def output = Seq(
+    AttributeReference("DFS output", StringType, nullable = false)())
+}
+
+/**
+ *
+ * Commands of the form "SET [key [= value] ]".
+ */
 case class SetCommand(kv: Option[(String, Option[String])]) extends Command {
   override def output = Seq(
     AttributeReference("", StringType, nullable = false)())
@@ -81,14 +90,3 @@ case class DescribeCommand(
     AttributeReference("data_type", StringType, nullable = false)(),
     AttributeReference("comment", StringType, nullable = false)())
 }
-
-/**
- * Returned for the "! shellCommand" command
- */
-case class ShellCommand(cmd: String) extends Command
-
-
-/**
- * Returned for the "SOURCE file" command
- */
-case class SourceCommand(filePath: String) extends Command

http://git-wip-us.apache.org/repos/asf/spark/blob/23468e7e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index e59d4d5..3207ad8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -206,6 +206,8 @@ private[hive] trait HiveStrategies {
 
       case hive.AddJar(path) => execution.AddJar(path) :: Nil
 
+      case hive.AddFile(path) => execution.AddFile(path) :: Nil
+
       case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) 
:: Nil
 
       case describe: logical.DescribeCommand =>

http://git-wip-us.apache.org/repos/asf/spark/blob/23468e7e/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 0fc674a..903075e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -76,3 +76,19 @@ case class AddJar(path: String) extends LeafNode with 
Command {
     Seq.empty[Row]
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class AddFile(path: String) extends LeafNode with Command {
+  def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+  override def output = Seq.empty
+
+  override protected lazy val sideEffectResult: Seq[Row] = {
+    hiveContext.runSqlHive(s"ADD FILE $path")
+    hiveContext.sparkContext.addFile(path)
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/23468e7e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index ffe1f0b..5918f88 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.File
+
 import scala.util.Try
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkFiles, SparkException}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.test.TestHive
@@ -569,7 +571,7 @@ class HiveQuerySuite extends HiveComparisonTest {
           |WITH serdeproperties('s1'='9')
         """.stripMargin)
     }
-    // Now only verify 0.12.0, and ignore other versions due to binary 
compatability
+    // Now only verify 0.12.0, and ignore other versions due to binary 
compatibility
     // current TestSerDe.jar is from 0.12.0
     if (HiveShim.version == "0.12.0") {
       sql(s"ADD JAR $testJar")
@@ -581,6 +583,17 @@ class HiveQuerySuite extends HiveComparisonTest {
     sql("DROP TABLE alter1")
   }
 
+  test("ADD FILE command") {
+    val testFile = TestHive.getHiveFile("data/files/v1.txt").getCanonicalFile
+    sql(s"ADD FILE $testFile")
+
+    val checkAddFileRDD = sparkContext.parallelize(1 to 2, 1).mapPartitions { 
_ =>
+      Iterator.single(new File(SparkFiles.get("v1.txt")).canRead)
+    }
+
+    assert(checkAddFileRDD.first())
+  }
+
   case class LogEntry(filename: String, message: String)
   case class LogFile(name: String)
 
@@ -816,7 +829,7 @@ class HiveQuerySuite extends HiveComparisonTest {
 
   createQueryTest("select from thrift based table",
     "SELECT * from src_thrift")
-  
+
   // Put tests that depend on specific Hive settings before these last two 
test,
   // since they modify /clear stuff.
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to