http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 600519f..4c7c460 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} -import org.apache.spark.sql.hive.CarbonMetastoreCatalog +import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, CarbonMetastoreTypes} import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -54,20 +54,19 @@ case class CarbonDictionaryDecoder( child.output.map { a => val attr = aliasMap.getOrElse(a, a) val relation = relations.find(p => p.contains(attr)) - if(relation.isDefined) { + if(relation.isDefined && canBeDecoded(attr)) { val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable val carbonDimension = carbonTable .getDimensionByName(carbonTable.getFactTableName, attr.name) if (carbonDimension != null && carbonDimension.hasEncoding(Encoding.DICTIONARY) && - !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && - canBeDecoded(attr)) { + !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) { val newAttr = AttributeReference(a.name, - convertCarbonToSparkDataType(carbonDimension), + convertCarbonToSparkDataType(carbonDimension, + relation.get.carbonRelation.carbonRelation), a.nullable, a.metadata)(a.exprId, a.qualifiers).asInstanceOf[Attribute] - newAttr.resolved newAttr } else { a @@ -89,15 +88,29 @@ case class CarbonDictionaryDecoder( } } - def convertCarbonToSparkDataType(carbonDimension: CarbonDimension): types.DataType = { + def convertCarbonToSparkDataType(carbonDimension: CarbonDimension, + relation: CarbonRelation): types.DataType = { carbonDimension.getDataType match { case DataType.STRING => StringType case DataType.INT => IntegerType case DataType.LONG => LongType case DataType.DOUBLE => DoubleType case DataType.BOOLEAN => BooleanType - case DataType.DECIMAL => DecimalType.DoubleDecimal + case DataType.DECIMAL => + val scale: Int = carbonDimension.getColumnSchema.getScale + val precision: Int = carbonDimension.getColumnSchema.getPrecision + if (scale > 0 && precision > 0) { + DecimalType(scale, precision) + } else { + DecimalType(18, 2) + } case DataType.TIMESTAMP => TimestampType + case DataType.STRUCT => + CarbonMetastoreTypes + .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>") + case DataType.ARRAY => + CarbonMetastoreTypes + .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>") } }
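A note on the DECIMAL branch in convertCarbonToSparkDataType above: Spark's DecimalType constructor takes (precision, scale) in that order, while the patch passes (scale, precision), so the arguments appear swapped. Below is a minimal standalone sketch of the intended mapping; ColumnSchemaStub is a hypothetical stand-in for the precision/scale accessors on carbonDimension.getColumnSchema, not part of the patch:

    import org.apache.spark.sql.types.{DecimalType, DataType => SparkDataType}

    // Hypothetical stand-in for CarbonDimension.getColumnSchema in the patch above.
    case class ColumnSchemaStub(precision: Int, scale: Int)

    // Maps a Carbon DECIMAL column to a Spark DecimalType.
    // Spark's constructor order is (precision, scale); when the column schema
    // carries no usable precision/scale, fall back to DecimalType(18, 2),
    // mirroring the default chosen in the patch.
    def toSparkDecimal(schema: ColumnSchemaStub): SparkDataType = {
      if (schema.precision > 0 && schema.scale > 0) {
        DecimalType(schema.precision, schema.scale)
      } else {
        DecimalType(18, 2)
      }
    }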
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala index cb20246..ba4c37e 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala @@ -26,24 +26,20 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.execution.LeafNode import org.apache.spark.sql.hive.CarbonMetastoreCatalog -import org.apache.spark.sql.types.{DataType, Decimal} import org.apache.spark.unsafe.types.UTF8String import org.carbondata.core.constants.CarbonCommonConstants import org.carbondata.core.util.CarbonProperties import org.carbondata.query.carbon.model._ -import org.carbondata.spark.{CarbonFilters, RawKey, RawKeyImpl} +import org.carbondata.spark.{CarbonFilters, RawValue, RawValueImpl} import org.carbondata.spark.rdd.CarbonScanRDD case class CarbonScan( var attributesRaw: Seq[Attribute], relationRaw: CarbonRelation, - dimensionPredicatesRaw: Seq[Expression], - aggExprsRaw: Option[Seq[Expression]], - useBinaryAggregator: Boolean)(@transient val ocRaw: SQLContext) extends LeafNode { + dimensionPredicatesRaw: Seq[Expression])(@transient val ocRaw: SQLContext) extends LeafNode { val carbonTable = relationRaw.metaData.carbonTable val selectedDims = scala.collection.mutable.MutableList[QueryDimension]() val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]() @@ -95,24 +91,6 @@ case class CarbonScan( } } } - // Just find out that any aggregation functions are present on dimensions. 
- aggExprsRaw match { - case Some(aggExprs) => - aggExprs.foreach { - case Alias(agg: AggregateExpression, name) => - agg.collect { - case attr: AttributeReference => - val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)) - if(dims.nonEmpty) { - plan.addAggDimAggInfo(dims.head.getColumnName, - dims.head.getAggregateFunction, - dims.head.getQueryOrder) - } - } - case _ => - } - case _ => - } // Fill the selected dimensions & measures obtained from // attributes to query plan for detailed query @@ -179,14 +157,13 @@ case class CarbonScan( } - def inputRdd: CarbonScanRDD[Array[Any], Any] = { + def inputRdd: CarbonScanRDD[Array[Any]] = { val conf = new Configuration() val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - buildCarbonPlan.getDimAggregatorInfos.clear() val model = QueryModel.createModel( absoluteTableIdentifier, buildCarbonPlan, carbonTable) - val kv: RawKey[Array[Any], Any] = new RawKeyImpl() + val kv: RawValue[Array[Any]] = new RawValueImpl // setting queryid buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + "")) @@ -224,9 +201,9 @@ case class CarbonScan( override def next(): InternalRow = if (outUnsafeRows) { - unsafeProjection(new GenericMutableRow(iter.next()._1.map(toType))) + unsafeProjection(new GenericMutableRow(iter.next().map(toType))) } else { - new GenericMutableRow(iter.next()._1.map(toType)) + new GenericMutableRow(iter.next().map(toType)) } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala index 96a86a7..6ed8c0d 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala @@ -20,13 +20,6 @@ package org.apache.spark.sql import org.apache.spark.sql.SQLConf.SQLConfEntry import org.apache.spark.sql.hive.CarbonSQLDialect -object CarbonSQLConf { - - val PUSH_COMPUTATION = SQLConfEntry.booleanConf("spark.sql.carbon.push.computation", - defaultValue = Some(true)) - -} - /** * A trait that enables the setting and getting of mutable config parameters/hints. 
* @@ -40,8 +33,4 @@ class CarbonSQLConf extends SQLConf { override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) - import CarbonSQLConf._ - - private[sql] def pushComputation: Boolean = getConf(PUSH_COMPUTATION) - } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala index a72aaa1..9239cec 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala @@ -170,7 +170,7 @@ class CarbonSqlParser() initLexical phrase(start)(new lexical.Scanner(input)) match { case Success(plan, _) => plan match { - case x: LoadCube => + case x: LoadTable => x.inputSqlString = input x case logicalPlan => logicalPlan @@ -969,7 +969,7 @@ class CarbonSqlParser() } val patitionOptionsMap = partionDataOptions.toMap - LoadCube(schema, cubename, filePath, dimFolderPath.getOrElse(Seq()), + LoadTable(schema, cubename, filePath, dimFolderPath.getOrElse(Seq()), patitionOptionsMap, false) } @@ -985,7 +985,7 @@ class CarbonSqlParser() validateOptions(partionDataOptions) } val patitionOptionsMap = partionDataOptions.getOrElse(List.empty[(String, String)]).toMap - LoadCube(schema, cubename, filePath, Seq(), patitionOptionsMap, isOverwrite.isDefined) + LoadTable(schema, cubename, filePath, Seq(), patitionOptionsMap, isOverwrite.isDefined) } private def validateOptions(partionDataOptions: Option[List[(String, String)]]): Unit = { @@ -1079,9 +1079,7 @@ class CarbonSqlParser() opt(";") ^^ { case tabletype ~ exists ~ schemaName ~ resourceName => tabletype match { - case agg ~ table => - DropAggregateTableCommand(exists.isDefined, schemaName, resourceName.toLowerCase()) - case _ => DropCubeCommand(exists.isDefined, schemaName, resourceName.toLowerCase()) + case _ => DropTableCommand(exists.isDefined, schemaName, resourceName.toLowerCase()) } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index fcc2e8a..e36d148 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -1381,7 +1381,7 @@ private[sql] case class DeleteLoadsByLoadDate( } -private[sql] case class LoadCube( +private[sql] case class LoadTable( schemaNameOp: Option[String], tableName: String, factPathFromUser: String, @@ -1693,7 +1693,7 @@ private[sql] case class MergeTable(dbName: String, cubeName: String, tableName: } } -private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Option[String], +private[sql] case class DropTableCommand(ifExistsSet: Boolean, schemaNameOp: Option[String], tableName: String) extends RunnableCommand { @@ -1755,7 +1755,7 @@ private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Opti 
Some(relation.cubeMeta.carbonTableIdentifier.getDatabaseName)) )(sqlContext) CarbonDataRDDFactory - .dropCube(sqlContext.sparkContext, dbName, tableName, + .dropTable(sqlContext.sparkContext, dbName, tableName, relation.cubeMeta.partitioner) QueryPartitionHelper.getInstance().removePartition(dbName, tableName) @@ -1789,33 +1789,6 @@ private[sql] case class DropCubeCommand(ifExistsSet: Boolean, schemaNameOp: Opti } } -private[sql] case class DropAggregateTableCommand(ifExistsSet: Boolean, - schemaNameOp: Option[String], - tableName: String) extends RunnableCommand { - - def run(sqlContext: SQLContext): Seq[Row] = { - val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext) - val identifier = TableIdentifier(tableName, Option(dbName)) - val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog - .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation] - - if (relation == null) { - if (!ifExistsSet) { - sys.error(s"Aggregate Table $dbName.$tableName does not exist") - } - } - else { - CarbonDataRDDFactory.dropAggregateTable( - sqlContext.sparkContext, - dbName, - tableName, - relation.cubeMeta.partitioner) - } - - Seq.empty - } -} - private[sql] case class ShowLoads( schemaNameOp: Option[String], tableName: String, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index 829c487..76edc11 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, DropTable, import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation} import org.carbondata.common.logging.LogServiceFactory -import org.carbondata.core.carbon.metadata.schema.table.CarbonTable import org.carbondata.spark.exception.MalformedCarbonCommandException @@ -64,10 +63,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { carbonRawScan(projectList, predicates, carbonRelation, - l, - None, - detailQuery = true, - useBinaryAggregation = false)(sqlContext)._1 :: Nil + l)(sqlContext) :: Nil } case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => CarbonDictionaryDecoder(relations, @@ -85,10 +81,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { private def carbonRawScan(projectList: Seq[NamedExpression], predicates: Seq[Expression], relation: CarbonDatasourceRelation, - logicalRelation: LogicalRelation, - groupExprs: Option[Seq[Expression]], - detailQuery: Boolean, - useBinaryAggregation: Boolean)(sc: SQLContext): (SparkPlan, Boolean) = { + logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = { val tableName: String = relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase @@ -97,49 +90,32 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { val projectSet = AttributeSet(projectList.flatMap(_.references)) val scan = CarbonScan(projectSet.toSeq, relation.carbonRelation, - predicates, - groupExprs, - useBinaryAggregation)(sqlContext) - val dimAggrsPresence: Boolean = 
scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0 + predicates)(sqlContext) projectList.map { case attr: AttributeReference => case Alias(attr: AttributeReference, _) => case others => - others.references - .map(f => scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])) - } - if (!detailQuery) { - if (scan.attributesNeedToDecode.size > 0) { - val decoder = getCarbonDecoder(logicalRelation, - sc, - tableName, - scan.attributesNeedToDecode.asScala.toSeq, - scan) - if (scan.unprocessedExprs.nonEmpty) { - val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) - (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true) - } else { - (Project(projectList, decoder), true) + others.references.map{f => + val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name) + if (dictionary.isDefined && dictionary.get) { + scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]) + } } + } + if (scan.attributesNeedToDecode.size() > 0) { + val decoder = getCarbonDecoder(logicalRelation, + sc, + tableName, + scan.attributesNeedToDecode.asScala.toSeq, + scan) + if (scan.unprocessedExprs.nonEmpty) { + val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) + Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)) } else { - (scan, dimAggrsPresence) + Project(projectList, decoder) } } else { - if (scan.attributesNeedToDecode.size() > 0) { - val decoder = getCarbonDecoder(logicalRelation, - sc, - tableName, - scan.attributesNeedToDecode.asScala.toSeq, - scan) - if (scan.unprocessedExprs.nonEmpty) { - val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) - (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true) - } else { - (Project(projectList, decoder), true) - } - } else { - (Project(projectList, scan), dimAggrsPresence) - } + Project(projectList, scan) } } @@ -158,9 +134,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { val projectExprsNeedToDecode = new java.util.HashSet[Attribute]() val scan = CarbonScan(projectList.map(_.toAttribute), relation.carbonRelation, - predicates, - None, - useBinaryAggregator = false)(sqlContext) + predicates)(sqlContext) projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode) if (projectExprsNeedToDecode.size() > 0) { val decoder = getCarbonDecoder(logicalRelation, @@ -206,18 +180,6 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { case _ => false } } - - private def isGroupByPresentOnMeasures(groupingExpressions: Seq[Expression], - carbonTable: CarbonTable): Boolean = { - groupingExpressions.map { g => - g.collect { - case attr: AttributeReference - if carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name) != null => - return true - } - } - false - } } object DDLStrategies extends Strategy { @@ -233,17 +195,17 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { case DropTable(tableName, ifNotExists) if CarbonEnv.getInstance(sqlContext).carbonCatalog .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) => - ExecutedCommand(DropCubeCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil + ExecutedCommand(DropTableCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil case ShowAggregateTablesCommand(schemaName) => ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil case ShowLoadsCommand(schemaName, cube, limit) => 
ExecutedCommand(ShowLoads(schemaName, cube, limit, plan.output)) :: Nil - case LoadCube(schemaNameOp, cubeName, factPathFromUser, dimFilesPath, + case LoadTable(schemaNameOp, cubeName, factPathFromUser, dimFilesPath, partionValues, isOverwriteExist, inputSqlString) => val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext) if (isCarbonTable || partionValues.nonEmpty) { - ExecutedCommand(LoadCube(schemaNameOp, cubeName, factPathFromUser, + ExecutedCommand(LoadTable(schemaNameOp, cubeName, factPathFromUser, dimFilesPath, partionValues, isOverwriteExist, inputSqlString)) :: Nil } else { ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala index 03bb23e..0f583d0 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala @@ -55,17 +55,13 @@ class CarbonDecoderProcessor { case cd: CarbonDictionaryTempDecoder => nodeList.add(Node(cd)) process(cd.child, nodeList) - case j: Join => + case j: BinaryNode => val leftList = new util.ArrayList[AbstractNode] val rightList = new util.ArrayList[AbstractNode] nodeList.add(JoinNode(leftList, rightList)) process(j.left, leftList) process(j.right, rightList) - case p: Project => process(p.child, nodeList) - case f: Filter => process(f.child, nodeList) - case s: Sort => process(s.child, nodeList) - case a: Aggregate => process(a.child, nodeList) - case l: Limit => process(l.child, nodeList) + case e: UnaryNode => process(e.child, nodeList) case _ => } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala index d80c065..73cb3d5 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala @@ -44,13 +44,9 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf) override def execute(plan: LogicalPlan): LogicalPlan = { val executedPlan: LogicalPlan = optimizer.execute(plan) - if (!conf.asInstanceOf[CarbonSQLConf].pushComputation) { - val relations = collectCarbonRelation(plan) - if (relations.nonEmpty) { - new ResolveCarbonFunctions(relations)(executedPlan) - } else { - executedPlan - } + val relations = collectCarbonRelation(plan) + if (relations.nonEmpty) { + new ResolveCarbonFunctions(relations)(executedPlan) } else { executedPlan } @@ -111,19 +107,28 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf) case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] => val attrsOndimAggs = new util.HashSet[Attribute] - agg.aggregateExpressions.map { aggExp 
=> - aggExp.transform { - case aggExp: AggregateExpression => - collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap) - aggExp - case a@Alias(attr: Attribute, name) => - aliasMap.put(a.toAttribute, attr) - a - } + agg.aggregateExpressions.map { + case attr: AttributeReference => + case a@Alias(attr: AttributeReference, name) => aliasMap.put(a.toAttribute, attr) + case aggExp: AggregateExpression => + aggExp.transform { + case aggExp: AggregateExpression => + collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap) + aggExp + case a@Alias(attr: Attribute, name) => + aliasMap.put(a.toAttribute, attr) + a + } + case others => + others.collect { + case attr: AttributeReference + if isDictionaryEncoded(attr, relations, aliasMap) => + attrsOndimAggs.add(aliasMap.getOrElse(attr, attr)) + } } var child = agg.child // Incase if the child also aggregate then push down decoder to child - if (attrsOndimAggs.size() > 0 && !(child.equals(agg))) { + if (attrsOndimAggs.size() > 0 && !child.equals(agg)) { child = CarbonDictionaryTempDecoder(attrsOndimAggs, new util.HashSet[Attribute](), agg.child) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala index aee375a..cf31a7b 100644 --- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala @@ -105,14 +105,19 @@ object CarbonFilters { def selectFilters(filters: Seq[Expression], attrList: java.util.HashSet[Attribute], aliasMap: CarbonAliasDecoderRelation): Unit = { - def translate(expr: Expression): Option[sources.Filter] = { + def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = { expr match { - case Or(left, right) => - for { - leftFilter <- translate(left) - rightFilter <- translate(right) - } yield { - sources.Or(leftFilter, rightFilter) + case or@ Or(left, right) => + + val leftFilter = translate(left, true) + val rightFilter = translate(right, true) + if (leftFilter.isDefined && rightFilter.isDefined) { + Some( sources.Or(leftFilter.get, rightFilter.get)) + } else { + or.collect { + case attr: AttributeReference => attrList.add(aliasMap.getOrElse(attr, attr)) + } + None } case And(left, right) => @@ -151,28 +156,35 @@ object CarbonFilters { Some(sources.In(a.name, hSet.toArray)) case others => - others.collect { - case attr: AttributeReference => - attrList.add(aliasMap.getOrElse(attr, attr)) + if (!or) { + others.collect { + case attr: AttributeReference => + attrList.add(aliasMap.getOrElse(attr, attr)) + } } None } } - filters.flatMap(translate).toArray + filters.flatMap(translate(_, false)).toArray } def processExpression(exprs: Seq[Expression], attributesNeedToDecode: java.util.HashSet[AttributeReference], unprocessedExprs: ArrayBuffer[Expression], carbonTable: CarbonTable): Option[CarbonExpression] = { - def transformExpression(expr: Expression): Option[CarbonExpression] = { + def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = { expr match { - case Or(left, right) => - for { - leftFilter <- transformExpression(left) - rightFilter <- transformExpression(right) - } yield { - new OrExpression(leftFilter, rightFilter) + case or@ Or(left, right) => + val 
leftFilter = transformExpression(left, true) + val rightFilter = transformExpression(right, true) + if (leftFilter.isDefined && rightFilter.isDefined) { + Some(new OrExpression(leftFilter.get, rightFilter.get)) + } else { + or.collect { + case attr: AttributeReference => attributesNeedToDecode.add(attr) + } + unprocessedExprs += or + None } case And(left, right) => @@ -220,14 +232,16 @@ object CarbonFilters { CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType))) case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left) case others => - others.collect { - case attr: AttributeReference => attributesNeedToDecode.add(attr) + if (!or) { + others.collect { + case attr: AttributeReference => attributesNeedToDecode.add(attr) + } + unprocessedExprs += others } - unprocessedExprs += others None } } - exprs.flatMap(transformExpression).reduceOption(new AndExpression(_, _)) + exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _)) } private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala index 6ee882b..cb87818 100644 --- a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala @@ -26,35 +26,23 @@ package org.carbondata.spark import org.carbondata.core.load.LoadMetadataDetails -import org.carbondata.query.carbon.result.BatchRawResult -import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue} -trait KeyVal[K, V] extends Serializable { - def getKey(key: CarbonKey, value: CarbonValue): (K, V) - -} - -class KeyValImpl extends KeyVal[CarbonKey, CarbonValue] { - override def getKey(key: CarbonKey, value: CarbonValue): (CarbonKey, CarbonValue) = (key, value) +trait Value[V] extends Serializable { + def getValue(value: Array[Object]): V } -trait RawKeyVal[K, V] extends Serializable { - def getKey(key: BatchRawResult, value: Any): (K, V) - +class ValueImpl extends Value[Array[Object]] { + override def getValue(value: Array[Object]): Array[Object] = value } -class RawKeyValImpl extends RawKeyVal[BatchRawResult, Any] { - override def getKey(key: BatchRawResult, value: Any): (BatchRawResult, Any) = (key, value) +trait RawValue[V] extends Serializable { + def getValue(value: Array[Any]): V } -trait RawKey[K, V] extends Serializable { - def getKey(key: Array[Any], value: Any): (K, V) - +class RawValueImpl extends RawValue[Array[Any]] { + override def getValue(value: Array[Any]): Array[Any] = value } -class RawKeyImpl extends RawKey[Array[Any], Any] { - override def getKey(key: Array[Any], value: Any): (Array[Any], Any) = (key, value) -} trait Result[K, V] extends Serializable { def getKey(key: Int, value: LoadMetadataDetails): (K, V) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala index 4e71be1..17d83b2 
100644 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonCleanFilesRDD.scala @@ -18,23 +18,23 @@ package org.carbondata.spark.rdd import scala.collection.JavaConverters._ +import scala.reflect.ClassTag import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.Partitioner -import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue} -import org.carbondata.spark.KeyVal +import org.carbondata.spark.Value import org.carbondata.spark.util.CarbonQueryUtil -class CarbonCleanFilesRDD[K, V]( +class CarbonCleanFilesRDD[V: ClassTag]( sc: SparkContext, - keyClass: KeyVal[K, V], + valueClass: Value[V], schemaName: String, cubeName: String, partitioner: Partitioner) - extends RDD[(K, V)](sc, Nil) with Logging { + extends RDD[V](sc, Nil) with Logging { sc.setLocalProperty("spark.scheduler.pool", "DDL") @@ -43,8 +43,8 @@ class CarbonCleanFilesRDD[K, V]( splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1)) } - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { - val iter = new Iterator[(K, V)] { + override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + val iter = new Iterator[(V)] { val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) // TODO call CARBON delete API @@ -61,14 +61,12 @@ class CarbonCleanFilesRDD[K, V]( !finished } - override def next(): (K, V) = { + override def next(): V = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } havePair = false - val row = new CarbonKey(null) - val value = new CarbonValue(null) - keyClass.getKey(row, value) + valueClass.getValue(null) } } @@ -78,7 +76,7 @@ class CarbonCleanFilesRDD[K, V]( override def getPreferredLocations(split: Partition): Seq[String] = { val theSplit = split.asInstanceOf[CarbonLoadPartition] val s = theSplit.serializableHadoopSplit.value.getLocations.asScala - logInfo("Host Name : " + s(0) + s.length) + logInfo("Host Name : " + s.head + s.length) s } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 0672281..af2271f 100644 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -29,23 +29,22 @@ import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.spark.{Logging, Partition, SparkContext} -import org.apache.spark.sql.{CarbonEnv, CarbonRelation, SQLContext} +import org.apache.spark.sql.{CarbonEnv, SQLContext} import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, Partitioner} import org.apache.spark.util.{FileUtils, SplitUtils} import org.carbondata.common.logging.LogServiceFactory -import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonDataLoadSchema, CarbonTableIdentifier} +import 
org.carbondata.core.carbon.CarbonDataLoadSchema import org.carbondata.core.carbon.datastore.block.TableBlockInfo import org.carbondata.core.carbon.metadata.CarbonMetadata import org.carbondata.core.carbon.metadata.schema.table.CarbonTable import org.carbondata.core.constants.CarbonCommonConstants import org.carbondata.core.load.{BlockDetails, LoadMetadataDetails} import org.carbondata.core.locks.{CarbonLockFactory, LockUsage} -import org.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.carbondata.core.util.CarbonUtil import org.carbondata.integration.spark.merger.CompactionType import org.carbondata.lcm.status.SegmentStatusManager import org.carbondata.processing.util.CarbonDataProcessorUtil -import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue} import org.carbondata.spark._ import org.carbondata.spark.load._ import org.carbondata.spark.merger.CarbonDataMergerUtil @@ -129,7 +128,7 @@ object CarbonDataRDDFactory extends Logging { if (-1 == currentRestructNumber) { currentRestructNumber = 0 } - var segmentStatusManager = new SegmentStatusManager(cube.getAbsoluteTableIdentifier) + val segmentStatusManager = new SegmentStatusManager(cube.getAbsoluteTableIdentifier) val loadMetadataDetailsArray = segmentStatusManager.readLoadMetadata(cube.getMetaDataFilepath()) .toList val resultMap = new CarbonDeleteLoadByDateRDD( @@ -408,9 +407,8 @@ object CarbonDataRDDFactory extends Logging { // Check if any load need to be deleted before loading new data deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, hdfsStoreLocation, - false, - currentRestructNumber - ) + isForceDeletion = false, + currentRestructNumber) if (null == carbonLoadModel.getLoadMetadataDetails) { readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation) } @@ -668,9 +666,9 @@ object CarbonDataRDDFactory extends Logging { def readLoadMetadataDetails(model: CarbonLoadModel, hdfsStoreLocation: String): Unit = { val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath - var segmentStatusManager = new SegmentStatusManager(model.getCarbonDataLoadSchema.getCarbonTable - . 
- getAbsoluteTableIdentifier) + val segmentStatusManager = + new SegmentStatusManager( + model.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier) val details = segmentStatusManager.readLoadMetadata(metadataPath) model.setLoadMetadataDetails(details.toList.asJava) } @@ -704,22 +702,13 @@ object CarbonDataRDDFactory extends Logging { } } - def dropAggregateTable( + def dropTable( sc: SparkContext, schema: String, cube: String, partitioner: Partitioner) { - val kv: KeyVal[CarbonKey, CarbonValue] = new KeyValImpl() - new CarbonDropAggregateTableRDD(sc, kv, schema, cube, partitioner).collect - } - - def dropCube( - sc: SparkContext, - schema: String, - cube: String, - partitioner: Partitioner) { - val kv: KeyVal[CarbonKey, CarbonValue] = new KeyValImpl() - new CarbonDropCubeRDD(sc, kv, schema, cube, partitioner).collect + val v: Value[Array[Object]] = new ValueImpl() + new CarbonDropTableRDD(sc, v, schema, cube, partitioner).collect } def cleanFiles( @@ -735,7 +724,7 @@ object CarbonDataRDDFactory extends Logging { if (-1 == currentRestructNumber) { currentRestructNumber = 0 } - var carbonLock = CarbonLockFactory + val carbonLock = CarbonLockFactory .getCarbonLockObj(cube.getMetaDataFilepath, LockUsage.METADATA_LOCK) try { if (carbonLock.lockWithRetries()) { @@ -744,8 +733,7 @@ object CarbonDataRDDFactory extends Logging { partitioner, hdfsStoreLocation, isForceDeletion = true, - currentRestructNumber - ) + currentRestructNumber) } } finally { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala index 9fc5f9e..4616ca9 100644 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala @@ -18,36 +18,34 @@ package org.carbondata.spark.rdd import scala.collection.JavaConverters._ +import scala.reflect.ClassTag import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.Partitioner -import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue} -import org.carbondata.spark.KeyVal +import org.carbondata.spark.Value import org.carbondata.spark.util.CarbonQueryUtil -class CarbonDeleteLoadRDD[K, V]( - sc: SparkContext, - keyClass: KeyVal[K, V], - loadId: Int, - schemaName: String, - cubeName: String, - partitioner: Partitioner) - extends RDD[(K, V)](sc, Nil) with Logging { +class CarbonDeleteLoadRDD[V: ClassTag]( + sc: SparkContext, + valueClass: Value[V], + loadId: Int, + schemaName: String, + cubeName: String, + partitioner: Partitioner) + extends RDD[V](sc, Nil) with Logging { sc.setLocalProperty("spark.scheduler.pool", "DDL") override def getPartitions: Array[Partition] = { val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner) - val result = new Array[Partition](splits.length) - for (i <- 0 until result.length) { - result(i) = new CarbonLoadPartition(id, i, splits(i)) + splits.zipWithIndex.map {f => + new CarbonLoadPartition(id, f._2, f._1) } - result } - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { - val iter = new Iterator[(K, V)] { + override def 
compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + val iter = new Iterator[V] { val split = theSplit.asInstanceOf[CarbonLoadPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) // TODO call CARBON delete API @@ -57,20 +55,18 @@ class CarbonDeleteLoadRDD[K, V]( override def hasNext: Boolean = { if (!finished && !havePair) { - finished = !false + finished = true havePair = !finished } !finished } - override def next(): (K, V) = { + override def next(): V = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } havePair = false - val row = new CarbonKey(null) - val value = new CarbonValue(null) - keyClass.getKey(row, value) + valueClass.getValue(null) } } @@ -81,7 +77,7 @@ class CarbonDeleteLoadRDD[K, V]( override def getPreferredLocations(split: Partition): Seq[String] = { val theSplit = split.asInstanceOf[CarbonLoadPartition] val s = theSplit.serializableHadoopSplit.value.getLocations.asScala - logInfo("Host Name : " + s(0) + s.length) + logInfo("Host Name : " + s.head + s.length) s } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala deleted file mode 100644 index 1b57890..0000000 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropAggregateTableRDD.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.carbondata.spark.rdd - -import scala.collection.JavaConverters._ - -import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.execution.command.Partitioner - -import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue} -import org.carbondata.spark.KeyVal -import org.carbondata.spark.util.CarbonQueryUtil - - -class CarbonDropAggregateTableRDD[K, V]( - sc: SparkContext, - keyClass: KeyVal[K, V], - schemaName: String, - cubeName: String, - partitioner: Partitioner) - extends RDD[(K, V)](sc, Nil) with Logging { - - sc.setLocalProperty("spark.scheduler.pool", "DDL") - - override def getPartitions: Array[Partition] = { - val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner) - splits.zipWithIndex.map { s => - new CarbonLoadPartition(id, s._2, s._1) - } - } - - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { - val iter = new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[CarbonLoadPartition] - logInfo("Input split: " + split.serializableHadoopSplit.value) - // TODO call CARBON delete API - - var havePair = false - var finished = false - - override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = true - havePair = !finished - } - !finished - } - - override def next(): (K, V) = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - val row = new CarbonKey(null) - val value = new CarbonValue(null) - keyClass.getKey(row, value) - } - } - iter - } - - override def getPreferredLocations(split: Partition): Seq[String] = { - val theSplit = split.asInstanceOf[CarbonLoadPartition] - val s = theSplit.serializableHadoopSplit.value.getLocations.asScala - logInfo("Host Name : " + s.head + s.length) - s - } -} - http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala deleted file mode 100644 index d0bc5d1..0000000 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropCubeRDD.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.carbondata.spark.rdd - -import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.execution.command.Partitioner - -import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue} -import org.carbondata.spark.KeyVal -import org.carbondata.spark.util.CarbonQueryUtil - -class CarbonDropCubeRDD[K, V]( - sc: SparkContext, - keyClass: KeyVal[K, V], - schemaName: String, - cubeName: String, - partitioner: Partitioner) - extends RDD[(K, V)](sc, Nil) with Logging { - - sc.setLocalProperty("spark.scheduler.pool", "DDL") - - override def getPartitions: Array[Partition] = { - val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner) - splits.zipWithIndex.map { s => - new CarbonLoadPartition(id, s._2, s._1) - } - } - - override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { - - val iter = new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[CarbonLoadPartition] - - val partitionCount = partitioner.partitionCount - // TODO: Clear Btree from memory - - var havePair = false - var finished = false - - override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = true - havePair = !finished - } - !finished - } - - override def next(): (K, V) = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - val row = new CarbonKey(null) - val value = new CarbonValue(null) - keyClass.getKey(row, value) - } - } - iter - } -} - http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala new file mode 100644 index 0000000..513916c --- /dev/null +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDropTableRDD.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.carbondata.spark.rdd + +import scala.reflect.ClassTag + +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.Partitioner + +import org.carbondata.spark.Value +import org.carbondata.spark.util.CarbonQueryUtil + +class CarbonDropTableRDD[V: ClassTag]( + sc: SparkContext, + valueClass: Value[V], + schemaName: String, + cubeName: String, + partitioner: Partitioner) + extends RDD[V](sc, Nil) with Logging { + + sc.setLocalProperty("spark.scheduler.pool", "DDL") + + override def getPartitions: Array[Partition] = { + val splits = CarbonQueryUtil.getTableSplits(schemaName, cubeName, null, partitioner) + splits.zipWithIndex.map { s => + new CarbonLoadPartition(id, s._2, s._1) + } + } + + override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { + + val iter = new Iterator[V] { + val split = theSplit.asInstanceOf[CarbonLoadPartition] + + val partitionCount = partitioner.partitionCount + // TODO: Clear Btree from memory + + var havePair = false + var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = true + havePair = !finished + } + !finished + } + + override def next(): V = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + valueClass.getValue(null) + } + } + iter + } +} + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala index 5cb33fa..dd2a10a 100644 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -36,7 +36,6 @@ import org.carbondata.core.util.CarbonProperties import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} import org.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger} -import org.carbondata.query.carbon.result.{RowResult} import org.carbondata.query.carbon.result.iterator.RawResultIterator import org.carbondata.spark.MergeResult import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel} @@ -179,7 +178,7 @@ class CarbonMergerRDD[K, V]( val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier( hdfsStoreLocation, new CarbonTableIdentifier(schemaName, factTableName, tableId) ) - val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) = + val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) = QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) val result = new util.ArrayList[Partition](defaultParallelism) val mapsOfNodeBlockMapping: util.List[util.Map[String, util.List[TableBlockInfo]]] = new http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala index a95ae27..6693108 100644 --- 
a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala @@ -21,6 +21,7 @@ package org.carbondata.spark.rdd import java.util import scala.collection.JavaConverters._ +import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job @@ -33,10 +34,10 @@ import org.carbondata.core.iterator.CarbonIterator import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} import org.carbondata.query.carbon.executor.QueryExecutorFactory import org.carbondata.query.carbon.model.QueryModel -import org.carbondata.query.carbon.result.{BatchRawResult, RowResult} -import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor +import org.carbondata.query.carbon.result.BatchResult +import org.carbondata.query.carbon.result.iterator.ChunkRowIterator import org.carbondata.query.expression.Expression -import org.carbondata.spark.RawKey +import org.carbondata.spark.RawValue import org.carbondata.spark.load.CarbonLoaderUtil import org.carbondata.spark.util.QueryPlanUtil @@ -58,29 +59,29 @@ class CarbonSparkPartition(rddId: Int, val idx: Int, * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file * level filtering in driver side. */ -class CarbonScanRDD[K, V]( +class CarbonScanRDD[V: ClassTag]( sc: SparkContext, queryModel: QueryModel, filterExpression: Expression, - keyClass: RawKey[K, V], + keyClass: RawValue[V], @transient conf: Configuration, cubeCreationTime: Long, schemaLastUpdatedTime: Long, baseStoreLocation: String) - extends RDD[(K, V)](sc, Nil) with Logging { + extends RDD[V](sc, Nil) with Logging { val defaultParallelism = sc.defaultParallelism override def getPartitions: Array[Partition] = { val startTime = System.currentTimeMillis() - val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) = + val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) = QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier) val result = new util.ArrayList[Partition](defaultParallelism) val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) // set filter resolver tree try { - var filterResolver = carbonInputFormat + val filterResolver = carbonInputFormat .getResolvedFilter(job.getConfiguration, filterExpression) CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver) @@ -147,15 +148,15 @@ class CarbonScanRDD[K, V]( result.toArray(new Array[Partition](result.size())) } - override def compute(thepartition: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass().getName()); - val iter = new Iterator[(K, V)] { + override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val iter = new Iterator[V] { var rowIterator: CarbonIterator[Array[Any]] = _ var queryStartTime: Long = 0 try { val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition] if(!carbonSparkPartition.tableBlockInfos.isEmpty) { - queryModel.setQueryId(queryModel.getQueryId() + "_" + carbonSparkPartition.idx) + queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx) // fill table block info queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos) queryStartTime = System.currentTimeMillis @@ -164,13 +165,13 @@ class CarbonScanRDD[K, V]( logInfo("*************************" + 
carbonPropertiesFilePath) if (null == carbonPropertiesFilePath) { System.setProperty("carbon.properties.filepath", - System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"); + System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties") } // execute query - rowIterator = new ChunkRawRowIterartor( - QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel) - .asInstanceOf[CarbonIterator[BatchRawResult]]) - .asInstanceOf[CarbonIterator[Array[Any]]] + rowIterator = new ChunkRowIterator( + QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel). + asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]] + } } catch { case e: Exception => @@ -187,19 +188,18 @@ class CarbonScanRDD[K, V]( override def hasNext: Boolean = { if (!finished && !havePair) { - finished = (null == rowIterator) || (!rowIterator.hasNext()) + finished = (null == rowIterator) || (!rowIterator.hasNext) havePair = !finished } !finished } - override def next(): (K, V) = { + override def next(): V = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } havePair = false - val row = rowIterator.next() - keyClass.getKey(row, null) + keyClass.getValue(rowIterator.next()) } logInfo("********************** Total Time Taken to execute the query in Carbon Side: " + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala index 81b6feb..321e0f8 100644 --- a/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala +++ b/integration/spark/src/main/scala/org/carbondata/spark/util/QueryPlanUtil.scala @@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.carbondata.core.carbon.AbsoluteTableIdentifier import org.carbondata.hadoop.CarbonInputFormat -import org.carbondata.query.carbon.result.RowResult /** * All the utility functions for carbon plan creation @@ -36,8 +35,8 @@ object QueryPlanUtil { * createCarbonInputFormat from query model */ def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) : - (CarbonInputFormat[RowResult], Job) = { - val carbonInputFormat = new CarbonInputFormat[RowResult]() + (CarbonInputFormat[Array[Object]], Job) = { + val carbonInputFormat = new CarbonInputFormat[Array[Object]]() val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getStorePath)) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala index 4edfb7a..c2a2277 100644 --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala +++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala @@ 
-64,6 +64,7 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll { sql("select channelsId, Latest_DAY from Carbon_automation_test where count(channelsId) = 1").collect } catch { case ce: UnsupportedOperationException => ce.getMessage + case ce: Exception => ce.getMessage } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java b/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java index 18b2bfe..641d18a 100644 --- a/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java +++ b/processing/src/test/java/org/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java @@ -84,7 +84,7 @@ public class ColGroupMinMaxTest { } setMinData(data[i]); setMaxData(data[i]); - System.out.println(Arrays.toString(data[i])); +// System.out.println(Arrays.toString(data[i])); } mdkeyData = new byte[1000][]; for (int i = 0; i < 1000; i++) {
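For context on the RawKey/KeyVal removal that threads through CarbonScan, KeyVal.scala, CarbonScanRDD, and the drop/clean RDDs above: the old abstractions always produced (key, value) tuples whose value slot went unused, so the patch collapses them into single-type extractors and the RDDs change from RDD[(K, V)] to RDD[V]. A minimal sketch of the simplified shape, following KeyVal.scala as shown in the diff (the QueryModel and partition wiring is elided):

    // Simplified value extraction after the refactor: no synthetic key is built.
    trait RawValue[V] extends Serializable {
      def getValue(value: Array[Any]): V
    }

    class RawValueImpl extends RawValue[Array[Any]] {
      // Pass the raw query row straight through to the caller.
      override def getValue(value: Array[Any]): Array[Any] = value
    }

    // Usage inside the scan iterator (cf. CarbonScanRDD.compute above):
    //   val kv: RawValue[Array[Any]] = new RawValueImpl
    //   override def next(): Array[Any] = kv.getValue(rowIterator.next())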