Fixed full join query issue with aggregate. Fixed in spark-1.6.
Fixed style Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e67003cf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e67003cf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e67003cf Branch: refs/heads/branch-1.1 Commit: e67003cf657e743194cf449792b67f896b1adc74 Parents: 0c6f5f3 Author: ravipesala <ravi.pes...@gmail.com> Authored: Tue May 23 10:32:21 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 12:57:35 2017 +0530 ---------------------------------------------------------------------- .../joinquery/AllDataTypesTestCaseJoin.scala | 9 +- .../spark/sql/optimizer/CarbonOptimizer.scala | 101 ++++++++++++------- .../sql/optimizer/CarbonLateDecodeRule.scala | 101 +++++++++++-------- 3 files changed, 131 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala index be0f8e6..08fad0b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala @@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll class AllDataTypesTestCaseJoin extends QueryTest with BeforeAndAfterAll { override def beforeAll { - sql("CREATE TABLE alldatatypestableJoin (empno int, empname String, designation 
String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") + sql("CREATE TABLE alldatatypestableJoin (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='empno','TABLE_BLOCKSIZE'='4')") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE alldatatypestableJoin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"""); sql("CREATE TABLE alldatatypestableJoin_hive (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int)row format delimited fields terminated by ','") @@ -90,6 +90,13 @@ class AllDataTypesTestCaseJoin extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS carbon_table2") } + test("join with aggregate plan") { + checkAnswer(sql("SELECT c1.empno,c1.empname, c2.empno FROM (SELECT empno,empname FROM alldatatypestableJoin GROUP BY empno,empname) c1 FULL JOIN " + + "(SELECT empno FROM alldatatypestableJoin GROUP BY empno) c2 ON c1.empno = c2.empno"), + sql("SELECT c1.empno,c1.empname, c2.empno FROM (SELECT empno,empname FROM alldatatypestableJoin_hive GROUP BY empno,empname) c1 FULL JOIN " + + "(SELECT empno FROM alldatatypestableJoin_hive GROUP BY empno) c2 ON c1.empno = c2.empno")) + } + override def afterAll { sql("drop table alldatatypestableJoin") sql("drop table alldatatypestableJoin_hive") 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala index 9aa8158..02ac5f8 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala @@ -206,6 +206,47 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) relations.foreach(_.fillAttributeMap(attrMap)) def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = { + + def transformAggregateExpression(agg: Aggregate, + aggonGroups: util.HashSet[AttributeReferenceWrapper] = null): LogicalPlan = { + val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper] + if (aggonGroups != null) { + attrsOndimAggs.addAll(aggonGroups) + } + agg.aggregateExpressions.map { + case attr: AttributeReference => + case a@Alias(attr: AttributeReference, name) => + case aggExp: AggregateExpression => + aggExp.transform { + case aggExp: AggregateExpression => + collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap) + aggExp + } + case others => + others.collect { + case attr: AttributeReference + if isDictionaryEncoded(attr, attrMap, aliasMap) => + attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) + } + } + var child = agg.child + // Incase if the child also aggregate then push down decoder to child + if (attrsOndimAggs.size() > 0 && !child.equals(agg)) { + child = CarbonDictionaryTempDecoder(attrsOndimAggs, + new util.HashSet[AttributeReferenceWrapper](), + agg.child) + } + if (!decoder && aggonGroups == null) { + decoder = true + CarbonDictionaryTempDecoder(new 
util.HashSet[AttributeReferenceWrapper](), + new util.HashSet[AttributeReferenceWrapper](), + Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child), + isOuter = true) + } else { + Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child) + } + } + currentPlan match { case limit@Limit(_, child: Sort) => if (!decoder) { @@ -288,39 +329,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) } case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] => - val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper] - agg.aggregateExpressions.map { - case attr: AttributeReference => - case a@Alias(attr: AttributeReference, name) => - case aggExp: AggregateExpression => - aggExp.transform { - case aggExp: AggregateExpression => - collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap) - aggExp - } - case others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - var child = agg.child - // Incase if the child also aggregate then push down decoder to child - if (attrsOndimAggs.size() > 0 && !child.equals(agg)) { - child = CarbonDictionaryTempDecoder(attrsOndimAggs, - new util.HashSet[AttributeReferenceWrapper](), - agg.child) - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child), - isOuter = true) - } else { - Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child) - } + transformAggregateExpression(agg) case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] => val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper] expand.projections.map {s => @@ -410,15 +419,29 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation]) 
var rightPlan = j.right if (leftCondAttrs.size() > 0 && !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - j.left) + leftPlan = leftPlan match { + case agg: Aggregate => + CarbonDictionaryTempDecoder(leftCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + transformAggregateExpression(agg, leftCondAttrs)) + case _ => + CarbonDictionaryTempDecoder(leftCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + j.left) + } } if (rightCondAttrs.size() > 0 && !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - j.right) + rightPlan = rightPlan match { + case agg: Aggregate => + CarbonDictionaryTempDecoder(rightCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + transformAggregateExpression(agg, rightCondAttrs)) + case _ => + CarbonDictionaryTempDecoder(rightCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + j.right) + } } if (!decoder) { decoder = true http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index fd6f14e..d1a0c90 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -178,6 +178,46 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { relations.foreach(_.fillAttributeMap(attrMap)) def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = { + + def 
transformAggregateExpression(agg: Aggregate, + attrsOnGroup: util.HashSet[AttributeReferenceWrapper] = null): LogicalPlan = { + val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper] + if (attrsOnGroup != null) { + attrsOndimAggs.addAll(attrsOnGroup) + } + agg.aggregateExpressions.map { + case attr: AttributeReference => + case a@Alias(attr: AttributeReference, name) => + case aggExp: AggregateExpression => + aggExp.transform { + case aggExp: AggregateExpression => + collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap) + aggExp + } + case others => + others.collect { + case attr: AttributeReference + if isDictionaryEncoded(attr, attrMap, aliasMap) => + attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) + } + } + var child = agg.child + // Incase if the child also aggregate then push down decoder to child + if (attrsOndimAggs.size() > 0 && !child.equals(agg)) { + child = CarbonDictionaryTempDecoder(attrsOndimAggs, + new util.HashSet[AttributeReferenceWrapper](), + agg.child) + } + if (!decoder && attrsOnGroup == null) { + decoder = true + CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), + new util.HashSet[AttributeReferenceWrapper](), + Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child), + isOuter = true) + } else { + Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child) + } + } currentPlan match { case limit@GlobalLimit(_, LocalLimit(_, child: Sort)) => if (!decoder) { @@ -259,39 +299,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { Union(children) } case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] => - val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper] - agg.aggregateExpressions.map { - case attr: AttributeReference => - case a@Alias(attr: AttributeReference, name) => - case aggExp: AggregateExpression => - aggExp.transform { - case aggExp: AggregateExpression => - 
collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, attrMap) - aggExp - } - case others => - others.collect { - case attr: AttributeReference - if isDictionaryEncoded(attr, attrMap, aliasMap) => - attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - } - var child = agg.child - // Incase if the child also aggregate then push down decoder to child - if (attrsOndimAggs.size() > 0 && !child.equals(agg)) { - child = CarbonDictionaryTempDecoder(attrsOndimAggs, - new util.HashSet[AttributeReferenceWrapper](), - agg.child) - } - if (!decoder) { - decoder = true - CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), - new util.HashSet[AttributeReferenceWrapper](), - Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child), - isOuter = true) - } else { - Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child) - } + transformAggregateExpression(agg) case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] => val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper] expand.projections.map {s => @@ -381,15 +389,29 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { var rightPlan = j.right if (leftCondAttrs.size() > 0 && !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - j.left) + leftPlan = leftPlan match { + case agg: Aggregate => + CarbonDictionaryTempDecoder(leftCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + transformAggregateExpression(agg, leftCondAttrs)) + case _ => + CarbonDictionaryTempDecoder(leftCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + j.left) + } } if (rightCondAttrs.size() > 0 && !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) { - rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs, - new util.HashSet[AttributeReferenceWrapper](), - j.right) + rightPlan = rightPlan 
match { + case agg: Aggregate => + CarbonDictionaryTempDecoder(rightCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + transformAggregateExpression(agg, rightCondAttrs)) + case _ => + CarbonDictionaryTempDecoder(rightCondAttrs, + new util.HashSet[AttributeReferenceWrapper](), + j.right) + } } Join(leftPlan, rightPlan, j.joinType, j.condition) } else { @@ -503,7 +525,6 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { case others => others } - } val transFormedPlan =