fixUnionIssue and add test case

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/462f6422
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/462f6422
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/462f6422

Branch: refs/heads/master
Commit: 462f64226428fc255938d8752226cda262ad0ae4
Parents: 526243b
Author: QiangCai <qiang...@qq.com>
Authored: Thu Dec 8 19:06:33 2016 +0800
Committer: jackylk <jacky.li...@huawei.com>
Committed: Fri Dec 16 10:13:39 2016 +0800

----------------------------------------------------------------------
 .../impl/DictionaryBasedResultCollector.java    |   29 +-
 .../DictionaryBasedResultCollectorTest.java     |    9 +-
 .../carbondata/examples/CarbonExample.scala     |   12 +
 .../apache/carbondata/spark/CarbonFilters.scala |    7 +
 .../CarbonDecoderOptimizerHelper.scala          |   24 +-
 .../readsupport/SparkRowReadSupportImpl.java    |    5 +-
 .../apache/carbondata/spark/CarbonFilters.scala |    6 +
 .../spark/sql/CarbonDataFrameWriter.scala       |    3 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |    4 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   60 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |   44 +-
 .../org/apache/spark/sql/CarbonSource.scala     |    3 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  124 +-
 integration/spark2/src/test/resources/data.csv  |   11 +
 .../AllDataTypesTestCaseAggregate.scala         | 1161 ++++++++++++++++++
 .../spark/sql/common/util/CarbonFunSuite.scala  |   49 +
 .../sql/common/util/CarbonSessionTest.scala     |   74 ++
 .../apache/spark/sql/common/util/PlanTest.scala |   59 +
 .../spark/sql/common/util/QueryTest.scala       |  149 +++
 19 files changed, 1654 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
index 108677f..2462caa 100644
--- a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -20,11 +20,13 @@ package org.apache.carbondata.scan.collector.impl;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -35,6 +37,7 @@ import org.apache.carbondata.scan.model.QueryDimension;
 import org.apache.carbondata.scan.model.QueryMeasure;
 import org.apache.carbondata.scan.result.AbstractScannedResult;
 
+import org.apache.commons.lang3.ArrayUtils;
 
 /**
  * It is not a collector it is just a scanned result holder.
 */
@@ -52,9 +55,31 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
   * it will keep track of how many record is processed, to handle limit scenario
   */
  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+
     List<Object[]> listBasedResult = new ArrayList<>(batchSize);
     boolean isMsrsPresent = measureDatatypes.length > 0;
+    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+    List<Integer> dictionaryIndexes = new ArrayList<Integer>();
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+          queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+        dictionaryIndexes.add(queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+    int[] primitive = ArrayUtils.toPrimitive(dictionaryIndexes.toArray(
+        new Integer[dictionaryIndexes.size()]));
+    Arrays.sort(primitive);
+    int[] actualIndexInSurrogateKey = new int[dictionaryIndexes.size()];
+    int index = 0;
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if(queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY) ||
+          queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY) ) {
+        actualIndexInSurrogateKey[index++] = Arrays.binarySearch(primitive,
+            queryDimensions[i].getDimension().getOrdinal());
+      }
+    }
+
     QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
     Map<Integer, GenericQueryType> comlexDimensionInfoMap =
         tableBlockExecutionInfos.getComlexDimensionInfoMap();
@@ -99,7 +124,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
               .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
           if (directDictionaryGenerator != null) {
             row[order[i]] = directDictionaryGenerator.getValueFromSurrogate(
-                surrogateResult[dictionaryColumnIndex++]);
+                surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]]);
           }
         } else if (complexDataTypeArray[i]) {
           row[order[i]] = comlexDimensionInfoMap
@@ -107,7 +132,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
               .getDataBasedOnDataTypeFromSurrogates(
                   ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
         } else {
-          row[order[i]] = surrogateResult[dictionaryColumnIndex++];
+          row[order[i]] = surrogateResult[actualIndexInSurrogateKey[dictionaryColumnIndex++]];
        }
      }
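The core of the change above is an index translation: the surrogate-key array produced by the scan holds entries only for dictionary and direct-dictionary dimensions, so a dimension's table ordinal can no longer index it directly. A minimal Scala sketch of the same sort-plus-binary-search mapping (Dim and its fields are illustrative stand-ins, not Carbon's API):

    import java.util.Arrays

    case class Dim(ordinal: Int, isDictionary: Boolean)

    // Map each dictionary-encoded dimension's table ordinal to its slot in the
    // compacted surrogate-key array, which is laid out in ascending ordinal order.
    def surrogateIndexes(dims: Seq[Dim]): Map[Int, Int] = {
      val sorted = dims.filter(_.isDictionary).map(_.ordinal).sorted.toArray
      dims.filter(_.isDictionary)
        .map(d => d.ordinal -> Arrays.binarySearch(sorted, d.ordinal))
        .toMap
    }

    // Ordinals 0, 2, 5 with only 2 and 5 dictionary-encoded yield
    // Map(2 -> 0, 5 -> 1): ordinal 5 lives at slot 1, not slot 5.
    surrogateIndexes(Seq(Dim(0, false), Dim(2, true), Dim(5, true)))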

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
index 7f21d90..48eab1e 100644
--- a/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
+++ b/core/src/test/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedResultCollectorTest.java
@@ -19,14 +19,12 @@ package org.apache.carbondata.scan.collector.impl;
 
 import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.PresenceMeta;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -74,6 +72,9 @@ public class DictionaryBasedResultCollectorTest {
     QueryDimension queryDimension1 = new QueryDimension("QDCol1");
     queryDimension1.setQueryOrder(1);
     ColumnSchema columnSchema = new ColumnSchema();
+    List encodeList= new ArrayList<Encoding>();
+    encodeList.add(Encoding.DICTIONARY);
+    columnSchema.setEncodingList(encodeList);
     queryDimension1.setDimension(new CarbonDimension(columnSchema, 0, 0, 0, 0));
     QueryDimension queryDimension2 = new QueryDimension("QDCol2");
     queryDimension2.setQueryOrder(2);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 4aff45a..c2e135a 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -153,6 +153,18 @@ object CarbonExample {
         |where t1.stringField = t2.stringField
       """.stripMargin).show
 
+    spark.sql(
+      """
+        |with t1 as (
+        |select * from carbon_table
+        |union all
+        |select * from carbon_table
+        |)
+        |select t1.*, t2.*
+        |from t1, carbon_table t2
+        |where t1.stringField = t2.stringField
+      """.stripMargin).show
+
     // Drop table
     spark.sql("DROP TABLE IF EXISTS carbon_table")
     spark.sql("DROP TABLE IF EXISTS csv_table")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 2a580dc..5e58235 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -83,6 +83,13 @@ object CarbonFilters {
           new ListExpression(
             convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
 
+      case sources.IsNull(name) =>
+        Some(new EqualToExpression(getCarbonExpression(name),
+          getCarbonLiteralExpression(name, null), true))
+      case sources.IsNotNull(name) =>
+        Some(new NotEqualsExpression(getCarbonExpression(name),
+          getCarbonLiteralExpression(name, null), true))
+
       case sources.And(lhs, rhs) =>
         (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
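Both CarbonFilters variants (this one and the spark2 copy further down) translate Spark's IsNull/IsNotNull into (not-)equality against a null literal, with the trailing boolean marking the comparison as a null check. A self-contained Scala model of that translation, using stand-in types rather than Carbon's expression classes:

    sealed trait Expr
    case class Col(name: String) extends Expr
    case class Lit(value: Any) extends Expr
    // isNullCheck mirrors the boolean passed to Carbon's (Not)EqualsExpression.
    case class EqualTo(l: Expr, r: Expr, isNullCheck: Boolean) extends Expr
    case class NotEqual(l: Expr, r: Expr, isNullCheck: Boolean) extends Expr

    sealed trait SourceFilter
    case class IsNull(col: String) extends SourceFilter
    case class IsNotNull(col: String) extends SourceFilter

    def toCarbonLike(f: SourceFilter): Expr = f match {
      case IsNull(c)    => EqualTo(Col(c), Lit(null), isNullCheck = true)
      case IsNotNull(c) => NotEqual(Col(c), Lit(null), isNullCheck = true)
    }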

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 7909a13..6e84e7e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -29,7 +29,7 @@ abstract class AbstractNode
 
 case class Node(cd: CarbonDictionaryTempDecoder) extends AbstractNode
 
-case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode])
+case class ArrayCarbonNode(children: Seq[util.List[AbstractNode]])
   extends AbstractNode
 
 case class CarbonDictionaryTempDecoder(
@@ -70,9 +70,16 @@ class CarbonDecoderProcessor {
       case j: BinaryNode =>
         val leftList = new util.ArrayList[AbstractNode]
         val rightList = new util.ArrayList[AbstractNode]
-        nodeList.add(BinaryCarbonNode(leftList, rightList))
+        nodeList.add(ArrayCarbonNode(Seq(leftList, rightList)))
         process(j.left, leftList)
         process(j.right, rightList)
+      case u: Union =>
+        val nodeListSeq = u.children.map { child =>
+          val list = new util.ArrayList[AbstractNode]
+          process(child, list)
+          list
+        }
+        nodeList.add(ArrayCarbonNode(nodeListSeq))
       case e: UnaryNode => process(e.child, nodeList)
       case _ =>
     }
@@ -91,13 +98,12 @@ class CarbonDecoderProcessor {
         decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
         decoderNotDecode.asScala.foreach(cd.attrList.remove)
         decoderNotDecode.addAll(cd.attrList)
-      case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
-        val leftNotDecode = new util.HashSet[AttributeReferenceWrapper]
-        val rightNotDecode = new util.HashSet[AttributeReferenceWrapper]
-        updateDecoderInternal(left.asScala, leftNotDecode)
-        updateDecoderInternal(right.asScala, rightNotDecode)
-        decoderNotDecode.addAll(leftNotDecode)
-        decoderNotDecode.addAll(rightNotDecode)
+      case ArrayCarbonNode(children) =>
+        children.foreach { child =>
+          val notDecode = new util.HashSet[AttributeReferenceWrapper]
+          updateDecoderInternal(child.asScala, notDecode)
+          decoderNotDecode.addAll(notDecode)
+        }
     }
   }
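ArrayCarbonNode generalizes the old two-slot BinaryCarbonNode so one traversal covers binary joins and n-ary unions alike. A minimal model of the generalized walk (Plan, Leaf and NaryUnion are hypothetical stand-ins for Catalyst plan nodes):

    import java.util

    sealed trait Plan { def children: Seq[Plan] }
    case class Leaf(name: String) extends Plan { val children = Seq.empty[Plan] }
    case class NaryUnion(children: Seq[Plan]) extends Plan

    sealed trait AbstractNode
    case class Node(name: String) extends AbstractNode
    case class ArrayNode(lists: Seq[util.List[AbstractNode]]) extends AbstractNode

    // One child list per branch, however many branches the operator has.
    def process(plan: Plan, out: util.List[AbstractNode]): Unit = plan match {
      case NaryUnion(cs) =>
        out.add(ArrayNode(cs.map { c =>
          val list = new util.ArrayList[AbstractNode]()
          process(c, list)
          list
        }))
      case Leaf(n) => out.add(Node(n))
    }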

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 1bfcdea..499ef0c 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -39,7 +39,10 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
   @Override public Row readRow(Object[] data) {
     for (int i = 0; i < dictionaries.length; i++) {
-      if (dictionaries[i] == null && data[i] != null) {
+      if (data[i] == null) {
+        continue;
+      }
+      if (dictionaries[i] == null) {
         if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           //convert the long to timestamp in case of direct dictionary column
           if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
index 2cd4eb7..6d9fb24 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -83,6 +83,12 @@ object CarbonFilters {
           new ListExpression(
             convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
 
+      case sources.IsNull(name) =>
+        Some(new EqualToExpression(getCarbonExpression(name),
+          getCarbonLiteralExpression(name, null), true))
+      case sources.IsNotNull(name) =>
+        Some(new NotEqualsExpression(getCarbonExpression(name),
+          getCarbonLiteralExpression(name, null), true))
       case sources.And(lhs, rhs) =>
         (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 5db5d14..3057bee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -146,7 +146,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     s"""
           CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
          (${ carbonSchema.mkString(", ") })
-          using 'org.apache.spark.sql.CarbonRelationProvider'
+          using org.apache.spark.sql.CarbonSource
+          OPTIONS('dbName'='${options.dbName}', 'tableName'='${options.tableName}')
       """
   }
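The generated DDL now goes through CarbonSource with explicit dbName/tableName options rather than the old quoted relation-provider string. A hedged sketch of the statement shape the writer produces (the helper name and the (name, type) schema encoding are illustrative):

    // carbonSchema pairs are assumed to be (columnName, dataType).
    def createTableSql(db: String, table: String, carbonSchema: Seq[(String, String)]): String =
      s"""CREATE TABLE IF NOT EXISTS $db.$table
         |(${carbonSchema.map { case (n, t) => s"$n $t" }.mkString(", ")})
         |using org.apache.spark.sql.CarbonSource
         |OPTIONS('dbName'='$db', 'tableName'='$table')""".stripMargin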

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 997106c..09a58ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -21,9 +21,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.{ExecutedCommandExec, LoadTableByInsert}
+import org.apache.spark.sql.execution.command.LoadTableByInsert
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation, PrunedFilteredScan}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, InsertableRelation}
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 3b63021..9a625ee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -303,41 +303,41 @@ class CarbonDecoderRDD(
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
     }.toMap
 
-    val cacheProvider: CacheProvider = CacheProvider.getInstance
-    val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
-      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
-    val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
-      forwardDictionaryCache)
-    val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
-    // add a task completion listener to clear dictionary that is a decisive factor for
-    // LRU eviction policy
-    val dictionaryTaskCleaner = TaskContext.get
-    dictionaryTaskCleaner.addTaskCompletionListener(context =>
-      dicts.foreach { dictionary =>
-        if (null != dictionary) {
-          dictionary.clear
-        }
+    val cacheProvider: CacheProvider = CacheProvider.getInstance
+    val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+    val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+      forwardDictionaryCache)
+    val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+    // add a task completion listener to clear dictionary that is a decisive factor for
+    // LRU eviction policy
+    val dictionaryTaskCleaner = TaskContext.get
+    dictionaryTaskCleaner.addTaskCompletionListener(context =>
+      dicts.foreach { dictionary =>
+        if (null != dictionary) {
+          dictionary.clear
         }
-    )
-    val iter = firstParent[Row].iterator(split, context)
-    new Iterator[Row] {
-      var flag = true
-      var total = 0L
-      override final def hasNext: Boolean = iter.hasNext
+      }
+    )
+    val iter = firstParent[Row].iterator(split, context)
+    new Iterator[Row] {
+      var flag = true
+      var total = 0L
+      override final def hasNext: Boolean = iter.hasNext
 
-      override final def next(): Row = {
-        val startTime = System.currentTimeMillis()
-        val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
-        dictIndex.foreach { index =>
-          if ( data(index) != null) {
-            data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
-              .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
-              getDictionaryColumnIds(index)._3)
-          }
+      override final def next(): Row = {
+        val startTime = System.currentTimeMillis()
+        val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+        dictIndex.foreach { index =>
+          if ( data(index) != null) {
+            data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+              .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+              getDictionaryColumnIds(index)._3)
           }
-        new GenericRow(data)
         }
+        new GenericRow(data)
       }
+    }
   }
 
   private def isRequiredToDecode = {
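The re-indented block above keeps a pattern worth noting: per-task dictionary handles are registered for clearing when the task completes, which is what makes them eligible for LRU eviction. A minimal stand-in for that lifecycle (TaskScope is hypothetical; the real code uses Spark's TaskContext.addTaskCompletionListener):

    // Register clean-up callbacks that run once the unit of work finishes.
    class TaskScope {
      private var callbacks: List[() => Unit] = Nil
      def onCompletion(cb: () => Unit): Unit = callbacks ::= cb
      def complete(): Unit = callbacks.foreach(_.apply())
    }

    val scope = new TaskScope
    val dictionaries = Seq.empty[AnyRef] // stand-in for loaded Dictionary handles
    scope.onCompletion(() => dictionaries.foreach(d => if (d != null) () /* d.clear */))
    scope.complete() // in the real flow this fires when the Spark task ends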

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 9e42b44..19c3c9c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -78,39 +78,27 @@ case class CarbonScan(
       attributesRaw = attributeOut
     }
 
-    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-    val dimAttr = new Array[Attribute](dimensions.size())
-    val msrAttr = new Array[Attribute](measures.size())
+    val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+    val colAttr = new Array[Attribute](columns.size())
     attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if(carbonDimension != null) {
-        dimAttr(dimensions.indexOf(carbonDimension)) = attr
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if(carbonMeasure != null) {
-          msrAttr(measures.indexOf(carbonMeasure)) = attr
-        }
-      }
+      val column =
+        carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+      if(column != null) {
+        colAttr(columns.indexOf(column)) = attr
+      }
     }
-
-    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+    attributesRaw = colAttr.filter(f => f != null)
 
     var queryOrder: Integer = 0
     attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if (carbonDimension != null) {
-        val dim = new QueryDimension(attr.name)
-        dim.setQueryOrder(queryOrder)
-        queryOrder = queryOrder + 1
-        selectedDims += dim
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if (carbonMeasure != null) {
+      val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name)
+      if (carbonColumn != null) {
+        if (carbonColumn.isDimesion()) {
+          val dim = new QueryDimension(attr.name)
+          dim.setQueryOrder(queryOrder)
+          queryOrder = queryOrder + 1
+          selectedDims += dim
+        } else {
           val m1 = new QueryMeasure(attr.name)
           m1.setQueryOrder(queryOrder)
           queryOrder = queryOrder + 1

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 2ba8a03..b639ea8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DecimalType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 
 /**
@@ -54,7 +55,7 @@ class CarbonSource extends CreatableRelationProvider
         "the path to store carbon file is the 'storePath' specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = sqlContext.sparkSession.conf.get(CarbonCommonConstants.STORE_LOCATION)
+    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)
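CarbonScan now lays projected attributes out by the table's create-order column list and derives the dimension-versus-measure query order from a single column lookup (the misspelled isDimesion() is Carbon's actual method name at this point). A compact model of that reordering (Column and the string attributes are stand-ins):

    case class Column(name: String, isDimension: Boolean)

    // Drop each requested attribute into its create-order slot, then squeeze out gaps.
    def orderByCreateOrder(attrs: Seq[String], createOrder: Seq[Column]): Seq[String] = {
      val slots = new Array[String](createOrder.size)
      attrs.foreach { a =>
        val idx = createOrder.indexWhere(_.name == a)
        if (idx >= 0) slots(idx) = a
      }
      slots.filter(_ != null).toSeq
    }

    // orderByCreateOrder(Seq("salary", "empname"),
    //   Seq(Column("empname", true), Column("salary", false))) == Seq("empname", "salary")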

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index fb9df70..b3a7d5a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -165,34 +165,27 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         case union: Union
           if !(union.children(0).isInstanceOf[CarbonDictionaryTempDecoder] ||
                union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
-          val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
-          union.children(0).output.foreach(attr =>
-            leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
-          union.children(1).output.foreach(attr =>
-            rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
-          var leftPlan = union.children(0)
-          var rightPlan = union.children(1)
-          if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
-              !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-            leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              union.children(0))
-          }
-          if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
-              !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-            rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              union.children(1))
+          val children = union.children.map { child =>
+            val condAttrs = new util.HashSet[AttributeReferenceWrapper]
+            child.output.foreach(attr =>
+              condAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+            if (hasCarbonRelation(child) && condAttrs.size() > 0 &&
+                !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+              CarbonDictionaryTempDecoder(condAttrs,
+                new util.HashSet[AttributeReferenceWrapper](),
+                child)
+            } else {
+              child
+            }
           }
           if (!decoder) {
            decoder = true
             CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
               new util.HashSet[AttributeReferenceWrapper](),
-              Union(leftPlan, rightPlan),
+              Union(children),
               isOuter = true)
           } else {
-            Union(leftPlan, rightPlan)
+            Union(children)
           }
 
         case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
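The hunk above is the union fix the commit title refers to: instead of hand-building a left and a right branch, every union child is wrapped in its own temp decoder when it needs one (note the wrapped plan is each mapped child, not children(0)). A mini-model of the rewrite, with Plan, Scan, Decoder and Union as stand-ins for the Catalyst types:

    sealed trait Plan
    case class Scan(table: String) extends Plan
    case class Decoder(child: Plan) extends Plan
    case class Union(children: Seq[Plan]) extends Plan

    // Wrap each child that needs decoding; children that are already decoders
    // or need no decode pass through untouched.
    def insertDecoders(u: Union, needsDecode: Plan => Boolean): Union =
      Union(u.children.map {
        case d: Decoder => d
        case child if needsDecode(child) => Decoder(child)
        case child => child
      })

    insertDecoders(Union(Seq(Scan("t1"), Scan("t2"), Scan("t3"))), _ => true)
    // == Union(Seq(Decoder(Scan("t1")), Decoder(Scan("t2")), Decoder(Scan("t3"))))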
@@ -487,68 +480,32 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
       case cd: CarbonDictionaryCatalystDecoder =>
         cd
       case sort: Sort =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (sort.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = sort.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         val sortExprs = sort.order.map { s =>
           s.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }.asInstanceOf[SortOrder]
         }
         Sort(sortExprs, sort.global, sort.child)
       case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (agg.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = agg.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
-
         val aggExps = agg.aggregateExpressions.map { aggExp =>
           aggExp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val grpExps = agg.groupingExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }
         Aggregate(grpExps, aggExps, agg.child)
       case expand: Expand =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (expand.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = expand.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         expand.transformExpressions {
           case attr: AttributeReference =>
-            val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-            if(tempAttr.isDefined) {
-              tempAttr.get
-            } else {
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-            }
+            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
         }
       case filter: Filter =>
         filter
@@ -559,71 +516,36 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         marker.pushBinaryMarker(allAttrsNotDecode)
         u
       case p: Project if relations.nonEmpty =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (p.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = p.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         val prExps = p.projectList.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[NamedExpression]]
         Project(prExps, p.child)
       case wd: Window if relations.nonEmpty =>
-        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
-        if (wd.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
-          val tempDecoder = wd.child.asInstanceOf[CarbonDictionaryTempDecoder]
-          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
-        }
         val prExps = wd.output.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[Attribute]]
         val wdExps = wd.windowExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val partitionSpec = wd.partitionSpec.map{ exp =>
           exp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }
         val orderSpec = wd.orderSpec.map { exp =>
           exp.transform {
             case attr: AttributeReference =>
-              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
-              if(tempAttr.isDefined) {
-                tempAttr.get
-              } else {
-                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-              }
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
           }
         }.asInstanceOf[Seq[SortOrder]]
         Window(wdExps, partitionSpec, orderSpec, wd.child)
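Every operator case above now funnels attribute references through the same updateDataType rewrite via a tree transform, instead of consulting a per-operator temp map first. A small stand-in model of that uniform transform (Expr, Attr, Add and the string data types are illustrative):

    sealed trait Expr {
      def transformUp(f: PartialFunction[Expr, Expr]): Expr = {
        val rewritten = this match {
          case Add(l, r) => Add(l.transformUp(f), r.transformUp(f))
          case other => other
        }
        if (f.isDefinedAt(rewritten)) f(rewritten) else rewritten
      }
    }
    case class Attr(name: String, dataType: String) extends Expr
    case class Add(left: Expr, right: Expr) extends Expr

    // Dictionary columns carry surrogate Int keys until late decode, so operators
    // above the decoder must see the decoded data type.
    def updateDataType(a: Attr, decoded: Set[String]): Attr =
      if (decoded.contains(a.name)) a.copy(dataType = "string") else a

    val fixed = Add(Attr("empname", "int"), Attr("deptno", "int"))
      .transformUp { case a: Attr => updateDataType(a, Set("empname")) }
    // fixed == Add(Attr("empname", "string"), Attr("deptno", "int"))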

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/462f6422/integration/spark2/src/test/resources/data.csv
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/resources/data.csv b/integration/spark2/src/test/resources/data.csv
new file mode 100644
index 0000000..4ff67da
--- /dev/null
+++ b/integration/spark2/src/test/resources/data.csv
@@ -0,0 +1,11 @@
+empno,empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary
+11,arvind,SE,17-01-2007,1,developer,10,network,928478,17-02-2007,29-11-2016,96,96.2,5040.56
+12,krithin,SSE,29-05-2008,1,developer,11,protocol,928378,29-06-2008,30-12-2016,85,95.1,7124.21
+13,madhan,TPL,07-07-2009,2,tester,10,network,928478,07-08-2009,30-12-2016,88,99,9054.235
+14,anandh,SA,29-12-2010,3,manager,11,protocol,928278,29-01-2011,29-06-2016,77,92.2,11248.25
+15,ayushi,SSA,09-11-2011,1,developer,12,security,928375,09-12-2011,29-05-2016,99,91.5,13245.48
+16,pramod,SE,14-10-2012,1,developer,13,configManagement,928478,14-11-2012,29-12-2016,86,93,5040.56
+17,gawrav,PL,22-09-2013,2,tester,12,security,928778,22-10-2013,15-11-2016,78,97.45,9574.24
+18,sibi,TL,15-08-2014,2,tester,14,Learning,928176,15-09-2014,29-05-2016,84,98.23,7245.25
+19,shivani,PL,12-05-2015,1,developer,10,network,928977,12-06-2015,12-11-2016,88,91.678,11254.24
+20,bill,PM,01-12-2015,3,manager,14,Learning,928479,01-01-2016,30-11-2016,75,94.22,13547.25