http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala index 56deb8a..829c487 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala @@ -15,38 +15,27 @@ * limitations under the License. */ - package org.apache.spark.sql.hive -import scala.math.BigInt.int2bigInt +import scala.collection.JavaConverters._ import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation, QueryPlanner} -import org.apache.spark.sql.catalyst.plans.Inner -import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Limit, LogicalPlan, Sort} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} +import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan} import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, ExecutedCommand, Filter, Project, SparkPlan} -import org.apache.spark.sql.execution.aggregate.Utils import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation} -import org.apache.spark.sql.execution.joins.{BroadCastFilterPushJoin, BuildLeft, BuildRight} import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, DropTable, HiveNativeCommand} +import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation} import org.carbondata.common.logging.LogServiceFactory +import org.carbondata.core.carbon.metadata.schema.table.CarbonTable import org.carbondata.spark.exception.MalformedCarbonCommandException -object CarbonHiveSyntax { - - @transient - protected val sqlParser = new CarbonSqlParser - - def parse(sqlText: String): LogicalPlan = { - sqlParser.parse(sqlText) - } -} class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { @@ -55,342 +44,179 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { val LOGGER = LogServiceFactory.getLogService("CarbonStrategies") def getStrategies: Seq[Strategy] = { - val total = sqlContext.planner.strategies :+ CarbonTableScans :+ DDLStrategies + val total = sqlContext.planner.strategies :+ CarbonTableScan total } /** - * Carbon strategies for Carbon cube scanning + * Carbon strategies for performing late materizlization (decoding dictionary key + * as late as possbile) */ - private[sql] object CarbonTableScans extends Strategy { - - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) => - carbonScan(projectList, - predicates, - 
carbonRelation.carbonRelation, - None, - None, - None, - isGroupByPresent = false, - detailQuery = true) :: Nil - - case Limit(IntegerLiteral(limit), - Sort(order, _, - p@CarbonAggregation(groupingExpressions, - aggregateExpressions, - PhysicalOperation( - projectList, - predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))))) => - val aggPlan = handleAggregation(plan, p, projectList, predicates, carbonRelation, - groupingExpressions, aggregateExpressions) - org.apache.spark.sql.execution.TakeOrderedAndProject(limit, - order, - None, - aggPlan.head) :: Nil - - case Limit(IntegerLiteral(limit), p@CarbonAggregation( - groupingExpressions, - aggregateExpressions, - PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)))) => - val aggPlan = handleAggregation(plan, p, projectList, predicates, carbonRelation, - groupingExpressions, aggregateExpressions) - org.apache.spark.sql.execution.Limit(limit, aggPlan.head) :: Nil - - case CarbonAggregation( - groupingExpressions, - aggregateExpressions, - PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) => - handleAggregation(plan, plan, projectList, predicates, carbonRelation, - groupingExpressions, aggregateExpressions) - - case Limit(IntegerLiteral(limit), - PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) => - val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan) - val s = carbonScan(projectList, predicates, carbonRelation.carbonRelation, groupExprs, - substitutesortExprs, limitExpr, isGroupByPresent = false, detailQuery = true) - org.apache.spark.sql.execution.Limit(limit, s) :: Nil - - case Limit(IntegerLiteral(limit), - Sort(order, _, - PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)))) => - val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan) - val s = carbonScan(projectList, predicates, carbonRelation.carbonRelation, groupExprs, - substitutesortExprs, limitExpr, isGroupByPresent = false, detailQuery = true) - org.apache.spark.sql.execution.TakeOrderedAndProject(limit, - order, - None, - s) :: Nil - - case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, - PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)), right) - if canPushDownJoin(right, condition) => - LOGGER.info(s"pushing down for ExtractEquiJoinKeys:right") - val carbon = carbonScan(projectList, - predicates, - carbonRelation.carbonRelation, - None, - None, - None, - isGroupByPresent = false, - detailQuery = true) - val pushedDownJoin = BroadCastFilterPushJoin( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - BuildRight, - carbon, - planLater(right), - condition) - - condition.map(Filter(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil - - case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, - PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) - if canPushDownJoin(left, condition) => - LOGGER.info(s"pushing down for ExtractEquiJoinKeys:left") - val carbon = carbonScan(projectList, - predicates, - carbonRelation.carbonRelation, - None, - None, - None, - isGroupByPresent = false, - detailQuery = true) - - val pushedDownJoin = BroadCastFilterPushJoin( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - BuildLeft, - 
planLater(left), - carbon, - condition) - condition.map(Filter(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil - - case _ => Nil - } - - def handleAggregation(plan: LogicalPlan, - aggPlan: LogicalPlan, - projectList: Seq[NamedExpression], - predicates: Seq[Expression], - carbonRelation: CarbonDatasourceRelation, - groupingExpressions: Seq[Expression], - namedGroupingAttributes: Seq[NamedExpression]): - Seq[SparkPlan] = { - val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan) - - val s = - try { - carbonScan(projectList, predicates, carbonRelation.carbonRelation, - Some(namedGroupingAttributes), substitutesortExprs, - limitExpr, groupingExpressions.nonEmpty) - } catch { - case e: Exception => null - } - - if (s != null) { - aggregatePlan(groupingExpressions, namedGroupingAttributes, s) - - } else { - (aggPlan, true) match { - case CarbonAggregation( - groupingExpressions, - namedGroupingAttributes, - PhysicalOperation(projectList, predicates, - l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) => - val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan) - - - val s = carbonScan(projectList, predicates, carbonRelation.carbonRelation, - Some(namedGroupingAttributes), substitutesortExprs, limitExpr, - groupingExpressions.nonEmpty, detailQuery = true) - - aggregatePlan(groupingExpressions, namedGroupingAttributes, s) - } + private[sql] object CarbonTableScan extends Strategy { + + def apply(plan: LogicalPlan): Seq[SparkPlan] = { + plan match { + case PhysicalOperation(projectList, predicates, + l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) => + if (isStarQuery(plan)) { + carbonRawScanForStarQuery(projectList, predicates, carbonRelation, l)(sqlContext) :: Nil + } else { + carbonRawScan(projectList, + predicates, + carbonRelation, + l, + None, + detailQuery = true, + useBinaryAggregation = false)(sqlContext)._1 :: Nil + } + case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => + CarbonDictionaryDecoder(relations, + profile, + aliasMap, + planLater(child))(sqlContext) :: Nil + case _ => + Nil } } - // TODO: It is duplicated code from spark. Need to find a way - private def aggregatePlan(groupingExpressions: Seq[Expression], - resultExpressions: Seq[NamedExpression], - child: SparkPlan) = { - // A single aggregate expression might appear multiple times in resultExpressions. - // In order to avoid evaluating an individual aggregate function multiple times, we'll - // build a set of the distinct aggregate expressions and build a function which can - // be used to re-write expressions so that they reference the single copy of the - // aggregate function which actually gets computed. - val aggregateExpressions = resultExpressions.flatMap { expr => - expr.collect { - case agg: AggregateExpression => agg - } - }.distinct - // For those distinct aggregate expressions, we create a map from the - // aggregate function to the corresponding attribute of the function. - val aggregateFunctionToAttribute = aggregateExpressions.map { agg => - val aggregateFunction = agg.aggregateFunction - val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute - (aggregateFunction, agg.isDistinct) -> attribute - }.toMap - - val (functionsWithDistinct, functionsWithoutDistinct) = - aggregateExpressions.partition(_.isDistinct) - if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) { - // This is a sanity check. 
We should not reach here when we have multiple distinct - // column sets. Our MultipleDistinctRewriter should take care this case. - sys.error("You hit a query analyzer bug. Please report your query to " + - "Spark user mailing list.") - } - - val namedGroupingExpressions = groupingExpressions.map { - case ne: NamedExpression => ne -> ne - // If the expression is not a NamedExpressions, we add an alias. - // So, when we generate the result of the operator, the Aggregate Operator - // can directly get the Seq of attributes representing the grouping expressions. - case other => - val withAlias = Alias(other, other.toString)() - other -> withAlias - } - val groupExpressionMap = namedGroupingExpressions.toMap - - // The original `resultExpressions` are a set of expressions which may reference - // aggregate expressions, grouping column values, and constants. When aggregate operator - // emits output rows, we will use `resultExpressions` to generate an output projection - // which takes the grouping columns and final aggregate result buffer as input. - // Thus, we must re-write the result expressions so that their attributes match up with - // the attributes of the final result projection's input row: - val rewrittenResultExpressions = resultExpressions.map { expr => - expr.transformDown { - case AggregateExpression(aggregateFunction, _, isDistinct) => - // The final aggregation buffer's attributes will be `finalAggregationAttributes`, - // so replace each aggregate expression by its corresponding attribute in the set: - aggregateFunctionToAttribute(aggregateFunction, isDistinct) - case expression => - // Since we're using `namedGroupingAttributes` to extract the grouping key - // columns, we need to replace grouping key expressions with their corresponding - // attributes. We do not rely on the equality check at here since attributes may - // differ cosmetically. Instead, we use semanticEquals. - groupExpressionMap.collectFirst { - case (expr, ne) if expr semanticEquals expression => ne.toAttribute - }.getOrElse(expression) - }.asInstanceOf[NamedExpression] + /** + * Create carbon scan + */ + private def carbonRawScan(projectList: Seq[NamedExpression], + predicates: Seq[Expression], + relation: CarbonDatasourceRelation, + logicalRelation: LogicalRelation, + groupExprs: Option[Seq[Expression]], + detailQuery: Boolean, + useBinaryAggregation: Boolean)(sc: SQLContext): (SparkPlan, Boolean) = { + + val tableName: String = + relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase + // Check out any expressions are there in project list. if they are present then we need to + // decode them as well. 
+ val projectSet = AttributeSet(projectList.flatMap(_.references)) + val scan = CarbonScan(projectSet.toSeq, + relation.carbonRelation, + predicates, + groupExprs, + useBinaryAggregation)(sqlContext) + val dimAggrsPresence: Boolean = scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0 + projectList.map { + case attr: AttributeReference => + case Alias(attr: AttributeReference, _) => + case others => + others.references + .map(f => scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference])) } - - val aggregateOperator = - if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) { - if (functionsWithDistinct.nonEmpty) { - sys.error("Distinct columns cannot exist in Aggregate operator containing " + - "aggregate functions which don't support partial aggregation.") + if (!detailQuery) { + if (scan.attributesNeedToDecode.size > 0) { + val decoder = getCarbonDecoder(logicalRelation, + sc, + tableName, + scan.attributesNeedToDecode.asScala.toSeq, + scan) + if (scan.unprocessedExprs.nonEmpty) { + val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) + (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true) } else { - Utils.planAggregateWithoutPartial( - namedGroupingExpressions.map(_._2), - aggregateExpressions, - aggregateFunctionToAttribute, - rewrittenResultExpressions, - child) + (Project(projectList, decoder), true) } - } else if (functionsWithDistinct.isEmpty) { - Utils.planAggregateWithoutDistinct( - namedGroupingExpressions.map(_._2), - aggregateExpressions, - aggregateFunctionToAttribute, - rewrittenResultExpressions, - child) } else { - Utils.planAggregateWithOneDistinct( - namedGroupingExpressions.map(_._2), - functionsWithDistinct, - functionsWithoutDistinct, - aggregateFunctionToAttribute, - rewrittenResultExpressions, - child) + (scan, dimAggrsPresence) + } + } else { + if (scan.attributesNeedToDecode.size() > 0) { + val decoder = getCarbonDecoder(logicalRelation, + sc, + tableName, + scan.attributesNeedToDecode.asScala.toSeq, + scan) + if (scan.unprocessedExprs.nonEmpty) { + val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) + (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true) + } else { + (Project(projectList, decoder), true) + } + } else { + (Project(projectList, scan), dimAggrsPresence) } - - aggregateOperator - } - - private def canPushDownJoin(otherRDDPlan: LogicalPlan, - joinCondition: Option[Expression]): Boolean = { - val pushdowmJoinEnabled = sqlContext.sparkContext.conf - .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true) - - if (!pushdowmJoinEnabled) { - return false - } - - otherRDDPlan match { - case BroadcastHint(p) => true - case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 && - p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => - LOGGER.info("canPushDownJoin statistics:" + p.statistics.sizeInBytes) - true - case _ => false } } /** - * Create carbon scan + * Create carbon scan for star query */ - private def carbonScan(projectList: Seq[NamedExpression], - predicates: Seq[Expression], - relation: CarbonRelation, - groupExprs: Option[Seq[Expression]], - substitutesortExprs: Option[Seq[SortOrder]], - limitExpr: Option[Expression], - isGroupByPresent: Boolean, - detailQuery: Boolean = false) = { + private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression], + predicates: Seq[Expression], + relation: CarbonDatasourceRelation, + logicalRelation: 
LogicalRelation)(sc: SQLContext): SparkPlan = { + + val tableName: String = + relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase + // Check out any expressions are there in project list. if they are present then we need to + // decode them as well. + val projectExprsNeedToDecode = new java.util.HashSet[Attribute]() + val scan = CarbonScan(projectList.map(_.toAttribute), + relation.carbonRelation, + predicates, + None, + useBinaryAggregator = false)(sqlContext) + projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode) + if (projectExprsNeedToDecode.size() > 0) { + val decoder = getCarbonDecoder(logicalRelation, + sc, + tableName, + projectExprsNeedToDecode.asScala.toSeq, + scan) + if (scan.unprocessedExprs.nonEmpty) { + val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) + filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder) + } else { + decoder + } + } else { + scan + } + } - if (!detailQuery) { - val projectSet = AttributeSet(projectList.flatMap(_.references)) - CarbonTableScan( - projectSet.toSeq, - relation, - predicates, - groupExprs, - substitutesortExprs, - limitExpr, - isGroupByPresent, - detailQuery)(sqlContext) + def getCarbonDecoder(logicalRelation: LogicalRelation, + sc: SQLContext, + tableName: String, + projectExprsNeedToDecode: Seq[Attribute], + scan: CarbonScan): CarbonDictionaryDecoder = { + val relation = CarbonDecoderRelation(logicalRelation.attributeMap, + logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]) + val attrs = projectExprsNeedToDecode.map { attr => + val newAttr = AttributeReference(attr.name, + attr.dataType, + attr.nullable, + attr.metadata)(attr.exprId, Seq(tableName)) + relation.addAttribute(newAttr) + newAttr } - else { - val projectSet = AttributeSet(projectList.flatMap(_.references)) - Project(projectList, - CarbonTableScan(projectSet.toSeq, - relation, - predicates, - groupExprs, - substitutesortExprs, - limitExpr, - isGroupByPresent, - detailQuery)(sqlContext)) + CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs), + CarbonAliasDecoderRelation(), scan)(sc) + } + private def isStarQuery(plan: LogicalPlan) = { + plan match { + case LogicalFilter(condition, + LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) => true + case LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) => true + case _ => false } } - private def extractPlan(plan: LogicalPlan) = { - val (a, b, c, aliases, groupExprs, sortExprs, limitExpr) = - PhysicalOperation1.collectProjectsAndFilters(plan) - val substitutesortExprs = sortExprs match { - case Some(sort) => - Some(sort.map { - case SortOrder(a: Alias, direction) => - val ref = aliases.getOrElse(a.toAttribute, a) match { - case Alias(reference, name) => reference - case others => others - } - SortOrder(ref, direction) - case others => others - }) - case others => others + private def isGroupByPresentOnMeasures(groupingExpressions: Seq[Expression], + carbonTable: CarbonTable): Boolean = { + groupingExpressions.map { g => + g.collect { + case attr: AttributeReference + if carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name) != null => + return true + } } - (a, b, c, aliases, groupExprs, substitutesortExprs, limitExpr) + false } } @@ -406,7 +232,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { ExecutedCommand(ShowAllTablesDetail(schemaName, plan.output)) :: Nil case DropTable(tableName, ifNotExists) if CarbonEnv.getInstance(sqlContext).carbonCatalog - 
.tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) => + .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) => ExecutedCommand(DropCubeCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil case ShowAggregateTablesCommand(schemaName) => ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil @@ -415,7 +241,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { case LoadCube(schemaNameOp, cubeName, factPathFromUser, dimFilesPath, partionValues, isOverwriteExist, inputSqlString) => val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog - .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext) + .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext) if (isCarbonTable || partionValues.nonEmpty) { ExecutedCommand(LoadCube(schemaNameOp, cubeName, factPathFromUser, dimFilesPath, partionValues, isOverwriteExist, inputSqlString)) :: Nil @@ -433,15 +259,14 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } case DescribeFormattedCommand(sql, tblIdentifier) => val isCube = CarbonEnv.getInstance(sqlContext).carbonCatalog - .tableExists(tblIdentifier)(sqlContext) + .tableExists(tblIdentifier)(sqlContext) if (isCube) { val describe = LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false) val resolvedTable = sqlContext.executePlan(describe.table).analyzed val resultPlan = sqlContext.executePlan(resolvedTable).executedPlan ExecutedCommand(DescribeCommandFormatted(resultPlan, plan.output, tblIdentifier)) :: Nil - } - else { + } else { ExecutedCommand(DescribeNativeCommand(sql, plan.output)) :: Nil } case describe@LogicalDescribeCommand(table, isExtended) => @@ -461,3 +286,13 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { } } + +object CarbonHiveSyntax { + + @transient + protected val sqlParser = new CarbonSqlParser + + def parse(sqlText: String): LogicalPlan = { + sqlParser.parse(sqlText) + } +}
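
The CarbonTableScan strategy introduced in the diff above implements "late materialization": the scan and any pushed-down filters operate on dictionary surrogate keys, and a CarbonDictionaryDecoder is planned on top so that dictionary values are decoded only for rows that survive. Below is a minimal, self-contained Scala sketch of that idea; the dictionary, rows, and names are illustrative assumptions, not CarbonData or Spark APIs.

    // Sketch of late dictionary decoding: filter on cheap integer surrogate keys,
    // decode to the real string values only for the surviving rows.
    object LateDecodeSketch {

      // hypothetical dictionary: surrogate key -> actual value
      val countryDict: Map[Int, String] = Map(1 -> "CN", 2 -> "IN", 3 -> "US")
      // reverse lookup, used to rewrite the predicate once, up front
      val countryRevDict: Map[String, Int] = countryDict.map(_.swap)

      // encoded rows: (countryKey, salary)
      val encodedRows: Seq[(Int, Long)] = Seq((1, 1000L), (2, 2000L), (3, 3000L), (2, 2500L))

      def main(args: Array[String]): Unit = {
        // 1. rewrite the predicate "country = 'IN'" against the dictionary key
        val wantedKey = countryRevDict("IN")
        // 2. filter on the surrogate key, no decoding yet
        val survivors = encodedRows.filter { case (key, _) => key == wantedKey }
        // 3. decode only the surviving rows (the decoder step at the top of the plan)
        val decoded = survivors.map { case (key, salary) => (countryDict(key), salary) }
        decoded.foreach(println) // prints (IN,2000) and (IN,2500)
      }
    }

In the actual strategy, this corresponds to CarbonScan producing undecoded attributes and getCarbonDecoder wrapping the scan with a CarbonDictionaryDecoder (plus a Filter for any unprocessed expressions) before the final Project.
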
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala deleted file mode 100644 index 58c02ff..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan - -import org.carbondata.spark.exception.MalformedCarbonCommandException - -private[sql] object CarbonStrategy { - def getStrategy(context: SQLContext): Seq[Strategy] = { - val carbonStrategy = new CarbonStrategies(context) - if (context.conf.asInstanceOf[CarbonSQLConf].pushComputation) { - Seq(carbonStrategy.CarbonTableScans, carbonStrategy.DDLStrategies) - } else { - // TODO: need to remove duplicate code in strategies. - Seq(new CarbonRawStrategies(context).CarbonRawTableScans, carbonStrategy.DDLStrategies) - } - } -} - -private[spark] class CarbonSQLDialect(context: HiveContext) extends HiveQLDialect(context) { - - @transient - protected val sqlParser = new CarbonSqlParser - - override def parse(sqlText: String): LogicalPlan = { - - try { - sqlParser.parse(sqlText) - } catch { - // MalformedCarbonCommandException need to throw directly - // because hive can no parse carbon command - case ce: MalformedCarbonCommandException => - throw ce - case _ => super.parse(sqlText) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala deleted file mode 100644 index 60c6a78..0000000 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.carbondata.spark.rdd - -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} -import org.apache.spark.rdd.RDD - -import org.carbondata.common.logging.LogServiceFactory -import org.carbondata.core.cache.dictionary.Dictionary -import org.carbondata.core.carbon.datastore.block.TableBlockInfo -import org.carbondata.core.iterator.CarbonIterator -import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} -import org.carbondata.query.carbon.executor.QueryExecutorFactory -import org.carbondata.query.carbon.model.QueryModel -import org.carbondata.query.carbon.result.RowResult -import org.carbondata.query.expression.Expression -import org.carbondata.query.filter.resolver.FilterResolverIntf -import org.carbondata.spark.KeyVal -import org.carbondata.spark.load.CarbonLoaderUtil -import org.carbondata.spark.util.QueryPlanUtil - -class CarbonSparkPartition(rddId: Int, val idx: Int, - val locations: Array[String], - val tableBlockInfos: util.List[TableBlockInfo]) - extends Partition { - - override val index: Int = idx - - // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations) - override def hashCode(): Int = { - 41 * (41 + rddId) + idx - } -} - - - /** - * This RDD is used to perform query. 
- */ -class CarbonQueryRDD[K, V]( - sc: SparkContext, - queryModel: QueryModel, - filterExpression: Expression, - keyClass: KeyVal[K, V], - @transient conf: Configuration, - cubeCreationTime: Long, - schemaLastUpdatedTime: Long, - baseStoreLocation: String) - extends RDD[(K, V)](sc, Nil) with Logging { - - val defaultParallelism = sc.defaultParallelism - - override def getPartitions: Array[Partition] = { - val startTime = System.currentTimeMillis() - val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) = - QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier) - - val result = new util.ArrayList[Partition](defaultParallelism) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - // set filter resolver tree - try { - var filterResolver = carbonInputFormat - .getResolvedFilter(job.getConfiguration, filterExpression) - - CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver) - queryModel.setFilterExpressionResolverTree(filterResolver) - } - catch { - case e: Exception => - LOGGER.error(e) - sys.error("Exception occurred in query execution :: " + e.getMessage) - } - // get splits - val splits = carbonInputFormat.getSplits(job) - if (!splits.isEmpty) { - val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) - - val blockList = carbonInputSplits.map(inputSplit => - new TableBlockInfo(inputSplit.getPath.toString, - inputSplit.getStart, inputSplit.getSegmentId, - inputSplit.getLocations, inputSplit.getLength - ) - ) - if (blockList.nonEmpty) { - // group blocks to nodes, tasks - val nodeBlockMapping = - CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism) - - var i = 0 - // Create Spark Partition for each task and assign blocks - nodeBlockMapping.asScala.foreach { entry => - entry._2.asScala.foreach { blocksPerTask => - if (blocksPerTask.size() != 0) { - result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, blocksPerTask)) - i += 1 - } - } - } - val noOfBlocks = blockList.size - val noOfNodes = nodeBlockMapping.size - val noOfTasks = result.size() - logInfo(s"Identified no.of.Blocks: $noOfBlocks," - + s"parallelism: $defaultParallelism , " + - s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks" - ) - logInfo("Time taken to identify Blocks to scan : " + - (System.currentTimeMillis() - startTime) - ) - result.asScala.foreach { r => - val cp = r.asInstanceOf[CarbonSparkPartition] - logInfo(s"Node : " + cp.locations.toSeq.mkString(",") - + ", No.Of Blocks : " + cp.tableBlockInfos.size() - ) - } - } else { - logInfo("No blocks identified to scan") - val nodesPerBlock = new util.ArrayList[TableBlockInfo]() - result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock)) - } - } - else { - logInfo("No valid segments found to scan") - val nodesPerBlock = new util.ArrayList[TableBlockInfo]() - result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock)) - } - result.toArray(new Array[Partition](result.size())) - } - - - override def compute(thepartition: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val iter = new Iterator[(K, V)] { - var rowIterator: CarbonIterator[_] = _ - var queryStartTime: Long = 0 - try { - val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition] - if (!carbonSparkPartition.tableBlockInfos.isEmpty) { - queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx) - // fill table block info - 
queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos) - queryStartTime = System.currentTimeMillis - - val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) - logInfo("*************************" + carbonPropertiesFilePath) - if (null == carbonPropertiesFilePath) { - System.setProperty("carbon.properties.filepath", - System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties" - ) - } - // execute query - rowIterator = QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel) - .asInstanceOf[CarbonIterator[RowResult]] - } - // TODOi - // : CarbonQueryUtil.isQuickFilter quick filter from dictionary needs to support - } catch { - case e: Throwable => - clearDictionaryCache(queryModel.getColumnToDictionaryMapping) - LOGGER.error(e) - // updateCubeAndLevelCacheStatus(levelCacheKeys) - if (null != e.getMessage) { - sys.error("Exception occurred in query execution :: " + e.getMessage) - } else { - sys.error("Exception occurred in query execution.Please check logs.") - } - } - - var havePair = false - var finished = false - var recordCount = 0 - - override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = (null == rowIterator) || (!rowIterator.hasNext) - havePair = !finished - } - if (finished) { - clearDictionaryCache(queryModel.getColumnToDictionaryMapping) - } - !finished - } - - override def next(): (K, V) = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - val row = rowIterator.next() - val key = row.asInstanceOf[RowResult].getKey() - val value = row.asInstanceOf[RowResult].getValue() - recordCount += 1 - if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) { - clearDictionaryCache(queryModel.getColumnToDictionaryMapping) - } - keyClass.getKey(key, value) - } - - def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = { - if (null != columnToDictionaryMap) { - org.carbondata.spark.util.CarbonQueryUtil - .clearColumnDictionaryCache(columnToDictionaryMap) - } - } - - logInfo("*************************** Total Time Taken to execute the query in Carbon Side: " + - (System.currentTimeMillis - queryStartTime) - ) - } - iter - } - - - /** - * Get the preferred locations where to launch this task. - */ - override def getPreferredLocations(partition: Partition): Seq[String] = { - val theSplit = partition.asInstanceOf[CarbonSparkPartition] - theSplit.locations.filter(_ != "localhost") - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala deleted file mode 100644 index 5993677..0000000 --- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.carbondata.spark.rdd - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} - -import org.carbondata.common.logging.LogServiceFactory -import org.carbondata.core.iterator.CarbonIterator -import org.carbondata.query.carbon.executor.QueryExecutorFactory -import org.carbondata.query.carbon.model.QueryModel -import org.carbondata.query.carbon.result.BatchRawResult -import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor -import org.carbondata.query.expression.Expression -import org.carbondata.spark.{RawKey, RawKeyVal} - - -/** - * This RDD is used to perform query with raw data, it means it doesn't convert dictionary values - * to actual data. - * - * @param sc - * @param queryModel - * @param filterExpression - * @param keyClass - * @param conf - * @param cubeCreationTime - * @param schemaLastUpdatedTime - * @param baseStoreLocation - * @tparam K - * @tparam V - */ -class CarbonRawQueryRDD[K, V]( - sc: SparkContext, - queryModel: QueryModel, - filterExpression: Expression, - keyClass: RawKey[K, V], - @transient conf: Configuration, - cubeCreationTime: Long, - schemaLastUpdatedTime: Long, - baseStoreLocation: String) - extends CarbonQueryRDD[K, V](sc, - queryModel, - filterExpression, - null, - conf, - cubeCreationTime, - schemaLastUpdatedTime, - baseStoreLocation) with Logging { - - - override def compute(thepartition: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass().getName()); - val iter = new Iterator[(K, V)] { - var rowIterator: CarbonIterator[Array[Any]] = _ - var queryStartTime: Long = 0 - try { - val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition] - if(!carbonSparkPartition.tableBlockInfos.isEmpty) { - queryModel.setQueryId(queryModel.getQueryId() + "_" + carbonSparkPartition.idx) - // fill table block info - queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos) - queryStartTime = System.currentTimeMillis - - val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) - logInfo("*************************" + carbonPropertiesFilePath) - if (null == carbonPropertiesFilePath) { - System.setProperty("carbon.properties.filepath", - System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"); - } - // execute query - rowIterator = new ChunkRawRowIterartor( - QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel) - .asInstanceOf[CarbonIterator[BatchRawResult]]).asInstanceOf[CarbonIterator[Array[Any]]] - } - } catch { - case e: Exception => - LOGGER.error(e) - // updateCubeAndLevelCacheStatus(levelCacheKeys) - if (null != e.getMessage) { - sys.error("Exception occurred in query execution :: " + e.getMessage) - } else { - sys.error("Exception occurred in query execution.Please check logs.") - } - } - - var havePair = false - var finished = false - - override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = (null == rowIterator) || (!rowIterator.hasNext()) - havePair = !finished - } - !finished - } - - 
override def next(): (K, V) = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - val row = rowIterator.next() - keyClass.getKey(row, null) - } - - logInfo("*************************** Total Time Taken to execute the query in Carbon Side: " + - (System.currentTimeMillis - queryStartTime) - ) - } - iter - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala new file mode 100644 index 0000000..a95ae27 --- /dev/null +++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.carbondata.spark.rdd + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.Job +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD + +import org.carbondata.common.logging.LogServiceFactory +import org.carbondata.core.carbon.datastore.block.TableBlockInfo +import org.carbondata.core.iterator.CarbonIterator +import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} +import org.carbondata.query.carbon.executor.QueryExecutorFactory +import org.carbondata.query.carbon.model.QueryModel +import org.carbondata.query.carbon.result.{BatchRawResult, RowResult} +import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor +import org.carbondata.query.expression.Expression +import org.carbondata.spark.RawKey +import org.carbondata.spark.load.CarbonLoaderUtil +import org.carbondata.spark.util.QueryPlanUtil + +class CarbonSparkPartition(rddId: Int, val idx: Int, + val locations: Array[String], + val tableBlockInfos: util.List[TableBlockInfo]) + extends Partition { + + override val index: Int = idx + + // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations) + override def hashCode(): Int = { + 41 * (41 + rddId) + idx + } +} + + /** + * This RDD is used to perform query on CarbonData file. Before sending tasks to scan + * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file + * level filtering in driver side. 
+ */ +class CarbonScanRDD[K, V]( + sc: SparkContext, + queryModel: QueryModel, + filterExpression: Expression, + keyClass: RawKey[K, V], + @transient conf: Configuration, + cubeCreationTime: Long, + schemaLastUpdatedTime: Long, + baseStoreLocation: String) + extends RDD[(K, V)](sc, Nil) with Logging { + + val defaultParallelism = sc.defaultParallelism + + override def getPartitions: Array[Partition] = { + val startTime = System.currentTimeMillis() + val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) = + QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier) + + val result = new util.ArrayList[Partition](defaultParallelism) + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + // set filter resolver tree + try { + var filterResolver = carbonInputFormat + .getResolvedFilter(job.getConfiguration, filterExpression) + + CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver) + queryModel.setFilterExpressionResolverTree(filterResolver) + } + catch { + case e: Exception => + LOGGER.error(e) + sys.error("Exception occurred in query execution :: " + e.getMessage) + } + // get splits + val splits = carbonInputFormat.getSplits(job) + if (!splits.isEmpty) { + val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) + + val blockList = carbonInputSplits.map(inputSplit => + new TableBlockInfo(inputSplit.getPath.toString, + inputSplit.getStart, inputSplit.getSegmentId, + inputSplit.getLocations, inputSplit.getLength + ) + ) + if (blockList.nonEmpty) { + // group blocks to nodes, tasks + val nodeBlockMapping = + CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism) + + var i = 0 + // Create Spark Partition for each task and assign blocks + nodeBlockMapping.asScala.foreach { entry => + entry._2.asScala.foreach { blocksPerTask => + if (blocksPerTask.size() != 0) { + result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, blocksPerTask)) + i += 1 + } + } + } + val noOfBlocks = blockList.size + val noOfNodes = nodeBlockMapping.size + val noOfTasks = result.size() + logInfo(s"Identified no.of.Blocks: $noOfBlocks," + + s"parallelism: $defaultParallelism , " + + s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks" + ) + logInfo("Time taken to identify Blocks to scan : " + + (System.currentTimeMillis() - startTime) + ) + result.asScala.foreach { r => + val cp = r.asInstanceOf[CarbonSparkPartition] + logInfo(s"Node : " + cp.locations.toSeq.mkString(",") + + ", No.Of Blocks : " + cp.tableBlockInfos.size() + ) + } + } else { + logInfo("No blocks identified to scan") + val nodesPerBlock = new util.ArrayList[TableBlockInfo]() + result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock)) + } + } + else { + logInfo("No valid segments found to scan") + val nodesPerBlock = new util.ArrayList[TableBlockInfo]() + result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock)) + } + result.toArray(new Array[Partition](result.size())) + } + + override def compute(thepartition: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass().getName()); + val iter = new Iterator[(K, V)] { + var rowIterator: CarbonIterator[Array[Any]] = _ + var queryStartTime: Long = 0 + try { + val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition] + if(!carbonSparkPartition.tableBlockInfos.isEmpty) { + queryModel.setQueryId(queryModel.getQueryId() + "_" + carbonSparkPartition.idx) + // fill table block info + 
queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos) + queryStartTime = System.currentTimeMillis + + val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) + logInfo("*************************" + carbonPropertiesFilePath) + if (null == carbonPropertiesFilePath) { + System.setProperty("carbon.properties.filepath", + System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"); + } + // execute query + rowIterator = new ChunkRawRowIterartor( + QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel) + .asInstanceOf[CarbonIterator[BatchRawResult]]) + .asInstanceOf[CarbonIterator[Array[Any]]] + } + } catch { + case e: Exception => + LOGGER.error(e) + if (null != e.getMessage) { + sys.error("Exception occurred in query execution :: " + e.getMessage) + } else { + sys.error("Exception occurred in query execution.Please check logs.") + } + } + + var havePair = false + var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = (null == rowIterator) || (!rowIterator.hasNext()) + havePair = !finished + } + !finished + } + + override def next(): (K, V) = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + val row = rowIterator.next() + keyClass.getKey(row, null) + } + + logInfo("********************** Total Time Taken to execute the query in Carbon Side: " + + (System.currentTimeMillis - queryStartTime) + ) + } + iter + } + + /** + * Get the preferred locations where to launch this task. + */ + override def getPreferredLocations(partition: Partition): Seq[String] = { + val theSplit = partition.asInstanceOf[CarbonSparkPartition] + theSplit.locations.filter(_ != "localhost") + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala deleted file mode 100644 index 3943352..0000000 --- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala +++ /dev/null @@ -1,41 +0,0 @@ -package org.carbondata.spark.testsuite.joinquery - -import org.apache.spark.sql.common.util.CarbonHiveContext._ -import org.apache.spark.sql.common.util.QueryTest -import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.execution.joins.BroadCastFilterPushJoin - -class EquiJoinTestCase extends QueryTest with BeforeAndAfterAll { - override def beforeAll { - //loading to hive table - sql("create table employee_hive (empid string,empname string,mobilename string,mobilecolor string,salary int)row format delimited fields terminated by ','") - sql("create table mobile_hive (mobileid string,mobilename string, mobilecolor string, sales int)row format delimited fields terminated by ','"); - sql("LOAD DATA LOCAL INPATH './src/test/resources/join/employee.csv' into table employee_hive") - sql("LOAD DATA LOCAL INPATH './src/test/resources/join/mobile.csv' into table mobile_hive") - //loading to carbon table - sql("create table employee (empid string,empname string,mobilename string,mobilecolor string,salary int) stored by 'org.apache.carbondata.format'") - sql("create table mobile (mobileid string,mobilename string, mobilecolor string, 
sales int) stored by 'org.apache.carbondata.format'"); - sql("LOAD DATA LOCAL INPATH './src/test/resources/join/employee.csv' into table employee options('FILEHEADER'='empid,empname,mobilename,mobilecolor,salary')") - sql("LOAD DATA LOCAL INPATH './src/test/resources/join/mobile.csv' into table mobile options('FILEHEADER'='mobileid,mobilename,mobilecolor,sales')") - } - - test("test equijoin query") { - val df = sql("select employee.empname,mobile.mobilename from employee,mobile where employee.mobilename = mobile.mobilename") - var broadcastJoinExists = false - df.queryExecution.sparkPlan.collect { - case bcf: BroadCastFilterPushJoin => - broadcastJoinExists = true - } - if (!broadcastJoinExists) { - assert(false) - } - checkAnswer(df, - sql("select employee_hive.empname,mobile_hive.mobilename from employee_hive,mobile_hive where employee_hive.mobilename = mobile_hive.mobilename")) - } - override def afterAll { - sql("drop table employee_hive") - sql("drop table mobile_hive") - sql("drop table employee") - sql("drop table mobile") - } -} \ No newline at end of file
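
The CarbonScanRDD added earlier in this commit builds its partitions on the driver: it resolves the filter expression against CarbonData's index, collects the surviving blocks from CarbonInputFormat.getSplits, and groups them per node (via CarbonLoaderUtil.nodeBlockTaskMapping) so each task reads node-local blocks. The sketch below shows only the per-node grouping step in plain Scala; BlockInfo, PlannedPartition, and the grouping rule are simplified assumptions, not the actual CarbonData types or the full task-balancing logic.

    // Sketch of block-to-partition planning: group surviving blocks by host and
    // emit one partition per group so tasks scan node-local data.
    object BlockPartitionSketch {

      case class BlockInfo(path: String, host: String, length: Long)
      case class PlannedPartition(index: Int, host: String, blocks: Seq[BlockInfo])

      def planPartitions(blocks: Seq[BlockInfo]): Seq[PlannedPartition] = {
        blocks.groupBy(_.host)        // node -> blocks located on that node
          .toSeq
          .zipWithIndex
          .map { case ((host, blocksOnHost), idx) =>
            PlannedPartition(idx, host, blocksOnHost)
          }
      }

      def main(args: Array[String]): Unit = {
        val blocks = Seq(
          BlockInfo("/store/part-0", "node1", 128L),
          BlockInfo("/store/part-1", "node2", 256L),
          BlockInfo("/store/part-2", "node1", 64L))
        planPartitions(blocks).foreach(println)
      }
    }

The real implementation additionally limits the number of blocks per task and reports preferred locations back to Spark through getPreferredLocations, which is why CarbonSparkPartition keeps both the host list and the block list.
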