Fixed order by limit with select * query
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/82b61d47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/82b61d47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/82b61d47 Branch: refs/heads/12-dev Commit: 82b61d4799a0e2eea6064d4e997fa1524c7f7b1d Parents: 68cbe15 Author: ravipesala <ravi.pes...@gmail.com> Authored: Tue Apr 4 15:18:46 2017 +0530 Committer: kumarvishal <kumarvishal.1...@gmail.com> Committed: Thu Apr 6 16:28:21 2017 +0530 ---------------------------------------------------------------------- .../sortexpr/AllDataTypesTestCaseSort.scala | 18 +++++++++-- .../spark/sql/CarbonCatalystOperators.scala | 19 +++++++++-- .../spark/sql/CarbonDictionaryDecoder.scala | 33 +++++++++++++++++++- .../execution/CarbonLateDecodeStrategy.scala | 14 ++++++--- .../sql/optimizer/CarbonLateDecodeRule.scala | 27 ++++++++++++++-- 5 files changed, 98 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala index bdb470a..34d3cee 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortexpr/AllDataTypesTestCaseSort.scala @@ -27,6 +27,8 @@ import org.scalatest.BeforeAndAfterAll class 
AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll { override def beforeAll { + sql("drop table if exists alldatatypestablesort") + sql("drop table if exists alldatatypestablesort_hive") sql("CREATE TABLE alldatatypestablesort (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) STORED BY 'org.apache.carbondata.format'") sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE alldatatypestablesort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"""); @@ -41,8 +43,20 @@ class AllDataTypesTestCaseSort extends QueryTest with BeforeAndAfterAll { sql("select empno,empname,utilization,count(salary),sum(empno) from alldatatypestablesort_hive where empname in ('arvind','ayushi') group by empno,empname,utilization order by empno")) } + test("select * from alldatatypestablesort order by empname limit 10") { + sql("select * from alldatatypestablesort order by empname limit 10").collect() + } + + test("select * from alldatatypestablesort order by salary limit 2") { + sql("select * from alldatatypestablesort order by salary limit 2").collect() + } + + test("select * from alldatatypestablesort where empname='arvind' order by salary limit 2") { + sql("select * from alldatatypestablesort where empname='arvind' order by salary limit 2").collect() + } + override def afterAll { - sql("drop table alldatatypestablesort") - sql("drop table alldatatypestablesort_hive") + sql("drop table if exists alldatatypestablesort") + sql("drop table if exists alldatatypestablesort_hive") } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala index 4070088..9b1533e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _} +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.{HiveContext, HiveSessionCatalog} import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.types.{StringType, TimestampType} @@ -33,8 +34,22 @@ case class CarbonDictionaryCatalystDecoder( isOuter: Boolean, child: LogicalPlan) extends UnaryNode { // the output should be updated with converted datatype, it is need for limit+sort plan. 
- override val output: Seq[Attribute] = - CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap) + override def output: Seq[Attribute] = { + child match { + case l: LogicalRelation => + // If the child is a logical relation then first update all dictionary attr with IntegerType + val logicalOut = + CarbonDictionaryDecoder.updateAttributes(child.output, relations, aliasMap) + CarbonDictionaryDecoder.convertOutput(logicalOut, relations, profile, aliasMap) + case Filter(cond, l: LogicalRelation) => + // If the child is a filter over a logical relation then first update all dictionary attr with IntegerType + val logicalOut = + CarbonDictionaryDecoder.updateAttributes(child.output, relations, aliasMap) + CarbonDictionaryDecoder.convertOutput(logicalOut, relations, profile, aliasMap) + case _ => CarbonDictionaryDecoder.convertOutput(child.output, relations, profile, aliasMap) + } + } + } abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 543da6f..d450b69 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -347,11 +347,42 @@ object CarbonDictionaryDecoder { } /** + * Updates all dictionary attributes with integer datatype.
+ */ + def updateAttributes(output: Seq[Attribute], + relations: Seq[CarbonDecoderRelation], + aliasMap: CarbonAliasDecoderRelation): Seq[Attribute] = { + output.map { a => + val attr = aliasMap.getOrElse(a, a) + val relation = relations.find(p => p.contains(attr)) + if (relation.isDefined) { + val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable + val carbonDimension = carbonTable + .getDimensionByName(carbonTable.getFactTableName, attr.name) + if (carbonDimension != null && + carbonDimension.hasEncoding(Encoding.DICTIONARY) && + !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && + !carbonDimension.isComplex()) { + val newAttr = AttributeReference(a.name, + IntegerType, + a.nullable, + a.metadata)(a.exprId).asInstanceOf[Attribute] + newAttr + } else { + a + } + } else { + a + } + } + } + + /** * Whether the attributed requires to decode or not based on the profile. */ def canBeDecoded(attr: Attribute, profile: CarbonProfile): Boolean = { profile match { - case ip: IncludeProfile if ip.attributes.nonEmpty => + case ip: IncludeProfile => ip.attributes .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId) case ep: ExcludeProfile => http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 16e8a99..ed5d362 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -61,11 +61,15 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { 
a.map(_.name).toArray, f), needDecoder)) :: Nil case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => - CarbonDictionaryDecoder(relations, - profile, - aliasMap, - planLater(child) - ) :: Nil + if (profile.isInstanceOf[IncludeProfile] && profile.isEmpty) { + planLater(child) :: Nil + } else { + CarbonDictionaryDecoder(relations, + profile, + aliasMap, + planLater(child) + ) :: Nil + } case _ => Nil } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/82b61d47/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 36478b4..181328d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -168,7 +168,21 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) { child = CarbonDictionaryTempDecoder(attrsOnSort, new util.HashSet[AttributeReferenceWrapper](), sort.child) + } else { + // In case of select * from query it gets logical relation and there is no way + // to convert the datatypes of attributes, so just add this dummy decoder to convert + // to dictionary datatypes. 
+ child match { + case l: LogicalRelation => + child = CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), + new util.HashSet[AttributeReferenceWrapper](), sort.child) + case Filter(cond, l: LogicalRelation) => + child = CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), + new util.HashSet[AttributeReferenceWrapper](), sort.child) + case _ => + } } + if (!decoder) { decoder = true CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](), @@ -609,9 +623,16 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { } // Remove unnecessary decoders val finalPlan = transFormedPlan transform { - case CarbonDictionaryCatalystDecoder(_, profile, _, false, child) - if profile.isInstanceOf[IncludeProfile] && profile.isEmpty => - child + case cd@ CarbonDictionaryCatalystDecoder(_, profile, _, false, child) => + if (profile.isInstanceOf[IncludeProfile] && profile.isEmpty) { + child match { + case l: LogicalRelation => cd + case Filter(condition, l: LogicalRelation) => cd + case _ => child + } + } else { + cd + } } finalPlan }