[GitHub] spark pull request #21886: [SPARK-21274][SQL] Implement INTERSECT ALL clause
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21886 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r206013525 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -529,9 +529,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Distinct(child) => throw new IllegalStateException( "logical distinct operator should have been replaced by aggregate in the optimizer") - case logical.Intersect(left, right) => + case logical.Intersect(left, right, false) => throw new IllegalStateException( - "logical intersect operator should have been replaced by semi-join in the optimizer") + "logical intersect operator should have been replaced by semi-join in the optimizer") + case logical.Intersect(left, right, true) => +throw new IllegalStateException( + "logical intersect operator should have been replaced by union, aggregate" + --- End diff -- nit: looks we need a space for ` aggregate"` -> ` aggregate "` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r206013205 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -164,7 +164,12 @@ object SetOperation { def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right)) } -case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) { +case class Intersect( + left: LogicalPlan, + right: LogicalPlan, + isAll: Boolean = false) extends SetOperation(left, right) { --- End diff -- not a big deal at all, but this has three spaces.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205993347 --- Diff: sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out --- @@ -0,0 +1,236 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 17 + + +-- !query 0 +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), +(1, 3), +(2, 3), +(null, null), +(null, null) +AS tab1(k, v) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(2, 3), +(3, 4), +(null, null), +(null, null) +AS tab2(k, v) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 2 schema +struct +-- !query 2 output +1 2 +1 2 +2 3 +NULL NULL +NULL NULL + + +-- !query 3 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab1 WHERE k = 1 +-- !query 3 schema +struct +-- !query 3 output +1 2 +1 2 +1 3 +1 3 + + +-- !query 4 +SELECT * FROM tab1 WHERE k > 2 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 4 schema +struct +-- !query 4 output + + + +-- !query 5 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 WHERE k > 3 +-- !query 5 schema +struct +-- !query 5 output + + + +-- !query 6 +SELECT * FROM tab1 +INTERSECT ALL +SELECT CAST(1 AS BIGINT), CAST(2 AS BIGINT) +-- !query 6 schema +struct +-- !query 6 output +1 2 + + +-- !query 7 +SELECT * FROM tab1 +INTERSECT ALL +SELECT array(1), 2 +-- !query 7 schema +struct<> +-- !query 7 output +org.apache.spark.sql.AnalysisException +IntersectAll can only be performed on tables with the compatible column types. 
array <> int at the first column of the second table; + + +-- !query 8 +SELECT k FROM tab1 +INTERSECT ALL +SELECT k, v FROM tab2 +-- !query 8 schema +struct<> +-- !query 8 output +org.apache.spark.sql.AnalysisException +IntersectAll can only be performed on tables with the same number of columns, but the first table has 1 columns and the second table has 2 columns; + + +-- !query 9 +SELECT * FROM tab2 +INTERSECT ALL +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 9 schema +struct +-- !query 9 output +1 2 +1 2 +2 3 +NULL NULL +NULL NULL + + +-- !query 10 +SELECT * FROM tab1 +EXCEPT +SELECT * FROM tab2 +UNION ALL +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 10 schema +struct +-- !query 10 output +1 2 +1 2 +2 3 +NULL NULL +NULL NULL --- End diff -- @gatorsmile Thank you. I just went over my notes. The difference in output is because Spark gives the same precedence to all the set operators: they are evaluated in the order they appear in the query, from left to right. But per the SQL standard, INTERSECT should have higher precedence than UNION and EXCEPT. We have the same problem in our current support of EXCEPT (DISTINCT) and INTERSECT (DISTINCT). I am fixing the test now to add parentheses around the query blocks to force a certain order of evaluation. I have opened https://issues.apache.org/jira/browse/SPARK-24966 to fix the precedence in our grammar.
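The precedence issue described above can be sketched in plain Python (a hedged illustration, not Spark code), treating `collections.Counter` as a multiset over the tab1/tab2 rows defined in this test file. Left-to-right evaluation drops the (1, 3) row that the standard precedence would keep, which matches the missing row noticed in review:

```python
from collections import Counter

# Rows of the temporary views defined in intersect-all.sql
tab1 = [(1, 2), (1, 2), (1, 3), (1, 3), (2, 3), (None, None), (None, None)]
tab2 = [(1, 2), (1, 2), (2, 3), (3, 4), (None, None), (None, None)]

def intersect_all(left, right):
    # INTERSECT ALL keeps each row min(left_count, right_count) times;
    # Counter's & operator computes exactly that multiset minimum.
    return list((Counter(left) & Counter(right)).elements())

def except_distinct(left, right):
    # EXCEPT (DISTINCT): distinct rows of left that do not appear in right
    return [row for row in set(left) if row not in set(right)]

# Spark's current grammar (SPARK-24966): operators evaluated left to right
left_to_right = intersect_all(except_distinct(tab1, tab2) + tab1, tab2)

# SQL standard: INTERSECT binds tighter than UNION ALL / EXCEPT
standard = except_distinct(tab1, tab2) + intersect_all(tab1, tab2)

print((1, 3) in left_to_right)  # False: the row missing from query 10's output
print((1, 3) in standard)       # True
```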
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205980691 --- Diff: sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out --- @@ -0,0 +1,236 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 17 + + +-- !query 0 +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), +(1, 3), +(2, 3), +(null, null), +(null, null) +AS tab1(k, v) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(2, 3), +(3, 4), +(null, null), +(null, null) +AS tab2(k, v) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 2 schema +struct +-- !query 2 output +1 2 +1 2 +2 3 +NULL NULL +NULL NULL + + +-- !query 3 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab1 WHERE k = 1 +-- !query 3 schema +struct +-- !query 3 output +1 2 +1 2 +1 3 +1 3 + + +-- !query 4 +SELECT * FROM tab1 WHERE k > 2 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 4 schema +struct +-- !query 4 output + + + +-- !query 5 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 WHERE k > 3 +-- !query 5 schema +struct +-- !query 5 output + + + +-- !query 6 +SELECT * FROM tab1 +INTERSECT ALL +SELECT CAST(1 AS BIGINT), CAST(2 AS BIGINT) +-- !query 6 schema +struct +-- !query 6 output +1 2 + + +-- !query 7 +SELECT * FROM tab1 +INTERSECT ALL +SELECT array(1), 2 +-- !query 7 schema +struct<> +-- !query 7 output +org.apache.spark.sql.AnalysisException +IntersectAll can only be performed on tables with the compatible column types. 
array <> int at the first column of the second table; + + +-- !query 8 +SELECT k FROM tab1 +INTERSECT ALL +SELECT k, v FROM tab2 +-- !query 8 schema +struct<> +-- !query 8 output +org.apache.spark.sql.AnalysisException +IntersectAll can only be performed on tables with the same number of columns, but the first table has 1 columns and the second table has 2 columns; + + +-- !query 9 +SELECT * FROM tab2 +INTERSECT ALL +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 9 schema +struct +-- !query 9 output +1 2 +1 2 +2 3 +NULL NULL +NULL NULL + + +-- !query 10 +SELECT * FROM tab1 +EXCEPT +SELECT * FROM tab2 +UNION ALL +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 10 schema +struct +-- !query 10 output +1 2 +1 2 +2 3 +NULL NULL +NULL NULL --- End diff -- This misses one row (1, 3). Could you investigate the cause? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205933857 --- Diff: sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out --- @@ -0,0 +1,212 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 17 + + +-- !query 0 +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), +(2, 3) +AS tab1(k, v) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES +(1, 2), --- End diff -- also add another duplicate row for (1, 2);
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205933861 --- Diff: sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out --- @@ -0,0 +1,212 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 17 + + +-- !query 0 +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), --- End diff -- also add another duplicate row (1, 3) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205933866 --- Diff: sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out --- @@ -0,0 +1,212 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 17 + + +-- !query 0 +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), +(2, 3) +AS tab1(k, v) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES +(1, 2), +(2, 3) --- End diff -- add one more row (3, 4) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205933837 --- Diff: sql/core/src/test/resources/sql-tests/results/intersect-all.sql.out --- @@ -0,0 +1,212 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 17 + + +-- !query 0 +CREATE TEMPORARY VIEW tab1 AS SELECT * FROM VALUES +(1, 2), +(1, 2), +(1, 3), +(2, 3) +AS tab1(k, v) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE TEMPORARY VIEW tab2 AS SELECT * FROM VALUES +(1, 2), +(2, 3) +AS tab2(k, v) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 2 schema +struct +-- !query 2 output +1 2 +2 3 + + +-- !query 3 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab1 WHERE k = 1 +-- !query 3 schema +struct +-- !query 3 output +1 2 +1 2 +1 3 + + +-- !query 4 +SELECT * FROM tab1 WHERE k > 2 +INTERSECT ALL +SELECT * FROM tab2 +-- !query 4 schema +struct +-- !query 4 output + + + +-- !query 5 +SELECT * FROM tab1 +INTERSECT ALL +SELECT * FROM tab2 WHERE k > 2 +-- !query 5 schema +struct +-- !query 5 output + + + +-- !query 6 +SELECT * FROM tab1 +INTERSECT ALL +SELECT CAST(1 AS BIGINT), CAST(2 AS BIGINT) +-- !query 6 schema +struct +-- !query 6 output +1 2 + + +-- !query 7 +SELECT * FROM tab1 +INTERSECT ALL +SELECT array(1), 2 +-- !query 7 schema +struct<> +-- !query 7 output +org.apache.spark.sql.AnalysisException +IntersectAll can only be performed on tables with the compatible column types. array <> int at the first column of the second table; + + +-- !query 8 +SELECT c1 FROM tab1 +INTERSECT ALL +SELECT c1, c2 FROM tab2 --- End diff -- use `k` and `v` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205933692 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1934,6 +1934,23 @@ class Dataset[T] private[sql]( Intersect(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows only in both this Dataset and another Dataset while + * preserving the duplicates. + * This is equivalent to `INTERSECT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. Also as standard + * in SQL, this function resolves columns by position (not by name). + * + * @group typedrel + * @since 2.4.0 + */ + def intersectAll(other: Dataset[T]): Dataset[T] = withSetOperator { +Intersect(planWithBarrier, other.planWithBarrier, isAll = true) --- End diff -- yes. Please do it too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205933642 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1934,6 +1934,23 @@ class Dataset[T] private[sql]( Intersect(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows only in both this Dataset and another Dataset while + * preserving the duplicates. + * This is equivalent to `INTERSECT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. Also as standard + * in SQL, this function resolves columns by position (not by name). + * + * @group typedrel + * @since 2.4.0 + */ + def intersectAll(other: Dataset[T]): Dataset[T] = withSetOperator { +Intersect(planWithBarrier, other.planWithBarrier, isAll = true) --- End diff -- @gatorsmile Sure.. how about exceptAll that was checked in today ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205933541 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1934,6 +1934,23 @@ class Dataset[T] private[sql]( Intersect(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows only in both this Dataset and another Dataset while + * preserving the duplicates. + * This is equivalent to `INTERSECT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. Also as standard + * in SQL, this function resolves columns by position (not by name). + * + * @group typedrel + * @since 2.4.0 + */ + def intersectAll(other: Dataset[T]): Dataset[T] = withSetOperator { +Intersect(planWithBarrier, other.planWithBarrier, isAll = true) --- End diff -- could you use logicalPlan? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205596431 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1407,6 +1408,87 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) AS (min_count, c1) + *FROM ( + * SELECT c1, + *vcol1_cnt, + *vcol2_cnt, + *IF (vcol1_cnt > vcol1_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 --- End diff -- @viirya OK :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205596360 --- Diff: python/pyspark/sql/dataframe.py --- @@ -1475,6 +1475,28 @@ def intersect(self, other): """ return DataFrame(self._jdf.intersect(other._jdf), self.sql_ctx) +@since(2.4) +def intersectAll(self, other): +""" Return a new :class:`DataFrame` containing rows in bothe this frame and other frame --- End diff -- @viirya Will fix. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205596189 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1407,6 +1408,87 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) AS (min_count, c1) + *FROM ( + * SELECT c1, + *vcol1_cnt, + *vcol2_cnt, --- End diff -- @viirya Thanks !! No we don't. In the actual code, we don't project these columns out. I will fix the doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205596232 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1933,6 +1933,22 @@ class Dataset[T] private[sql]( Intersect(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows only in both this Dataset and another Dataset while + * preserving the duplicates. + * This is equivalent to `INTERSECT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. + * + * @group typedrel + * @since 2.4.0 + */ --- End diff -- @viirya OK --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205575498 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1407,6 +1408,87 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) AS (min_count, c1) + *FROM ( + * SELECT c1, + *vcol1_cnt, + *vcol2_cnt, + *IF (vcol1_cnt > vcol1_cnt, vcol2_cnt, vcol1_cnt) AS min_count + * FROM ( + * SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt + * FROM ( + * SELECT c1, true as vcol1, null as vcol2 FROM ut1 + * UNION ALL + * SELECT c1, null as vcol1, true as vcol2 FROM ut2 --- End diff -- Based on the implementation, I think this should be: ``` SELECT true as vcol1, null as vcol2, c1 FROM ut1 UNION ALL SELECT null as vcol1, true as vcol2, c1 FROM ut2 ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205574160 --- Diff: python/pyspark/sql/dataframe.py --- @@ -1475,6 +1475,28 @@ def intersect(self, other): """ return DataFrame(self._jdf.intersect(other._jdf), self.sql_ctx) +@since(2.4) +def intersectAll(self, other): +""" Return a new :class:`DataFrame` containing rows in bothe this frame and other frame --- End diff -- typo: `bothe`. frame -> dataframe. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205573413 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1407,6 +1408,87 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) AS (min_count, c1) + *FROM ( + * SELECT c1, + *vcol1_cnt, + *vcol2_cnt, + *IF (vcol1_cnt > vcol1_cnt, vcol2_cnt, vcol1_cnt) AS min_count --- End diff -- typo here `vcol1_cnt > vcol1_cnt` -> `vcol1_cnt > vcol2_cnt`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205573922 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1933,6 +1933,22 @@ class Dataset[T] private[sql]( Intersect(planWithBarrier, other.planWithBarrier) } + /** + * Returns a new Dataset containing rows only in both this Dataset and another Dataset while + * preserving the duplicates. + * This is equivalent to `INTERSECT ALL` in SQL. + * + * @note Equality checking is performed directly on the encoded representation of the data + * and thus is not affected by a custom `equals` function defined on `T`. + * + * @group typedrel + * @since 2.4.0 + */ --- End diff -- Better to add `resolves columns by position (not by name)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21886#discussion_r205573522 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1407,6 +1408,87 @@ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { } } +/** + * Replaces logical [[Intersect]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + *SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + *SELECT replicate_row(min_count, c1) AS (min_count, c1) + *FROM ( + * SELECT c1, + *vcol1_cnt, + *vcol2_cnt, --- End diff -- Do we need to have `vcol1_cnt` and `vcol2_cnt` here? I think above `replicate_row` only takes `min_count` input. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
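As context for the `replicate_row` generator discussed above, here is a minimal Python sketch of its assumed semantics — emitting the input row `min_count` times. This is an assumption for illustration; the actual Spark generator operates on `InternalRow` inside the `Generate` operator:

```python
def replicate_row(min_count, row):
    # Emit `row` exactly `min_count` times. Rows present on only one side
    # never reach this step: the HAVING clause in the rewrite requires
    # both marker counts to be >= 1, so min_count is always at least 1 here.
    for _ in range(min_count):
        yield row

print(list(replicate_row(2, (1, 2))))  # [(1, 2), (1, 2)]
print(list(replicate_row(1, (2, 3))))  # [(2, 3)]
```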
GitHub user dilipbiswal opened a pull request: https://github.com/apache/spark/pull/21886 [SPARK-21274][SQL] Implement INTERSECT ALL clause ## What changes were proposed in this pull request? Implements INTERSECT ALL clause through query rewrites using existing operators in Spark. Please refer to [Link](https://drive.google.com/open?id=1nyW0T0b_ajUduQoPgZLAsyHK8s3_dko3ulQuxaLpUXE) for the design. Input Query ``` SQL SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2 ``` Rewritten Query ```SQL SELECT c1 FROM ( SELECT replicate_row(min_count, c1) AS (min_count, c1) FROM ( SELECT c1, vcol1_cnt, vcol2_cnt, IF (vcol1_cnt > vcol1_cnt, vcol2_cnt, vcol1_cnt) AS min_count FROM ( SELECT c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt FROM ( SELECT c1, true as vcol1, null as vcol2 FROM ut1 UNION ALL SELECT c1, null as vcol1, true as vcol2 FROM ut2 ) AS union_all GROUP BY c1 HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1 ) ) ) ``` ## How was this patch tested? Added test cases in SQLQueryTestSuite, DataFrameSuite, SetOperationSuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/dilipbiswal/spark dkb_intersect_all_final Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21886.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21886 commit b313782ae3aa0756b3f2046bf0b9ac4cab4870f8 Author: Dilip Biswal Date: 2018-07-25T06:23:28Z generator commit 1039e47e98efdcccbf64e392848b6cc04156bf77 Author: Dilip Biswal Date: 2018-05-07T08:31:11Z [SPARK-21274] Implement INTERSECT ALL clause --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
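The rewrite in the description can be modeled end to end in plain Python — a hedged sketch of the optimizer rule's logic, not the Scala implementation. (Note the IF condition in the description should compare `vcol1_cnt > vcol2_cnt`, as flagged later in review.) Each side tags its rows with a marker column, marker counts are aggregated per distinct row, and surviving rows are replicated min(count) times:

```python
from collections import defaultdict

def intersect_all(left, right):
    # Model of the Union + Aggregate + Generate rewrite:
    # count(vcol1) / count(vcol2) per distinct row, HAVING both >= 1,
    # then replicate_row(min_count, ...) on the survivors.
    counts = defaultdict(lambda: [0, 0])
    for row in left:
        counts[row][0] += 1      # SELECT ..., true AS vcol1, null AS vcol2
    for row in right:
        counts[row][1] += 1      # SELECT ..., null AS vcol1, true AS vcol2
    result = []
    for row, (vcol1_cnt, vcol2_cnt) in counts.items():
        if vcol1_cnt >= 1 and vcol2_cnt >= 1:             # HAVING clause
            min_count = vcol2_cnt if vcol1_cnt > vcol2_cnt else vcol1_cnt
            result.extend([row] * min_count)              # replicate_row
    return result

# Hypothetical ut1/ut2 data matching the query shape in the description
ut1 = [(1, 2), (1, 2), (1, 3), (1, 3), (2, 3)]
ut2 = [(1, 2), (2, 3), (2, 3)]
print(sorted(intersect_all(ut1, ut2)))  # [(1, 2), (2, 3)]
```

Duplicates survive only up to the smaller side's count, which is exactly the multiset semantics that distinguishes INTERSECT ALL from the existing semi-join rewrite of INTERSECT DISTINCT.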