Repository: incubator-carbondata Updated Branches: refs/heads/master 56aa1f8c0 -> 2a6d097d1
do not use inner interface of spark style add r style comment fix Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/06d44608 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/06d44608 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/06d44608 Branch: refs/heads/master Commit: 06d44608acaebe99a5a99e754a27e92262242004 Parents: 56aa1f8 Author: wangfei <wangfei_he...@126.com> Authored: Sun Dec 11 07:04:37 2016 +0800 Committer: jackylk <jacky.li...@huawei.com> Committed: Sun Dec 11 14:34:15 2016 +0800 ---------------------------------------------------------------------- .../spark/sql/CarbonDictionaryDecoder.scala | 13 ++--- .../spark/sql/SparkUnknownExpression.scala | 4 +- .../execution/CarbonLateDecodeStrategy.scala | 56 +++++++++++--------- 3 files changed, 39 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index db864c7..940c6d7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -181,7 +181,7 @@ case class CarbonDictionaryDecoder( getDictionaryColumnIds(index)._3) } } - val result = unsafeProjection(new GenericMutableRow(data)) + val result = unsafeProjection(new GenericInternalRow(data)) total += System.currentTimeMillis() - startTime result } @@ -223,11 +223,12 @@ case class CarbonDictionaryDecoder( -class 
CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation], - profile: CarbonProfile, - aliasMap: CarbonAliasDecoderRelation, - prev: RDD[Row], - output: Seq[Attribute]) +class CarbonDecoderRDD( + relations: Seq[CarbonDecoderRelation], + profile: CarbonProfile, + aliasMap: CarbonAliasDecoderRelation, + prev: RDD[Row], + output: Seq[Attribute]) extends RDD[Row](prev) { def canBeDecoded(attr: Attribute): Boolean = { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala index 1a310c7..b4b0f3c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala @@ -22,7 +22,7 @@ import java.util.{ArrayList, List} import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow} +import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericInternalRow} import org.apache.carbondata.core.carbon.metadata.encoder.Encoding import org.apache.carbondata.scan.expression.{ColumnExpression, ExpressionResult, UnknownExpression} @@ -48,7 +48,7 @@ class SparkUnknownExpression(var sparkExp: SparkExpression) } try { val result = evaluateExpression( - new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray)) + new GenericInternalRow(values.map(a => a.asInstanceOf[Any]).toArray)) val sparkRes = if (isExecutor) { result.asInstanceOf[InternalRow].get(0, sparkExp.dataType) } else { 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/06d44608/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 57b2139..7a8920f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions.{Attribute, _} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.sources.{BaseRelation, Filter} @@ -63,10 +64,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } - def getDecoderRDD(logicalRelation: LogicalRelation, - projectExprsNeedToDecode: ArrayBuffer[AttributeReference], - rdd: RDD[Row], - output: Seq[Attribute]): RDD[Row] = { + def getDecoderRDD( + logicalRelation: LogicalRelation, + projectExprsNeedToDecode: ArrayBuffer[AttributeReference], + rdd: RDD[Row], + output: Seq[Attribute]): RDD[Row] = { val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] val relation = CarbonDecoderRelation(logicalRelation.attributeMap, logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]) @@ -83,10 +85,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } private[this] def toCatalystRDD( - relation: 
LogicalRelation, - output: Seq[Attribute], - rdd: RDD[Row], - needDecode: ArrayBuffer[AttributeReference]): + relation: LogicalRelation, + output: Seq[Attribute], + rdd: RDD[Row], + needDecode: ArrayBuffer[AttributeReference]): RDD[InternalRow] = { val newRdd = if (needDecode.size > 0) { getDecoderRDD(relation, needDecode, rdd, output) @@ -101,12 +103,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } protected def pruneFilterProject( - relation: LogicalRelation, - projects: Seq[NamedExpression], - filterPredicates: Seq[Expression], - scanBuilder: (Seq[Attribute], Array[Filter], - ArrayBuffer[AttributeReference]) => - RDD[InternalRow]) = { + relation: LogicalRelation, + projects: Seq[NamedExpression], + filterPredicates: Seq[Expression], + scanBuilder: (Seq[Attribute], Array[Filter], + ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = { pruneFilterProjectRaw( relation, projects, @@ -117,12 +118,11 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } protected def pruneFilterProjectRaw( - relation: LogicalRelation, - projects: Seq[NamedExpression], - filterPredicates: Seq[Expression], - scanBuilder: (Seq[Attribute], Seq[Expression], - Seq[Filter], ArrayBuffer[AttributeReference]) => - RDD[InternalRow]) = { + relation: LogicalRelation, + projects: Seq[NamedExpression], + filterPredicates: Seq[Expression], + scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter], + ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = { val projectSet = AttributeSet(projects.flatMap(_.references)) val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) @@ -212,20 +212,24 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } attr } - val scan = execution.DataSourceScanExec.create( + val scan = new execution.RowDataSourceScanExec( updateProject, scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder), - relation.relation, metadata, relation.metastoreTableIdentifier) + // 
Carbon does not support partitioning yet, so use UnknownPartitioning here; if + // bucketing is added in the future, this partitioning must be changed accordingly + relation.relation, UnknownPartitioning(0), metadata, None) filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters. val requestedColumns = (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder) - val scan = execution.DataSourceScanExec.create( + val scan = new execution.RowDataSourceScanExec( updateRequestedColumns, scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder), - relation.relation, metadata, relation.metastoreTableIdentifier) + // Carbon does not support partitioning yet, so use UnknownPartitioning here; if + // bucketing is added in the future, this partitioning must be changed accordingly + relation.relation, UnknownPartitioning(0), metadata, None) execution.ProjectExec( projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)) } @@ -254,8 +258,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { protected[sql] def selectFilters( - relation: BaseRelation, - predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = { + relation: BaseRelation, + predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = { // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are // called `predicate`s, while all data source filters of type `sources.Filter` are simply called