Repository: spark Updated Branches: refs/heads/master b3e768e15 -> 5dadda864
[SPARK-2234][SQL]Spark SQL basicOperators add Except operator Hi all, I want to submit a Except operator in basicOperators.scala In SQL case.SQL support two table do except operator. select * from table1 except select * from table2 This operator support the substract function .Return an table with the elements from `this` that are not in `other`.This operator should limit the input SparkPlan Seq only has two member.The check will later support JIRA:https://issues.apache.org/jira/browse/SPARK-2234 Author: Yanjie Gao <gaoyanji...@163.com> Author: YanjieGao <396154...@qq.com> Author: root <root@node4.(none)> Author: gaoyanjie <gaoyanji...@163.com> Closes #1151 from YanjieGao/patch-6 and squashes the following commits: f19f899 [YanjieGao] add a new blank line in basicoperators.scala 2ff7d73 [YanjieGao] resolve the identation in SqlParser and SparkStrategies fdb5227 [YanjieGao] Merge remote branch 'upstream/master' into patch-6 9940d19 [YanjieGao] make comment less than 100c 09c7413 [YanjieGao] pr 1151 SqlParser add cache ,basic Operator rename Except and modify comment b4b5867 [root] Merge remote branch 'upstream/master' into patch-6 b4c3869 [Yanjie Gao] change SparkStrategies Sparkcontext to SqlContext 7e0ec29 [Yanjie Gao] delete multi test 7e7c83f [Yanjie Gao] delete conflict except b01beb8 [YanjieGao] resolve conflict sparkstrategies and basicOperators 4dc8166 [YanjieGao] resolve conflict fa68a98 [Yanjie Gao] Update joins.scala 8e6bb00 [Yanjie Gao] delete conflict except dd9ba5e [Yanjie Gao] Update joins.scala a0d4e73 [Yanjie Gao] delete skew join 60f5ddd [Yanjie Gao] update less than 100c 0e72233 [Yanjie Gao] update SQLQuerySuite on master branch 7f916b5 [Yanjie Gao] update execution/basicOperators on master branch a28dece [Yanjie Gao] Update logical/basicOperators on master branch a639935 [Yanjie Gao] Update SparkStrategies.scala 3bf7def [Yanjie Gao] update SqlParser on master branch 26f833f [Yanjie Gao] update SparkStrategies.scala on master branch 8dd063f [Yanjie Gao] Update logical/basicOperators on master branch 9847dcf [Yanjie Gao] update SqlParser on masterbranch d6a4604 [Yanjie Gao] Update joins.scala 424c507 [Yanjie Gao] Update joins.scala 7680742 [Yanjie Gao] Update SqlParser.scala a7193d8 [gaoyanjie] [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151 5c8a224 [Yanjie Gao] update the line less than 100c ee066b3 [Yanjie Gao] Update basicOperators.scala 32a80ab [Yanjie Gao] remove except in HiveQl cf232eb [Yanjie Gao] update 1comment 2space3 left.out f1ea3f3 [Yanjie Gao] remove comment 7ea9b91 [Yanjie Gao] remove annotation 7f3d613 [Yanjie Gao] update .map(_.copy()) 670a1bb [Yanjie Gao] Update HiveQl.scala 3fe7746 [Yanjie Gao] Update SQLQuerySuite.scala a36eb0a [Yanjie Gao] Update basicOperators.scala 7859e56 [Yanjie Gao] Update SparkStrategies.scala 052346d [Yanjie Gao] Subtract is conflict with Subtract(e1,e2) aab3785 [Yanjie Gao] Update SQLQuerySuite.scala 4bf80b1 [Yanjie Gao] update subtract to except 4bdd520 [Yanjie Gao] Update SqlParser.scala 2d4bfbd [Yanjie Gao] Update SQLQuerySuite.scala 0808921 [Yanjie Gao] SQLQuerySuite a8a1948 [Yanjie Gao] SparkStrategies 1fe96c0 [Yanjie Gao] HiveQl.scala update 3305e40 [Yanjie Gao] SqlParser 7a98c37 [Yanjie Gao] Update basicOperators.scala cf5b9d0 [Yanjie Gao] Update basicOperators.scala 8945835 [Yanjie Gao] object SkewJoin extends Strategy 2b98962 [Yanjie Gao] Update SqlParser.scala dd32980 [Yanjie Gao] update1 68815b2 [Yanjie Gao] Reformat the code style 4eb43ec [Yanjie Gao] Update basicOperators.scala aa06072 [Yanjie Gao] Reformat the code sytle Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5dadda86 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5dadda86 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5dadda86 Branch: refs/heads/master Commit: 5dadda86456e1d3918e320b83aec7e2f1352d95d Parents: b3e768e Author: Yanjie Gao <gaoyanji...@163.com> Authored: Fri Jul 4 02:43:57 2014 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Fri Jul 4 02:43:57 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/catalyst/SqlParser.scala | 3 +++ .../sql/catalyst/plans/logical/basicOperators.scala | 6 ++++++ .../apache/spark/sql/execution/SparkStrategies.scala | 2 ++ .../apache/spark/sql/execution/basicOperators.scala | 15 +++++++++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++++ 5 files changed, 40 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 61762fa..ecb1112 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -118,6 +118,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val EXCEPT = Keyword("EXCEPT") + // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -138,6 +140,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert | cache http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 3e06398..bac5a72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -89,6 +89,12 @@ case class Join( } } +case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + def output = left.output + + def references = Set.empty +} + case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0925605..9e036e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -273,6 +273,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil + case logical.Except(left,right) => + execution.Except(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a278f1c..4b59e0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -205,3 +205,18 @@ object ExistingRdd { case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { override def execute() = rdd } + +/** + * :: DeveloperApi :: + * Returns a table with the elements from left that are not in right using + * the built-in spark subtract function. + */ +@DeveloperApi +case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = left.output + + override def execute() = { + left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 2c1cb18..5c6701e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -371,6 +371,20 @@ class SQLQuerySuite extends QueryTest { (3, null))) } + test("EXCEPT") { + + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) + checkAnswer( + sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) + } + test("SET commands semantics using sql()") { TestSQLContext.settings.synchronized { clear()