Repository: incubator-carbondata

Updated Branches:
  refs/heads/master ac4575536 -> 47658b17d
Fix late decoder; fix comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a9553e6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a9553e6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a9553e6b

Branch: refs/heads/master
Commit: a9553e6b8d086b98fab6df7b93a4e78150796fc9
Parents: ac45755
Author: QiangCai <qiang...@qq.com>
Authored: Tue Dec 6 17:40:21 2016 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Wed Dec 7 15:21:51 2016 +0800

----------------------------------------------------------------------
 examples/spark2/src/main/resources/data.csv    |  20 +--
 .../carbondata/examples/CarbonExample.scala    |  10 +-
 .../spark/sql/CarbonDictionaryDecoder.scala    |   1 -
 .../execution/CarbonLateDecodeStrategy.scala   | 157 ++++++++++---------
 .../sql/optimizer/CarbonLateDecodeRule.scala   | 128 +++++++++++++--
 .../carbondata/CarbonDataSourceSuite.scala     |  28 +++-
 6 files changed, 236 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index fcdf3c1..b44672f 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,10 +1,10 @@
-1,10,100,48.4,spark,2015/4/23
-5,17,140,43.4,spark,2015/7/27
-1,11,100,44.4,flink,2015/5/23
-1,10,150,43.4,spark,2015/7/24
-1,10,100,47.4,spark,2015/7/23
-3,14,160,43.4,hive,2015/7/26
-2,10,100,43.4,impala,2015/7/23
-1,10,100,43.4,spark,2015/5/23
-4,16,130,42.4,impala,2015/7/23
-1,10,100,43.4,spark,2015/7/23
\ No newline at end of file
+1,10,100,48.4,spark,2015/4/23,1.23
+5,17,140,43.4,spark,2015/7/27,3.45
+1,11,100,44.4,flink,2015/5/23,23.23
+1,10,150,43.4,spark,2015/7/24,254.12
+1,10,100,47.4,spark,2015/7/23,876.14
+3,14,160,43.4,hive,2015/7/26,3454.32
+2,10,100,43.4,impala,2015/7/23,456.98
+1,10,100,43.4,spark,2015/5/23,32.53
+4,16,130,42.4,impala,2015/7/23,67.23
+1,10,100,43.4,spark,2015/7/23,832.23
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 59cc4e9..17674ef 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -68,8 +68,8 @@ object CarbonExample {
        | bigintField long,
        | doubleField double,
        | stringField string,
-       | timestampField timestamp
-       | )
+       | timestampField timestamp,
+       | decimalField decimal(18,2))
        | USING org.apache.spark.sql.CarbonSource
      """.stripMargin)
 
@@ -86,7 +86,8 @@ object CarbonExample {
        | bigintField long,
        | doubleField double,
        | stringField string,
-       | timestampField string)
+       | timestampField string,
+       | decimalField decimal(18,2))
        | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      """.stripMargin)
 
@@ -105,13 +106,14 @@ object CarbonExample {
       s"""
          | INSERT INTO TABLE carbon_table
          | SELECT shortField, intField, bigintField, doubleField, stringField,
-         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
+         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField
          | FROM csv_table
       """.stripMargin)
 
     spark.sql("""
              SELECT *
              FROM carbon_table
+             where stringfield = 'spark' and decimalField > 40
              """).show
 
     spark.sql("""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c7ca61d..db864c7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -320,7 +320,6 @@ class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
       new Iterator[Row] {
         var flag = true
         var total = 0L
-
         override final def hasNext: Boolean = iter.hasNext
 
         override final def next(): Row = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index c73fde6..57b2139 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.CatalystTypeConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
@@ -31,14 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{IntegerType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types.IntegerType
 
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
-
-
 /**
  * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
  * can improve the aggregation performance and reduce memory usage
@@ -92,10 +86,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       relation: LogicalRelation,
       output: Seq[Attribute],
       rdd: RDD[Row],
-      needoDecode: ArrayBuffer[AttributeReference]):
+      needDecode: ArrayBuffer[AttributeReference]):
     RDD[InternalRow] = {
-    val newRdd = if (needoDecode.size > 0) {
-      getDecoderRDD(relation, needoDecode, rdd, output)
+    val newRdd = if (needDecode.size > 0) {
+      getDecoderRDD(relation, needDecode, rdd, output)
     } else {
       rdd
     }
@@ -310,74 +304,89 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
    */
   protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
     predicate match {
-      case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
-        Some(sources.EqualTo(a.name, convertToScala(v, t)))
-      case expressions.EqualTo(Literal(v, t), a: Attribute) =>
-        Some(sources.EqualTo(a.name, convertToScala(v, t)))
-
-      case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
-        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
-      case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
-        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
-
-      case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
-        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
-      case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
-        Some(sources.LessThan(a.name, convertToScala(v, t)))
-
-      case expressions.LessThan(a: Attribute, Literal(v, t)) =>
-        Some(sources.LessThan(a.name, convertToScala(v, t)))
-      case expressions.LessThan(Literal(v, t), a: Attribute) =>
-        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
-
-      case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
-        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
-      case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
-        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
-
-      case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
-        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
-      case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
-        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
-
-      case expressions.InSet(a: Attribute, set) =>
-        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
-        Some(sources.In(a.name, set.toArray.map(toScala)))
-
-      // Because we only convert In to InSet in Optimizer when there are more than certain
-      // items. So it is possible we still get an In expression here that needs to be pushed
-      // down.
-      case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-        val hSet = list.map(e => e.eval(EmptyRow))
-        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
-        Some(sources.In(a.name, hSet.toArray.map(toScala)))
+      case Or(left, right) =>
 
-      case expressions.IsNull(a: Attribute) =>
-        Some(sources.IsNull(a.name))
-      case expressions.IsNotNull(a: Attribute) =>
-        Some(sources.IsNotNull(a.name))
+        val leftFilter = translateFilter(left)
+        val rightFilter = translateFilter(right)
+        if (leftFilter.isDefined && rightFilter.isDefined) {
+          Some(sources.Or(leftFilter.get, rightFilter.get))
+        } else {
+          None
+        }
 
-      case expressions.And(left, right) =>
+      case And(left, right) =>
         (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
 
-      case expressions.Or(left, right) =>
-        for {
-          leftFilter <- translateFilter(left)
-          rightFilter <- translateFilter(right)
-        } yield sources.Or(leftFilter, rightFilter)
-
-      case expressions.Not(child) =>
-        translateFilter(child).map(sources.Not)
-
-      case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringStartsWith(a.name, v.toString))
-
-      case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringEndsWith(a.name, v.toString))
-
-      case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringContains(a.name, v.toString))
-      case _ => None
+      case EqualTo(a: Attribute, Literal(v, t)) =>
+        Some(sources.EqualTo(a.name, v))
+      case EqualTo(Literal(v, t), a: Attribute) =>
+        Some(sources.EqualTo(a.name, v))
+      case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.EqualTo(a.name, v))
+      case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.EqualTo(a.name, v))
+
+      case Not(EqualTo(a: Attribute, Literal(v, t))) =>
+        Some(sources.Not(sources.EqualTo(a.name, v)))
+      case Not(EqualTo(Literal(v, t), a: Attribute)) =>
+        Some(sources.Not(sources.EqualTo(a.name, v)))
+      case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
+        Some(sources.Not(sources.EqualTo(a.name, v)))
+      case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
+        Some(sources.Not(sources.EqualTo(a.name, v)))
+      case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+      case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+      case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.Not(sources.In(a.name, hSet.toArray)))
+      case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.In(a.name, hSet.toArray))
+      case Not(In(Cast(a: Attribute, _), list))
+        if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.Not(sources.In(a.name, hSet.toArray)))
+      case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.In(a.name, hSet.toArray))
+
+      case GreaterThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThan(a.name, v))
+      case GreaterThan(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThan(a.name, v))
+      case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.GreaterThan(a.name, v))
+      case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.LessThan(a.name, v))
+
+      case LessThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThan(a.name, v))
+      case LessThan(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThan(a.name, v))
+      case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.LessThan(a.name, v))
+      case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.GreaterThan(a.name, v))
+
+      case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+
+      case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+
+      case others => None
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 6b6960d..fb9df70 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -162,7 +162,38 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         } else {
           Sort(sort.order, sort.global, child)
         }
-
+      case union: Union
+        if !(union.children(0).isInstanceOf[CarbonDictionaryTempDecoder] ||
+             union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
+        val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+        val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+        union.children(0).output.foreach(attr =>
+          leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+        union.children(1).output.foreach(attr =>
+          rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+        var leftPlan = union.children(0)
+        var rightPlan = union.children(1)
+        if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
+            !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+          leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
+            new util.HashSet[AttributeReferenceWrapper](),
+            union.children(0))
+        }
+        if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
+            !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+          rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
+            new util.HashSet[AttributeReferenceWrapper](),
+            union.children(1))
+        }
+        if (!decoder) {
+          decoder = true
+          CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+            new util.HashSet[AttributeReferenceWrapper](),
+            Union(leftPlan, rightPlan),
+            isOuter = true)
+        } else {
+          Union(leftPlan, rightPlan)
+        }
       case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
         val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
         agg.aggregateExpressions.map {
@@ -456,39 +487,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
       case cd: CarbonDictionaryCatalystDecoder =>
         cd
       case sort: Sort =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (sort.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = sort.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach { attr => tmpAttrMap.put(attr, attr.attr) }
+        }
         val sortExprs = sort.order.map { s =>
           s.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }.asInstanceOf[SortOrder]
         }
         Sort(sortExprs, sort.global, sort.child)
       case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (agg.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = agg.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach { attr => tmpAttrMap.put(attr, attr.attr) }
+        }
+
         val aggExps = agg.aggregateExpressions.map { aggExp =>
           aggExp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val grpExps = agg.groupingExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }
         Aggregate(grpExps, aggExps, agg.child)
       case expand: Expand =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (expand.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = expand.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach { attr => tmpAttrMap.put(attr, attr.attr) }
+        }
         expand.transformExpressions {
           case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+            if (tempAttr.isDefined) {
+              tempAttr.get
+            } else {
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
         }
       case filter: Filter =>
-        val filterExps = filter.condition transform {
-          case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-        }
-        Filter(filterExps, filter.child)
+        filter
       case j: Join =>
         marker.pushBinaryMarker(allAttrsNotDecode)
         j
@@ -496,36 +559,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         marker.pushBinaryMarker(allAttrsNotDecode)
         u
       case p: Project if relations.nonEmpty =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (p.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = p.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach { attr => tmpAttrMap.put(attr, attr.attr) }
+        }
         val prExps = p.projectList.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[NamedExpression]]
         Project(prExps, p.child)
       case wd: Window if relations.nonEmpty =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (wd.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = wd.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach { attr => tmpAttrMap.put(attr, attr.attr) }
+        }
         val prExps = wd.output.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[Attribute]]
        val wdExps = wd.windowExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val partitionSpec = wd.partitionSpec.map { exp =>
           exp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }
         val orderSpec = wd.orderSpec.map { exp =>
           exp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if (tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[SortOrder]]
         Window(wdExps, partitionSpec, orderSpec, wd.child)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index aaa0a20..13c0257 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -26,7 +26,7 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     spark = SparkSession
       .builder()
-      .master("local[4]")
+      .master("local")
       .appName("CarbonExample")
       .enableHiveSupport()
       .config(CarbonCommonConstants.STORE_LOCATION,
@@ -48,10 +48,23 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
        | bigintField long,
        | doubleField double,
        | stringField string,
-       | decimalField decimal(13, 0)
-       | )
+       | decimalField decimal(13, 0),
+       | timestampField string)
        | USING org.apache.spark.sql.CarbonSource
      """.stripMargin)
+
+    spark.sql(
+      s"""
+         | CREATE TABLE csv_table
+         | ( shortField short,
+         |   intField int,
+         |   bigintField long,
+         |   doubleField double,
+         |   stringField string,
+         |   decimalField decimal(13, 0),
+         |   timestampField string)
+         | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+      """.stripMargin)
   }
 
   override def afterAll(): Unit = {
@@ -64,10 +77,17 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
     spark.sql("select * from carbon_testtable").collect()
   }
 
-
   test("agg") {
     spark.sql("select stringField, sum(intField) , sum(decimalField) " +
       "from carbon_testtable group by stringField").collect()
+
+    spark.sql(
+      s"""
+         | INSERT INTO TABLE carbon_testtable
+         | SELECT shortField, intField, bigintField, doubleField, stringField,
+         | decimalField, from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
+         | FROM csv_table
+      """.stripMargin)
   }
 }
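
----------------------------------------------------------------------

A note on the translateFilter rewrite above: it follows the usual pattern for
translating Catalyst predicates into data source filters, and two rules are
worth calling out. An AND may be pushed down partially, because pushing one
conjunct still narrows the scan and the full predicate is re-applied on the
returned rows, while an OR must be pushed all-or-nothing, since dropping one
branch would discard rows the other branch should keep. The sketch below is
a minimal illustration of that pattern only; the Expr/Filter types and the
translate name are simplified stand-ins, not the committed API:

// Minimal sketch of the pushdown-translation pattern, assuming simplified
// stand-ins for Catalyst expressions and org.apache.spark.sql.sources filters.
object FilterTranslationSketch {

  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Any) extends Expr
  case class Gt(left: Expr, right: Expr) extends Expr
  case class AndExpr(left: Expr, right: Expr) extends Expr
  case class OrExpr(left: Expr, right: Expr) extends Expr

  sealed trait Filter
  case class GreaterThan(attr: String, value: Any) extends Filter
  case class LessThan(attr: String, value: Any) extends Filter
  case class And(left: Filter, right: Filter) extends Filter
  case class Or(left: Filter, right: Filter) extends Filter

  // Translate what we can; None means "cannot push down, evaluate in Spark".
  def translate(e: Expr): Option[Filter] = e match {
    // attr > lit keeps its direction; lit > attr flips to attr < lit,
    // exactly like the GreaterThan/LessThan pairs in the patch.
    case Gt(Attr(n), Lit(v)) => Some(GreaterThan(n, v))
    case Gt(Lit(v), Attr(n)) => Some(LessThan(n, v))

    // AND survives partially: whichever side translates gets pushed down.
    case AndExpr(l, r) =>
      (translate(l) ++ translate(r)).reduceOption(And)

    // OR is all-or-nothing: both sides must translate.
    case OrExpr(l, r) =>
      for {
        lf <- translate(l)
        rf <- translate(r)
      } yield Or(lf, rf)

    case _ => None
  }

  def main(args: Array[String]): Unit = {
    // (10 > a) OR (b > 5): both sides translate, so the OR is pushed down.
    println(translate(OrExpr(Gt(Lit(10), Attr("a")), Gt(Attr("b"), Lit(5)))))
  }
}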
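The blocks added throughout CarbonLateDecodeRule all apply one substitution
rule: if the child is a CarbonDictionaryTempDecoder that will decode an
attribute, keep that attribute in its original form; otherwise fall back to
updateDataType, which (as I read the patch) rewrites dictionary-encoded
columns to an IntegerType surrogate until a later decoder restores the real
values. A minimal sketch of that fallback pattern, with simplified stand-in
types; toSurrogate plays the role of updateDataType and none of these names
are the committed API:

// Sketch of the "keep if decoded by child, else rewrite to surrogate" rule.
object AttrRewriteSketch {
  case class Attribute(name: String, dataType: String)

  def rewrite(attrs: Seq[Attribute],
              decodedByChild: Set[String]): Seq[Attribute] = {
    attrs.map { attr =>
      if (decodedByChild.contains(attr.name)) {
        attr                 // the child decoder already restores the value
      } else {
        toSurrogate(attr)    // still dictionary-encoded at this point
      }
    }
  }

  // Dictionary keys travel through the plan as integers until decoded.
  private def toSurrogate(attr: Attribute): Attribute =
    attr.copy(dataType = "int")

  def main(args: Array[String]): Unit = {
    val out = rewrite(
      Seq(Attribute("stringField", "string"), Attribute("city", "string")),
      decodedByChild = Set("stringField"))
    println(out) // stringField keeps string; city becomes the int surrogate
  }
}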
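Finally, the new Union case in CarbonLateDecodeRule wraps each carbon child in
a temp decoder over that child's output before rebuilding the Union. The likely
reason, as I read the patch, is that dictionary surrogate keys are assigned per
table, so the same integer key can stand for different values in the two
branches; the values must be restored before the branches are merged. A small
sketch of that shape, again with simplified stand-in plan nodes rather than the
committed classes:

// Sketch: decode each union branch before merging, since keys are per-table.
object UnionDecodeSketch {
  sealed trait Plan
  case class CarbonScan(table: String, encodedCols: Set[String]) extends Plan
  case class Decode(cols: Set[String], child: Plan) extends Plan
  case class Union(left: Plan, right: Plan) extends Plan

  // Wrap a branch in a decoder for its encoded columns, if it has any.
  private def decoded(p: Plan): Plan = p match {
    case scan @ CarbonScan(_, cols) if cols.nonEmpty => Decode(cols, scan)
    case other => other
  }

  def planUnion(left: Plan, right: Plan): Plan =
    Union(decoded(left), decoded(right))

  def main(args: Array[String]): Unit = {
    // Key 42 may mean "spark" in t1 but "hive" in t2; decode before merging.
    println(planUnion(
      CarbonScan("t1", Set("stringField")),
      CarbonScan("t2", Set("stringField"))))
  }
}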