http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala deleted file mode 100644 index 2e93a6c..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import scala.collection.JavaConverters._ -import scala.language.implicitConversions - -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes} -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataType, StructType} - -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension} -import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension -import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.processing.merger.TableMeta -import org.apache.carbondata.spark.{CarbonOption, _} - -/** - * Carbon relation provider compliant to data source api. - * Creates carbon relations - */ -class CarbonSource extends RelationProvider - with CreatableRelationProvider with HadoopFsRelationProvider with DataSourceRegister { - - override def shortName(): String = "carbondata" - - /** - * Returns a new base relation with the given parameters. - * Note: the parameters' keywords are case insensitive and this insensitivity is enforced - * by the Map that is passed to the function. - */ - override def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { - // if path is provided we can directly create Hadoop relation. 
\ - // Otherwise create datasource relation - parameters.get("path") match { - case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters, None) - case _ => - val options = new CarbonOption(parameters) - val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq - val identifier = tableIdentifier match { - case Seq(name) => TableIdentifier(name, None) - case Seq(db, name) => TableIdentifier(name, Some(db)) - } - CarbonDatasourceRelation(identifier, None)(sqlContext) - } - } - - override def createRelation( - sqlContext: SQLContext, - mode: SaveMode, - parameters: Map[String, String], - data: SchemaRDD): BaseRelation = { - - // To avoid derby problem, dataframe need to be writen and read using CarbonContext - require(sqlContext.isInstanceOf[CarbonContext], "Error in saving dataframe to carbon file, " + - "must use CarbonContext to save dataframe") - - // User should not specify path since only one store is supported in carbon currently, - // after we support multi-store, we can remove this limitation - require(!parameters.contains("path"), "'path' should not be specified, " + - "the path to store carbon file is the 'storePath' specified when creating CarbonContext") - - val options = new CarbonOption(parameters) - val storePath = CarbonContext.getInstance(sqlContext.sparkContext).storePath - val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName) - val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - .exists(tablePath) - val (doSave, doAppend) = (mode, isExists) match { - case (SaveMode.ErrorIfExists, true) => - sys.error(s"ErrorIfExists mode, path $storePath already exists.") - case (SaveMode.Overwrite, true) => - val cc = CarbonContext.getInstance(sqlContext.sparkContext) - cc.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }") - (true, false) - case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) => - (true, false) - case (SaveMode.Append, _) => - (false, true) - case (SaveMode.Ignore, exists) => - (!exists, false) - } - - if (doSave) { - // save data when the save mode is Overwrite. - new CarbonDataFrameWriter(data).saveAsCarbonFile(parameters) - } else if (doAppend) { - new CarbonDataFrameWriter(data).appendToCarbonFile(parameters) - } - - createRelation(sqlContext, parameters) - } - - override def createRelation(sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = { - CarbonDatasourceHadoopRelation(sqlContext, paths, parameters, dataSchema) - } -} - -/** - * Creates carbon relation compliant to data source api. 
- * This relation is stored to hive metastore - */ -private[sql] case class CarbonDatasourceRelation( - tableIdentifier: TableIdentifier, - alias: Option[String]) - (@transient context: SQLContext) - extends BaseRelation with Serializable { - - lazy val carbonRelation: CarbonRelation = { - CarbonEnv.get - .carbonMetastore.lookupRelation1(tableIdentifier, None)(sqlContext) - .asInstanceOf[CarbonRelation] - } - - def getDatabaseName(): String = tableIdentifier.database.getOrElse("default") - - def getTable(): String = tableIdentifier.table - - def schema: StructType = carbonRelation.schema - - def sqlContext: SQLContext = context - - override def sizeInBytes: Long = carbonRelation.sizeInBytes -} - -/** - * Represents logical plan for one carbon table - */ -case class CarbonRelation( - databaseName: String, - tableName: String, - var metaData: CarbonMetaData, - tableMeta: TableMeta, - alias: Option[String])(@transient sqlContext: SQLContext) - extends LeafNode with MultiInstanceRelation { - - def recursiveMethod(dimName: String, childDim: CarbonDimension): String = { - childDim.getDataType.getName.toLowerCase match { - case "array" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:array<${ getArrayChildren(childDim.getColName) }>" - case "struct" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:struct<${ getStructChildren(childDim.getColName) }>" - case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }" - } - } - - def getArrayChildren(dimName: String): String = { - metaData.carbonTable.getChildren(dimName).asScala.map(childDim => { - childDim.getDataType.getName.toLowerCase match { - case "array" => s"array<${ getArrayChildren(childDim.getColName) }>" - case "struct" => s"struct<${ getStructChildren(childDim.getColName) }>" - case dType => addDecimalScaleAndPrecision(childDim, dType) - } - }).mkString(",") - } - - def getStructChildren(dimName: String): String = { - metaData.carbonTable.getChildren(dimName).asScala.map(childDim => { - childDim.getDataType.getName.toLowerCase match { - case "array" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:array<${ getArrayChildren(childDim.getColName) }>" - case "struct" => s"${ - childDim.getColName.substring(dimName.length + 1) - }:struct<${ metaData.carbonTable.getChildren(childDim.getColName) - .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",") - }>" - case dType => s"${ childDim.getColName - .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }" - } - }).mkString(",") - } - - override def newInstance(): LogicalPlan = { - CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)(sqlContext) - .asInstanceOf[this.type] - } - - val dimensionsAttr = { - val sett = new java.util.LinkedHashSet(tableMeta.carbonTable - .getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName).asScala.asJava) - sett.asScala.toSeq.filter(dim => !dim.isInvisible || - (dim.isInvisible && dim.isInstanceOf[CarbonImplicitDimension])) - .map(dim => { - val dimval = metaData.carbonTable - .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName) - val output: DataType = dimval.getDataType.getName.toLowerCase match { - case "array" => - CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>") - case "struct" => - CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>") - case dType => - val dataType = addDecimalScaleAndPrecision(dimval, dType) - 
CarbonMetastoreTypes.toDataType(dataType) - } - - AttributeReference( - dim.getColName, - output, - nullable = true)(qualifiers = tableName +: alias.toSeq) - }) - } - - val measureAttr = { - val factTable = tableMeta.carbonTable.getFactTableName - new java.util.LinkedHashSet( - tableMeta.carbonTable. - getMeasureByTableName(tableMeta.carbonTable.getFactTableName). - asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible) - .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType( - metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.getName - .toLowerCase match { - case "float" => "double" - case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")" - case others => others - }), - nullable = true)(qualifiers = tableName +: alias.toSeq)) - } - - override val output = { - val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName) - .asScala - columns.filter(!_.isInvisible).map { column => - if (column.isDimension()) { - val output: DataType = column.getDataType.getName.toLowerCase match { - case "array" => - CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>") - case "struct" => - CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>") - case dType => - val dataType = addDecimalScaleAndPrecision(column, dType) - CarbonMetastoreTypes.toDataType(dataType) - } - AttributeReference(column.getColName, output, - nullable = true - )(qualifiers = tableName +: alias.toSeq) - } else { - AttributeReference(column.getColName, CarbonMetastoreTypes.toDataType( - column.getDataType.getName.toLowerCase match { - case "float" => "double" - case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column - .getColumnSchema.getScale + ")" - case others => others - } - ), - nullable = true - )(qualifiers = tableName +: alias.toSeq) - } - } - } - // TODO: Use data from the footers. - override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes) - - override def equals(other: Any): Boolean = { - other match { - case p: CarbonRelation => - p.databaseName == databaseName && p.output == output && p.tableName == tableName - case _ => false - } - } - - def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = { - var dType = dataType - if (dimval.getDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DECIMAL) { - dType += - "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")" - } - dType - } - - private var tableStatusLastUpdateTime = 0L - - private var sizeInBytesLocalValue = 0L - - def sizeInBytes: Long = { - val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime( - tableMeta.carbonTable.getAbsoluteTableIdentifier) - if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) { - val tablePath = CarbonStorePath.getCarbonTablePath( - tableMeta.storePath, - tableMeta.carbonTableIdentifier).getPath - val fileType = FileFactory.getFileType(tablePath) - if(FileFactory.isFileExist(tablePath, fileType)) { - tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime - sizeInBytesLocalValue = FileFactory.getDirectorySize(tablePath) - } - } - sizeInBytesLocalValue - } - -} -
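The file deleted above provided the Spark 1.x data source entry point (CarbonSource) and the metastore-backed CarbonDatasourceRelation/CarbonRelation plans. As a minimal, hypothetical usage sketch of that API — the SparkContext `sc`, the DataFrame `df`, the store path, the two-argument CarbonContext constructor, and the option keys ("dbName", "tableName") are all assumptions inferred from the CarbonOption fields and require(...) checks visible in the deleted code, not confirmed by this diff — writing and reading a table looked roughly like:

    import org.apache.spark.sql.{CarbonContext, SaveMode}

    // Writes must go through CarbonContext (see the require(...) in createRelation above);
    // the second argument is the single storePath the deleted code refers to.
    val cc = new CarbonContext(sc, "/tmp/carbondata/store")
    import cc.implicits._

    val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "name")

    // SaveMode.Overwrite drops and recreates the table; Append routes to appendToCarbonFile.
    df.write
      .format("carbondata")              // shortName registered by CarbonSource
      .option("dbName", "default")
      .option("tableName", "sample")
      .mode(SaveMode.Overwrite)
      .save()

    // With no "path" parameter, createRelation resolves a CarbonDatasourceRelation
    // through the hive metastore instead of building a Hadoop relation.
    val loaded = cc.read.format("carbondata")
      .option("dbName", "default")
      .option("tableName", "sample")
      .load()
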
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala deleted file mode 100644 index c14a61a..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import scala.collection.JavaConverters._ - -import org.apache.spark.TaskContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.attachTree -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} -import org.apache.spark.sql.hive.{CarbonMetastore, CarbonMetastoreTypes} -import org.apache.spark.sql.optimizer.CarbonDecoderRelation -import org.apache.spark.sql.types._ - -import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} -import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnIdentifier} -import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes} -import org.apache.carbondata.core.metadata.datatype.DataType -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension -import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil} -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.spark.CarbonAliasDecoderRelation - -/** - * It decodes the dictionary key to value - */ -case class CarbonDictionaryDecoder( - relations: Seq[CarbonDecoderRelation], - profile: CarbonProfile, - aliasMap: CarbonAliasDecoderRelation, - child: SparkPlan) - (@transient sqlContext: SQLContext) - extends UnaryNode { - - override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil - - override val output: Seq[Attribute] = { - child.output.map { a => - val attr = aliasMap.getOrElse(a, a) - val relation = relations.find(p => p.contains(attr)) - if (relation.isDefined && canBeDecoded(attr)) { - val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable - val carbonDimension = carbonTable - .getDimensionByName(carbonTable.getFactTableName, attr.name) - if (carbonDimension != null && - carbonDimension.hasEncoding(Encoding.DICTIONARY) && - 
!carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && - !carbonDimension.isComplex()) { - val newAttr = AttributeReference(a.name, - convertCarbonToSparkDataType(carbonDimension, - relation.get.carbonRelation.carbonRelation), - a.nullable, - a.metadata)(a.exprId, - a.qualifiers).asInstanceOf[Attribute] - newAttr - } else { - a - } - } else { - a - } - } - } - - - def canBeDecoded(attr: Attribute): Boolean = { - profile match { - case ip: IncludeProfile if ip.attributes.nonEmpty => - ip.attributes - .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId) - case ep: ExcludeProfile => - !ep.attributes - .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId) - case _ => true - } - } - - def convertCarbonToSparkDataType(carbonDimension: CarbonDimension, - relation: CarbonRelation): types.DataType = { - carbonDimension.getDataType match { - case CarbonDataTypes.STRING => StringType - case CarbonDataTypes.SHORT => ShortType - case CarbonDataTypes.INT => IntegerType - case CarbonDataTypes.LONG => LongType - case CarbonDataTypes.DOUBLE => DoubleType - case CarbonDataTypes.BOOLEAN => BooleanType - case CarbonDataTypes.DECIMAL => - val scale: Int = carbonDimension.getColumnSchema.getScale - val precision: Int = carbonDimension.getColumnSchema.getPrecision - if (scale == 0 && precision == 0) { - DecimalType(18, 2) - } else { - DecimalType(precision, scale) - } - case CarbonDataTypes.TIMESTAMP => TimestampType - case CarbonDataTypes.DATE => DateType - case CarbonDataTypes.STRUCT => - CarbonMetastoreTypes - .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>") - case CarbonDataTypes.ARRAY => - CarbonMetastoreTypes - .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>") - } - } - - val getDictionaryColumnIds = { - val attributes = child.output - val dictIds: Array[(String, ColumnIdentifier, DataType, CarbonDimension)] = - attributes.map { a => - val attr = aliasMap.getOrElse(a, a) - val relation = relations.find(p => p.contains(attr)) - if (relation.isDefined && canBeDecoded(attr)) { - val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable - val carbonDimension = - carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name) - if (carbonDimension != null && - carbonDimension.hasEncoding(Encoding.DICTIONARY) && - !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && - !carbonDimension.isComplex()) { - (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier, - carbonDimension.getDataType, carbonDimension) - } else { - (null, null, null, null) - } - } else { - (null, null, null, null) - } - - }.toArray - dictIds - } - - override def outputsUnsafeRows: Boolean = true - - override def canProcessUnsafeRows: Boolean = true - - override def canProcessSafeRows: Boolean = true - - override def doExecute(): RDD[InternalRow] = { - attachTree(this, "execute") { - val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastore].storePath - val queryId = sqlContext.getConf("queryId", System.nanoTime() + "") - val absoluteTableIdentifiers = relations.map { relation => - val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable - (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier) - }.toMap - - val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId) - if (isRequiredToDecode) { - val dataTypes = child.output.map { attr => attr.dataType } - child.execute().mapPartitions { iter => - val cacheProvider: 
CacheProvider = CacheProvider.getInstance - val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] = - cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storePath) - val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers, - forwardDictionaryCache) - val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2) - // add a task completion listener to clear dictionary that is a decisive factor for - // LRU eviction policy - val dictionaryTaskCleaner = TaskContext.get - dictionaryTaskCleaner.addTaskCompletionListener(context => - dicts.foreach { dictionary => - if (null != dictionary) { - dictionary.clear() - } - } - ) - new Iterator[InternalRow] { - val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray) - - override final def hasNext: Boolean = { - iter.hasNext - } - - override final def next(): InternalRow = { - val row: InternalRow = iter.next() - val data = row.toSeq(dataTypes).toArray - dictIndex.foreach { index => - if (data(index) != null) { - data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index) - .getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]), - getDictionaryColumnIds(index)._4) - } - } - val result = unsafeProjection(new GenericMutableRow(data)) - result - } - } - } - } else { - child.execute() - } - } - } - - private def isRequiredToDecode = { - getDictionaryColumnIds.find(p => p._1 != null) match { - case Some(value) => true - case _ => false - } - } - - private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier], - cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = { - val dictionaryColumnIds = getDictionaryColumnIds.map { dictionaryId => - if (dictionaryId._2 != null) { - new DictionaryColumnUniqueIdentifier( - atiMap(dictionaryId._1).getCarbonTableIdentifier, - dictionaryId._2, dictionaryId._3, - CarbonStorePath.getCarbonTablePath(atiMap(dictionaryId._1))) - } else { - null - } - } - try { - val noDictionaryIndexes = new java.util.ArrayList[Int]() - dictionaryColumnIds.zipWithIndex.foreach { columnIndex => - if (columnIndex._1 == null) { - noDictionaryIndexes.add(columnIndex._2) - } - } - val dict = cache.getAll(dictionaryColumnIds.filter(_ != null).toSeq.asJava); - val finalDict = new java.util.ArrayList[Dictionary]() - var dictIndex: Int = 0 - dictionaryColumnIds.zipWithIndex.foreach { columnIndex => - if (!noDictionaryIndexes.contains(columnIndex._2)) { - finalDict.add(dict.get(dictIndex)) - dictIndex += 1 - } else { - finalDict.add(null) - } - } - finalDict.asScala - } catch { - case t: Throwable => Seq.empty - } - - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala deleted file mode 100644 index 36cd6f2..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.sql.hive.{CarbonIUDAnalysisRule, CarbonMetastore} - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport -import org.apache.carbondata.spark.rdd.SparkReadSupport - -case class CarbonEnv(carbonMetastore: CarbonMetastore) - -object CarbonEnv { - - @volatile private var carbonEnv: CarbonEnv = _ - - // set readsupport class global so that the executor can get it. - SparkReadSupport.readSupportClass = classOf[RawDataReadSupport] - - var initialized = false - - def init(sqlContext: SQLContext): Unit = { - if (!initialized) { - val cc = sqlContext.asInstanceOf[CarbonContext] - val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "") - carbonEnv = CarbonEnv(catalog) - CarbonIUDAnalysisRule.init(sqlContext) - initialized = true - CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true") - } - } - - def get: CarbonEnv = { - if (initialized) carbonEnv - else throw new RuntimeException("CarbonEnv not initialized") - } -} - - http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala deleted file mode 100644 index 6ed8c0d..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSQLConf.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.sql.SQLConf.SQLConfEntry -import org.apache.spark.sql.hive.CarbonSQLDialect - - /** - * A trait that enables the setting and getting of mutable config parameters/hints. 
- * - */ -class CarbonSQLConf extends SQLConf { - - override def dialect: String = { - getConf(SQLConf.DIALECT, - classOf[CarbonSQLDialect].getCanonicalName) - } - - override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala deleted file mode 100644 index a3c6343..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.CarbonInputMetrics -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.LeafNode -import org.apache.spark.sql.hive.CarbonMetastore - -import org.apache.carbondata.core.scan.model._ -import org.apache.carbondata.hadoop.{CarbonProjection, InputMetricsStats} -import org.apache.carbondata.spark.CarbonFilters -import org.apache.carbondata.spark.rdd.CarbonScanRDD - -case class CarbonScan( - var columnProjection: Seq[Attribute], - relationRaw: CarbonRelation, - dimensionPredicatesRaw: Seq[Expression], - useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode { - val carbonTable = relationRaw.metaData.carbonTable - val selectedDims = scala.collection.mutable.MutableList[QueryDimension]() - val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]() - @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastore] - - val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]() - val unprocessedExprs = new ArrayBuffer[Expression]() - - val buildCarbonPlan: CarbonQueryPlan = { - val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.databaseName, relationRaw.tableName) - plan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + "")) - processFilterExpressions(plan) - plan - } - - def processFilterExpressions(plan: CarbonQueryPlan) { - if (dimensionPredicatesRaw.nonEmpty) { - val expressionVal = CarbonFilters.processExpression( - dimensionPredicatesRaw, - attributesNeedToDecode, - unprocessedExprs, - carbonTable) - expressionVal match { - case Some(ce) => - // adding dimension used in expression in querystats - plan.setFilterExpression(ce) - case _ => - } - } - processExtraAttributes(plan) - } - - 
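 -  // Condensed sketch (not part of the original file) of what processFilterExpressions above
 -  // does: CarbonFilters.processExpression translates the catalyst predicates it understands
 -  // into a single Carbon filter expression, records untranslatable ones in `unprocessedExprs`,
 -  // and collects dictionary-encoded attributes still needing decoding in
 -  // `attributesNeedToDecode`. Reusing only names visible in this file, the call amounts to:
 -  //
 -  //   CarbonFilters.processExpression(
 -  //     dimensionPredicatesRaw, attributesNeedToDecode, unprocessedExprs, carbonTable)
 -  //     .foreach(plan.setFilterExpression)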
private def processExtraAttributes(plan: CarbonQueryPlan) { - if (attributesNeedToDecode.size() > 0) { - val attributeOut = new ArrayBuffer[Attribute]() ++ columnProjection - - attributesNeedToDecode.asScala.foreach { attr => - if (!columnProjection.exists(_.name.equalsIgnoreCase(attr.name))) { - attributeOut += attr - } - } - columnProjection = attributeOut - } - - val columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName) - columns.addAll(carbonTable.getImplicitDimensionByTableName(carbonTable.getFactTableName)) - val colAttr = new Array[Attribute](columns.size()) - columnProjection.foreach { attr => - val column = - carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name) - if(column != null) { - colAttr(columns.indexOf(column)) = attr - } - } - - columnProjection = colAttr.filter(f => f != null) - - var queryOrder: Integer = 0 - columnProjection.foreach { attr => - val carbonColumn = carbonTable.getColumnByName(carbonTable.getFactTableName, attr.name) - if (carbonColumn != null) { - if (carbonColumn.isDimension()) { - val dim = new QueryDimension(attr.name) - dim.setQueryOrder(queryOrder) - queryOrder = queryOrder + 1 - selectedDims += dim - } else { - val m1 = new QueryMeasure(attr.name) - m1.setQueryOrder(queryOrder) - queryOrder = queryOrder + 1 - selectedMsrs += m1 - } - } - } - - // Fill the selected dimensions & measures obtained from - // attributes to query plan for detailed query - selectedDims.foreach(plan.addDimension) - selectedMsrs.foreach(plan.addMeasure) - } - - def inputRdd: CarbonScanRDD = { - val projection = new CarbonProjection - columnProjection.foreach { attr => - projection.addColumn(attr.name) - } - val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics - new CarbonScanRDD( - ocRaw.sparkContext, - projection, - buildCarbonPlan.getFilterExpression, - carbonTable.getAbsoluteTableIdentifier, - carbonTable.getTableInfo.serialize(), - carbonTable.getTableInfo, inputMetricsStats - ) - } - - override def outputsUnsafeRows: Boolean = - (attributesNeedToDecode.size() == 0) && useUnsafeCoversion - - override def doExecute(): RDD[InternalRow] = { - val outUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion - inputRdd.mapPartitions { iter => - val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray) - new Iterator[InternalRow] { - override def hasNext: Boolean = iter.hasNext - - override def next(): InternalRow = { - val value = iter.next - if (outUnsafeRows) { - unsafeProjection(value) - } else { - value - } - } - } - } - } - - def output: Seq[Attribute] = { - columnProjection - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala deleted file mode 100644 index bc62a55..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap} - -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonUtil - -case class TransformHolder(rdd: Any, mataData: CarbonMetaData) - -object CarbonSparkUtil { - - def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = { - val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName) - .asScala.map(x => x.getColName) - // wf : may be problem - val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName) - .asScala.map(x => x.getColName) - val dictionary = - carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f => - (f.getColName.toLowerCase, - f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) && - !f.getDataType.isComplexType) - } - CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap)) - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala deleted file mode 100644 index 9dc9ee2..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala +++ /dev/null @@ -1,589 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql - -import scala.collection.JavaConverters._ -import scala.collection.mutable.Map -import scala.language.implicitConversions - -import org.apache.hadoop.hive.ql.lib.Node -import org.apache.hadoop.hive.ql.parse._ -import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ -import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.ExplainCommand -import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.DescribeCommand -import org.apache.spark.sql.hive.HiveQlWrapper -import org.apache.spark.sql.types.StructField - -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.util.CommonUtil - -/** - * Parser for All Carbon DDL, DML cases in Unified context - */ -class CarbonSqlParser() extends CarbonDDLSqlParser { - - override def parse(input: String): LogicalPlan = { - synchronized { - // Initialize the Keywords. - initLexical - phrase(start)(new lexical.Scanner(input)) match { - case Success(plan, _) => plan match { - case x: LoadTable => - x.inputSqlString = input - x - case logicalPlan => logicalPlan - } - case failureOrError => sys.error(failureOrError.toString) - } - } - } - - override protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand - - protected lazy val startCommand: Parser[LogicalPlan] = - createDatabase | dropDatabase | loadManagement | describeTable | - showPartitions | showLoads | alterTable | updateTable | deleteRecords | useDatabase | - createTable - - protected lazy val loadManagement: Parser[LogicalPlan] = - deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew - - protected lazy val createDatabase: Parser[LogicalPlan] = - CREATE ~> (DATABASE | SCHEMA) ~> restInput ^^ { - case statement => - val createDbSql = "CREATE DATABASE " + statement - var dbName = "" - // Get Ast node for create db command - val node = HiveQlWrapper.getAst(createDbSql) - node match { - // get dbname - case Token("TOK_CREATEDATABASE", children) => - dbName = BaseSemanticAnalyzer.unescapeIdentifier(children(0).getText) - } - CreateDatabase(convertDbNameToLowerCase(dbName), createDbSql) - } - - protected lazy val dropDatabase: Parser[LogicalPlan] = - DROP ~> (DATABASE | SCHEMA) ~> restInput ^^ { - case statement => - val dropDbSql = "DROP DATABASE " + statement - var dbName = "" - var isCascade = false - // Get Ast node for drop db command - val node = HiveQlWrapper.getAst(dropDbSql) - node match { - case Token("TOK_DROPDATABASE", children) => - dbName = BaseSemanticAnalyzer.unescapeIdentifier(children(0).getText) - // check whether cascade drop db - children.collect { - case t@Token("TOK_CASCADE", _) => - isCascade = true - case _ => // Unsupport features - } - } - DropDatabase(convertDbNameToLowerCase(dbName), isCascade, dropDbSql) - } - - protected lazy val alterTable: Parser[LogicalPlan] = - ALTER ~> TABLE ~> restInput ^^ { - case statement => - try { - val alterSql = "alter table " + statement - // DDl will be parsed and we get the AST tree from the HiveQl - val node = HiveQlWrapper.getAst(alterSql) - // processing the AST tree - nodeToPlanForAlterTable(node, alterSql) - } catch { - // MalformedCarbonCommandException need to be throw directly, parser will catch it - case ce: MalformedCarbonCommandException => - throw ce - } - } - - /** - * For handling the create table DDl systax compatible to Hive 
syntax - */ - protected lazy val createTable: Parser[LogicalPlan] = - restInput ^^ { - - case statement => - try { - // DDl will be parsed and we get the AST tree from the HiveQl - val node = HiveQlWrapper.getAst(statement) - // processing the AST tree - nodeToPlan(node) - } catch { - // MalformedCarbonCommandException need to be throw directly, parser will catch it - case ce: MalformedCarbonCommandException => - throw ce - case e: Exception => - sys.error("Parsing error") // no need to do anything. - } - } - - /** - * This function will traverse the tree and logical plan will be formed using that. - * - * @param node - * @return LogicalPlan - */ - protected def nodeToPlan(node: Node): LogicalPlan = { - node match { - // if create table taken is found then only we will handle. - case Token("TOK_CREATETABLE", children) => - - - var fields: Seq[Field] = Seq[Field]() - var tableComment: String = "" - var tableProperties = Map[String, String]() - var partitionByFields: Seq[Field] = Seq[Field]() - var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]() - var likeTableName: String = "" - var storedBy: String = "" - var ifNotExistPresent: Boolean = false - var dbName: Option[String] = None - var tableName: String = "" - var bucketFields: Option[BucketFields] = None - - try { - - // Checking whether create table request is carbon table - children.collect { - case Token("TOK_STORAGEHANDLER", child :: Nil) => - storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText).trim.toLowerCase - case _ => - } - if (!(storedBy.equals(CarbonContext.datasourceName) || - storedBy.equals(CarbonContext.datasourceShortName))) { - sys.error("Not a carbon format request") - } - - children.collect { - // collecting all the field list - case list@Token("TOK_TABCOLLIST", _) => - val cols = BaseSemanticAnalyzer.getColumns(list, true) - if (cols != null) { - val dupColsGrp = cols.asScala.groupBy(x => x.getName) filter { - case (_, colList) => colList.size > 1 - } - if (dupColsGrp.nonEmpty) { - var columnName: String = "" - dupColsGrp.toSeq.foreach(columnName += _._1 + ", ") - columnName = columnName.substring(0, columnName.lastIndexOf(", ")) - val errorMessage = "Duplicate column name: " + columnName + " found in table " + - ".Please check create table statement." - throw new MalformedCarbonCommandException(errorMessage) - } - cols.asScala.map { col => - val columnName = col.getName() - val dataType = Option(col.getType) - val name = Option(col.getName()) - // This is to parse complex data types - val x = '`' + col.getName + '`' + ' ' + col.getType - val f: Field = anyFieldDef(new lexical.Scanner(x)) - match { - case Success(field, _) => field - case failureOrError => throw new MalformedCarbonCommandException( - s"Unsupported data type: $col.getType") - } - // the data type of the decimal type will be like decimal(10,0) - // so checking the start of the string and taking the precision and scale. 
- // resetting the data type with decimal - if (f.dataType.getOrElse("").startsWith("decimal")) { - val (precision, scale) = getScaleAndPrecision(col.getType) - f.precision = precision - f.scale = scale - f.dataType = Some("decimal") - } - if (f.dataType.getOrElse("").startsWith("char")) { - f.dataType = Some("char") - } else if (f.dataType.getOrElse("").startsWith("float")) { - f.dataType = Some("float") - } - f.rawSchema = x - fields ++= Seq(f) - } - } - - case Token("TOK_IFNOTEXISTS", _) => - ifNotExistPresent = true - - case t@Token("TOK_TABNAME", _) => - val (db, tblName) = extractDbNameTableName(t) - dbName = db - tableName = tblName.toLowerCase() - - case Token("TOK_TABLECOMMENT", child :: Nil) => - tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText) - - case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) => - val cols = BaseSemanticAnalyzer.getColumns(list(0), false) - if (cols != null) { - cols.asScala.map { col => - val columnName = col.getName() - val dataType = Option(col.getType) - val comment = col.getComment - val rawSchema = '`' + col.getName + '`' + ' ' + col.getType - val field = Field(columnName, dataType, Some(columnName), None) - - // the data type of the decimal type will be like decimal(10,0) - // so checking the start of the string and taking the precision and scale. - // resetting the data type with decimal - if (field.dataType.getOrElse("").startsWith("decimal")) { - val (precision, scale) = getScaleAndPrecision(col.getType) - field.precision = precision - field.scale = scale - field.dataType = Some("decimal") - } - if (field.dataType.getOrElse("").startsWith("char")) { - field.dataType = Some("char") - } else if (field.dataType.getOrElse("").startsWith("float")) { - field.dataType = Some("float") - } - field.rawSchema = rawSchema - val partitionCol = new PartitionerField(columnName, dataType, comment) - partitionCols ++= Seq(partitionCol) - partitionByFields ++= Seq(field) - } - } - case Token("TOK_TABLEPROPERTIES", list :: Nil) => - val propertySeq: Seq[(String, String)] = getProperties(list) - val repeatedProperties = propertySeq.groupBy(_._1).filter(_._2.size > 1).keySet - if (repeatedProperties.nonEmpty) { - val repeatedPropStr: String = repeatedProperties.mkString(",") - throw new MalformedCarbonCommandException("Table properties is repeated: " + - repeatedPropStr) - } - tableProperties ++= propertySeq - - case Token("TOK_LIKETABLE", child :: Nil) => - likeTableName = child.getChild(0).getText() - case Token("TOK_ALTERTABLE_BUCKETS", - Token("TOK_TABCOLNAME", list) :: numberOfBuckets) => - val cols = list.map(_.getText) - if (cols != null) { - bucketFields = Some(BucketFields(cols, - numberOfBuckets.head.getText.toInt)) - } - - case _ => // Unsupport features - } - - // validate tblProperties - if (!CommonUtil.validateTblProperties(tableProperties, fields)) { - throw new MalformedCarbonCommandException("Invalid table properties") - } - - if (partitionCols.nonEmpty) { - if (!CommonUtil.validatePartitionColumns(tableProperties, partitionCols)) { - throw new MalformedCarbonCommandException("Invalid partition definition") - } - // partition columns should not be part of the schema - val colNames = fields.map(_.column) - val badPartCols = partitionCols.map(_.partitionColumn).toSet.intersect(colNames.toSet) - if (badPartCols.nonEmpty) { - throw new MalformedCarbonCommandException( - "Partition columns should not be specified in the schema: " + - badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]")) - } - fields ++= 
partitionByFields - } - - // prepare table model of the collected tokens - val tableModel: TableModel = prepareTableModel(ifNotExistPresent, - dbName, - tableName, - fields, - partitionCols, - tableProperties, - bucketFields) - - // get logical plan. - CreateTable(tableModel) - } catch { - case ce: MalformedCarbonCommandException => - val message = if (tableName.isEmpty) { - "Create table command failed. " - } - else if (dbName.isEmpty) { - s"Create table command failed for $tableName. " - } - else { - s"Create table command failed for ${ dbName.get }.$tableName. " - } - LOGGER.audit(message + ce.getMessage) - throw ce - } - - } - } - - /** - * This function will traverse the tree and logical plan will be formed using that. - * - * @param node - * @return LogicalPlan - */ - protected def nodeToPlanForAlterTable(node: Node, alterSql: String): LogicalPlan = { - node match { - // if create table taken is found then only we will handle. - case Token("TOK_ALTERTABLE", children) => - - var dbName: Option[String] = None - var tableName: String = "" - var compactionType: String = "" - - children.collect { - - case t@Token("TOK_TABNAME", _) => - val (db, tblName) = extractDbNameTableName(t) - dbName = db - tableName = tblName - - case Token("TOK_ALTERTABLE_COMPACT", child :: Nil) => - compactionType = BaseSemanticAnalyzer.unescapeSQLString(child.getText) - - case _ => // Unsupport features - } - - val altertablemodel = AlterTableModel(dbName, - tableName, - None, - compactionType, - Some(System.currentTimeMillis()), - alterSql) - AlterTableCompaction(altertablemodel) - } - } - - protected lazy val loadDataNew: Parser[LogicalPlan] = - LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~ - (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~ - (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ { - case filePath ~ isOverwrite ~ table ~ optionsList => - val (databaseNameOp, tableName) = table match { - case databaseName ~ tableName => (databaseName, tableName.toLowerCase()) - } - if (optionsList.isDefined) { - validateOptions(optionsList) - } - val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap - LoadTable(convertDbNameToLowerCase(databaseNameOp), tableName, filePath, Seq(), optionsMap, - isOverwrite.isDefined) - } - - protected lazy val describeTable: Parser[LogicalPlan] = - ((DESCRIBE | DESC) ~> opt(EXTENDED | FORMATTED)) ~ (ident <~ ".").? ~ ident ^^ { - case ef ~ db ~ tbl => - val tblIdentifier = db match { - case Some(dbName) => - TableIdentifier(tbl.toLowerCase, Some(convertDbNameToLowerCase(dbName))) - case None => - TableIdentifier(tbl.toLowerCase, None) - } - if (ef.isDefined && "FORMATTED".equalsIgnoreCase(ef.get)) { - new DescribeFormattedCommand("describe formatted " + tblIdentifier, - tblIdentifier) - } else { - new DescribeCommand(UnresolvedRelation(tblIdentifier, None), ef.isDefined) - } - } - - protected lazy val showLoads: Parser[LogicalPlan] = - SHOW ~> SEGMENTS ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident ~ - (LIMIT ~> numericLit).? <~ - opt(";") ^^ { - case databaseName ~ tableName ~ limit => - ShowLoadsCommand(convertDbNameToLowerCase(databaseName), tableName.toLowerCase(), limit) - } - - protected lazy val deleteLoadsByID: Parser[LogicalPlan] = - DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~ - (WHERE ~> (SEGMENT ~ "." 
~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~ opt(";") ^^ { - case dbName ~ tableName ~ loadids => - DeleteLoadsById(loadids, convertDbNameToLowerCase(dbName), tableName.toLowerCase()) - } - - protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] = - DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~ - (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~ - opt(";") ^^ { - case database ~ table ~ condition => - condition match { - case dateField ~ dateValue => - DeleteLoadsByLoadDate(convertDbNameToLowerCase(database), - table.toLowerCase(), - dateField, - dateValue) - } - } - - protected lazy val cleanFiles: Parser[LogicalPlan] = - CLEAN ~> FILES ~> FOR ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ { - case databaseName ~ tableName => - CleanFiles(convertDbNameToLowerCase(databaseName), tableName.toLowerCase()) - } - - protected lazy val explainPlan: Parser[LogicalPlan] = - (EXPLAIN ~> opt(EXTENDED)) ~ startCommand ^^ { - case isExtended ~ logicalPlan => - logicalPlan match { - case plan: CreateTable => ExplainCommand(logicalPlan, extended = isExtended.isDefined) - case _ => ExplainCommand(OneRowRelation) - } - } - - protected lazy val deleteRecords: Parser[LogicalPlan] = - (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ { - case table ~ rest => - val tableName = getTableName(table.tableIdentifier) - val alias = table.alias.getOrElse("") - DeleteRecords("select tupleId from " + tableName + " " + alias + rest.getOrElse(""), table) - } - - protected lazy val updateTable: Parser[LogicalPlan] = - UPDATE ~> table ~ - (SET ~> "(" ~> repsep(element, ",") <~ ")") ~ - ("=" ~> restInput) <~ opt(";") ^^ { - case tab ~ columns ~ rest => - val (sel, where) = splitQuery(rest) - val (selectStmt, relation) = - if (!sel.toLowerCase.startsWith("select ")) { - if (sel.trim.isEmpty) { - sys.error("At least one source column has to be specified ") - } - // only list of expression are given, need to convert that list of expressions into - // select statement on destination table - val relation = tab match { - case r@UnresolvedRelation(tableIdentifier, alias) => - updateRelation(r, tableIdentifier, alias) - case _ => tab - } - ("select " + sel + " from " + getTableName(relation.tableIdentifier) + " " + - relation.alias.get, relation) - } else { - (sel, updateRelation(tab, tab.tableIdentifier, tab.alias)) - } - UpdateTable(relation, columns, selectStmt, where) - } - protected lazy val showPartitions: Parser[LogicalPlan] = - (SHOW ~> PARTITIONS ~> table) <~ opt(";") ^^ { - case table => - val tableName = getTableName(table.tableIdentifier) - val alias = table.alias.getOrElse("") - ShowPartitions(table.tableIdentifier) - } - - private def splitQuery(query: String): (String, String) = { - val stack = scala.collection.mutable.Stack[Char]() - var foundSingleQuotes = false - var foundDoubleQuotes = false - var foundEscapeChar = false - var ignoreChar = false - var stop = false - var bracketCount = 0 - val (selectStatement, where) = query.span { - ch => { - if (stop) { - false - } else { - ignoreChar = false - if (foundEscapeChar && (ch == '\'' || ch == '\"' || ch == '\\')) { - foundEscapeChar = false - ignoreChar = true - } - // If escaped single or double quotes found, no need to consider - if (!ignoreChar) { - if (ch == '\\') { - foundEscapeChar = true - } else if (ch == '\'') { - foundSingleQuotes = !foundSingleQuotes - } else if (ch == '\"') { - foundDoubleQuotes = !foundDoubleQuotes - } - else if (ch == '(' && !foundSingleQuotes && !foundDoubleQuotes) { - bracketCount = 
bracketCount + 1 - stack.push(ch) - } else if (ch == ')' && !foundSingleQuotes && !foundDoubleQuotes) { - bracketCount = bracketCount + 1 - stack.pop() - if (0 == stack.size) { - stop = true - } - } - } - true - } - } - } - if (bracketCount == 0 || bracketCount % 2 != 0) { - sys.error("Parsing error, missing bracket ") - } - val select = selectStatement.trim - (select.substring(1, select.length - 1).trim -> where.trim) - } - - - protected lazy val table: Parser[UnresolvedRelation] = { - rep1sep(attributeName, ".") ~ opt(ident) ^^ { - case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias) - } - } - - protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", { - case lexical.Identifier(str) => str.toLowerCase - case lexical.Keyword(str) if !lexical.delimiters.contains(str) => str.toLowerCase - }) - - private def updateRelation( - r: UnresolvedRelation, - tableIdentifier: Seq[String], - alias: Option[String]): UnresolvedRelation = { - alias match { - case Some(_) => r - case _ => - val tableAlias = tableIdentifier match { - case Seq(dbName, tableName) => Some(tableName) - case Seq(tableName) => Some(tableName) - } - UnresolvedRelation(tableIdentifier, tableAlias) - } - } - - private def getTableName(tableIdentifier: Seq[String]): String = { - if (tableIdentifier.size > 1) { - tableIdentifier(0) + "." + tableIdentifier(1) - } else { - tableIdentifier(0) - } - } - - protected lazy val element: Parser[String] = - (ident <~ ".").? ~ ident ^^ { - case table ~ column => column.toLowerCase - } - - protected lazy val useDatabase: Parser[LogicalPlan] = - USE ~> ident <~ opt(";") ^^ { - case databaseName => UseDatabase(s"use ${ databaseName.toLowerCase }") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala deleted file mode 100644 index 065dfed..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CodeGenerateFactory.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql - -import org.apache.spark.SparkContext -import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan} -import org.apache.spark.util.{ScalaCompilerUtil, Utils} - -private[sql] class CodeGenerateFactory(version: String) { - - val optimizerFactory = if (version.equals("1.6.2") || version.equals("1.6.3")) { - ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_6_OptimizerString) - .asInstanceOf[AbstractCarbonOptimizerFactory] - } else if (version.startsWith("1.6") || version.startsWith("1.5")) { - ScalaCompilerUtil.compiledCode(CodeTemplates.defaultOptimizerString) - .asInstanceOf[AbstractCarbonOptimizerFactory] - } else { - throw new UnsupportedOperationException(s"Spark version $version is not supported") - } - - val expandFactory = if (version.startsWith("1.5")) { - ScalaCompilerUtil.compiledCode(CodeTemplates.spark1_5ExpandString) - .asInstanceOf[AbstractCarbonExpandFactory] - } else if (version.startsWith("1.6")) { - new AbstractCarbonExpandFactory { - override def createExpand(expand: Expand, child: LogicalPlan): Expand = { - val loader = Utils.getContextOrSparkClassLoader - try { - val cons = loader.loadClass("org.apache.spark.sql.catalyst.plans.logical.Expand") - .getDeclaredConstructors - cons.head.setAccessible(true) - cons.head.newInstance(expand.projections, expand.output, child).asInstanceOf[Expand] - } catch { - case e: Exception => null - } - } - } - } else { - throw new UnsupportedOperationException(s"Spark version $version is not supported") - } - -} - -object CodeGenerateFactory { - - private var codeGenerateFactory: CodeGenerateFactory = _ - - def init(version: String): Unit = { - if (codeGenerateFactory == null) { - codeGenerateFactory = new CodeGenerateFactory(version) - } - } - - def getInstance(): CodeGenerateFactory = { - codeGenerateFactory - } - - def createDefaultOptimizer(conf: CatalystConf, sc: SparkContext): Optimizer = { - val name = "org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer" - val loader = Utils.getContextOrSparkClassLoader - try { - val cons = loader.loadClass(name + "$").getDeclaredConstructors - cons.head.setAccessible(true) - cons.head.newInstance().asInstanceOf[Optimizer] - } catch { - case e: Exception => - loader.loadClass(name).getConstructor(classOf[CatalystConf]) - .newInstance(conf).asInstanceOf[Optimizer] - } - } - -} - -object CodeTemplates { - - val spark1_6_OptimizerString = - s""" - import org.apache.spark.sql._; - import org.apache.spark.sql.optimizer._; - import org.apache.spark.sql.catalyst.plans.logical._; - import org.apache.spark.sql.catalyst._; - import org.apache.spark.sql.catalyst.optimizer.Optimizer; - - new AbstractCarbonOptimizerFactory { - override def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf): Optimizer = { - class CarbonOptimizer1(optimizer: Optimizer, conf: CarbonSQLConf) - extends Optimizer(conf) { - override val batches = Nil; - override def execute(plan: LogicalPlan): LogicalPlan = { - CarbonOptimizer.execute(plan, optimizer); - } - } - new CarbonOptimizer1(optimizer, conf); - } - } - """ - - val defaultOptimizerString = - s""" - import org.apache.spark.sql._; - import org.apache.spark.sql.optimizer._; - import org.apache.spark.sql.catalyst.plans.logical._; - import org.apache.spark.sql.catalyst._; - import org.apache.spark.sql.catalyst.optimizer.Optimizer; - - new AbstractCarbonOptimizerFactory { - override def createOptimizer(optimizer: 
Optimizer, conf: CarbonSQLConf): Optimizer = { - class CarbonOptimizer2(optimizer: Optimizer, conf: CarbonSQLConf) extends Optimizer { - val batches = Nil; - override def execute(plan: LogicalPlan): LogicalPlan = { - CarbonOptimizer.execute(plan, optimizer); - } - } - new CarbonOptimizer2(optimizer, conf); - } - } - """ - - val spark1_5ExpandString = - s""" - import org.apache.spark.sql._ - import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan} - new AbstractCarbonExpandFactory { - override def createExpand(expand: Expand, child: LogicalPlan): Expand = { - Expand(expand.bitmasks, expand.groupByExprs, expand.gid, child) - } - } - """ -} - -abstract class AbstractCarbonOptimizerFactory { - def createOptimizer(optimizer: Optimizer, conf: CarbonSQLConf) : Optimizer -} - -abstract class AbstractCarbonExpandFactory { - def createExpand(expand: Expand, child: LogicalPlan) : Expand -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala deleted file mode 100644 index d745be2..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CustomDeterministicExpression.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} -import org.apache.spark.sql.types.{DataType, StringType} - -/** - * Custom expression to override the deterministic property - * - */ -case class CustomDeterministicExpression(nonDt: Expression ) extends Expression with Serializable{ - override def nullable: Boolean = true - - override def eval(input: InternalRow): Any = null - - override protected def genCode(ctx: CodeGenContext, - ev: GeneratedExpressionCode): String = ev.code - override def deterministic: Boolean = true - - override def dataType: DataType = StringType - - override def children: Seq[Expression] = Seq() -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala b/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala deleted file mode 100644 index dc2dd7b..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/SparkUnknownExpression.scala +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression, GenericMutableRow} - -import org.apache.carbondata.core.scan.expression.{ColumnExpression, Expression, ExpressionResult, UnknownExpression} -import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression -import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException -import org.apache.carbondata.core.scan.filter.intf.{ExpressionType, RowIntf} -import org.apache.carbondata.spark.util.CarbonScalaUtil - -class SparkUnknownExpression(var sparkExp: SparkExpression) - extends UnknownExpression with ConditionalExpression { - - private var evaluateExpression: (InternalRow) => Any = sparkExp.eval - private var isExecutor: Boolean = false - children.addAll(getColumnList()) - - override def evaluate(carbonRowInstance: RowIntf): ExpressionResult = { - - val values = carbonRowInstance.getValues.toSeq.map { - case s: String => org.apache.spark.unsafe.types.UTF8String.fromString(s) - case d: java.math.BigDecimal => org.apache.spark.sql.types.Decimal.apply(d) - case value => value - } - try { - val result = evaluateExpression( - new GenericMutableRow(values.map(a => a.asInstanceOf[Any]).toArray)) - val sparkRes = if (isExecutor) { - result.asInstanceOf[InternalRow].get(0, sparkExp.dataType) - } else { - result - } - new ExpressionResult(CarbonScalaUtil.convertSparkToCarbonDataType(sparkExp.dataType), - sparkRes - ) - } catch { - case e: Exception => throw new FilterUnsupportedException(e.getMessage) - } - } - - override def getFilterExpressionType: ExpressionType = { - ExpressionType.UNKNOWN - } - - override def getString: String = { - sparkExp.toString() - } - - def setEvaluateExpression(evaluateExpression: (InternalRow) => Any): Unit = { - this.evaluateExpression = evaluateExpression - isExecutor = true - } - - override def findAndSetChild(oldExpr: Expression, newExpr: Expression): Unit = {} - - def getColumnList: java.util.List[ColumnExpression] = { - - val lst = new java.util.ArrayList[ColumnExpression]() - getColumnListFromExpressionTree(sparkExp, lst) - lst - } - def getLiterals: java.util.List[ExpressionResult] = { - - val lst = new java.util.ArrayList[ExpressionResult]() - lst - } - - def getAllColumnList: java.util.List[ColumnExpression] = { - val lst = new java.util.ArrayList[ColumnExpression]() - getAllColumnListFromExpressionTree(sparkExp, lst) - lst - } - - def isSingleColumn: Boolean = { - val lst = new java.util.ArrayList[ColumnExpression]() - getAllColumnListFromExpressionTree(sparkExp, lst) - if (lst.size == 1 && lst.get(0).isDimension) { - true - } else { - false - } - } - - def getColumnListFromExpressionTree(sparkCurrentExp: SparkExpression, - list: java.util.List[ColumnExpression]): Unit = { - sparkCurrentExp match { - case carbonBoundRef: CarbonBoundReference => - val foundExp = list.asScala - .find(p => p.getColumnName == carbonBoundRef.colExp.getColumnName) - if (foundExp.isEmpty) { - carbonBoundRef.colExp.setColIndex(list.size) - list.add(carbonBoundRef.colExp) - } else { - carbonBoundRef.colExp.setColIndex(foundExp.get.getColIndex) - } - case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list)) - } - } - - - def getAllColumnListFromExpressionTree(sparkCurrentExp: SparkExpression, - list: java.util.List[ColumnExpression]): java.util.List[ColumnExpression] = { - sparkCurrentExp match { - case 
carbonBoundRef: CarbonBoundReference => list.add(carbonBoundRef.colExp) - case _ => sparkCurrentExp.children.foreach(getColumnListFromExpressionTree(_, list)) - } - list - } - -}
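For illustration only (not part of the commit above): the deleted parser code at the start of this hunk walks a sub-select character by character, counting brackets while skipping any '(' or ')' that sits inside single or double quotes. A minimal standalone rendering of that quote-aware bracket matching is sketched below; all names are hypothetical and the logic is simplified relative to the stack-based original.

object BracketMatchSketch {
  /** Returns the text inside the first top-level '(' ... ')' pair, ignoring brackets
    * that appear inside single or double quotes; None if no balanced pair is found. */
  def innerText(input: String): Option[String] = {
    var inSingle = false   // currently inside a '...' literal
    var inDouble = false   // currently inside a "..." literal
    var depth = 0          // current bracket nesting depth outside quotes
    var start = -1         // index of the outermost '('
    var i = 0
    while (i < input.length) {
      input.charAt(i) match {
        case '\'' if !inDouble => inSingle = !inSingle
        case '"'  if !inSingle => inDouble = !inDouble
        case '('  if !inSingle && !inDouble =>
          if (depth == 0) start = i
          depth += 1
        case ')'  if !inSingle && !inDouble =>
          depth -= 1
          if (depth == 0) return Some(input.substring(start + 1, i).trim)
        case _ => // any other character, or a bracket inside quotes, is ignored
      }
      i += 1
    }
    None // unbalanced brackets or no bracket found
  }

  def main(args: Array[String]): Unit = {
    // The ')' inside the string literal is ignored, so matching stops at the real closer.
    println(innerText("( select name from t where note = 'a)b' )"))
    // prints: Some(select name from t where note = 'a)b')
  }
}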
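Also for illustration only: the deleted CodeGenerateFactory loads version-specific Spark classes (such as logical.Expand or DefaultOptimizer) through a class loader, makes a declared constructor accessible, and invokes it reflectively, so the module can run against several Spark 1.x releases without compiling against one of them. The sketch below shows that reflection pattern in isolation; the helper name is hypothetical and java.lang.StringBuilder stands in for the Spark catalyst classes the real code targets.

object ReflectiveConstructionSketch {
  /** Loads className, picks a constructor whose arity matches args, makes it
    * accessible, and instantiates it; returns None on any failure, mirroring the
    * deleted code's fall-back behaviour. */
  def newInstanceByReflection[T](className: String, args: AnyRef*): Option[T] = {
    val loader = Thread.currentThread().getContextClassLoader
    try {
      val ctor = loader.loadClass(className)
        .getDeclaredConstructors
        .find(_.getParameterCount == args.length)
        .getOrElse(throw new NoSuchMethodException(className))
      ctor.setAccessible(true)
      Some(ctor.newInstance(args: _*).asInstanceOf[T])
    } catch {
      case _: Exception => None
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in target: construct a StringBuilder from a String via reflection.
    val sb = newInstanceByReflection[java.lang.StringBuilder]("java.lang.StringBuilder", "hello")
    println(sb.map(_.toString)) // Some(hello) if construction succeeded
  }
}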