http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala deleted file mode 100644 index cd42fba..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetadataUtil.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.hive - -import org.apache.spark.sql.SQLContext - -import org.apache.spark.sql.catalyst.SqlParser - -import org.apache.carbondata.common.logging.LogServiceFactory - - -/** - * This class contains all carbon hive metadata related utilities - */ -object CarbonHiveMetadataUtil { - - @transient - val LOGGER = LogServiceFactory.getLogService(CarbonHiveMetadataUtil.getClass.getName) - - - /** - * This method invalidates the table from HiveMetastoreCatalog before dropping the table - * - * @param databaseName - * @param tableName - * @param sqlContext - */ - def invalidateAndDropTable(databaseName: String, - tableName: String, - sqlContext: SQLContext): Unit = { - val hiveContext = sqlContext.asInstanceOf[HiveContext] - val tableWithDb = databaseName + "." + tableName - val tableIdent = SqlParser.parseTableIdentifier(tableWithDb) - try { - hiveContext.catalog.invalidateTable(tableIdent) - hiveContext.runSqlHive(s"DROP TABLE IF EXISTS $databaseName.$tableName") - } catch { - case e: Exception => - LOGGER.audit( - s"Error while deleting the table $databaseName.$tableName during drop carbon table: " + - e.getMessage) - } - } - -}
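For context, a minimal sketch of how the deleted invalidateAndDropTable helper would be invoked. This is illustrative only: the SparkContext setup, database name "default", and table name "t1" are assumptions, not part of the change above.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.{CarbonHiveMetadataUtil, HiveContext}

object DropTableExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical setup; a Spark 1.x HiveContext is required in practice,
    // since the helper casts the passed SQLContext to HiveContext internally.
    val sc = new SparkContext(new SparkConf().setAppName("drop-table-example"))
    val hiveContext = new HiveContext(sc)
    // Invalidates the cached relation in HiveMetastoreCatalog, then issues
    // DROP TABLE IF EXISTS default.t1 against Hive.
    CarbonHiveMetadataUtil.invalidateAndDropTable("default", "t1", hiveContext)
  }
}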
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala deleted file mode 100644 index 4d5d39a..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ /dev/null @@ -1,562 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.io._ -import java.util.UUID - -import scala.Array.canBuildFrom -import scala.collection.mutable.ArrayBuffer -import scala.language.implicitConversions -import scala.util.parsing.combinator.RegexParsers - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.Partitioner -import org.apache.spark.sql.hive.client.ClientInterface -import org.apache.spark.sql.types._ - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.impl.FileFactory.FileType -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.reader.ThriftReader -import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} -import org.apache.carbondata.core.writer.ThriftWriter -import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.processing.merger.TableMeta - -case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) - -case class CarbonMetaData(dims: Seq[String], - msrs: Seq[String], - carbonTable: CarbonTable, - dictionaryMap: DictionaryMap) - -object CarbonMetastore { - - def readSchemaFileToThriftTable(schemaFilePath: String): TableInfo = { - val createTBase = new ThriftReader.TBaseCreator() { - override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = { - new TableInfo() - } - } - 
val thriftReader = new ThriftReader(schemaFilePath, createTBase) - var tableInfo: TableInfo = null - try { - thriftReader.open() - tableInfo = thriftReader.read().asInstanceOf[TableInfo] - } finally { - thriftReader.close() - } - tableInfo - } - - def writeThriftTableToSchemaFile(schemaFilePath: String, tableInfo: TableInfo): Unit = { - val thriftWriter = new ThriftWriter(schemaFilePath, false) - try { - thriftWriter.open() - thriftWriter.write(tableInfo); - } finally { - thriftWriter.close() - } - } - -} - -case class DictionaryMap(dictionaryMap: Map[String, Boolean]) { - def get(name: String): Option[Boolean] = { - dictionaryMap.get(name.toLowerCase) - } -} - -class CarbonMetastore(hiveContext: HiveContext, val storePath: String, - client: ClientInterface, queryId: String) extends HiveMetastoreCatalog(client, hiveContext) { - - @transient - val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog") - - val tableModifiedTimeStore = new java.util.HashMap[String, Long]() - tableModifiedTimeStore - .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis()) - - val metadata = loadMetadata(storePath) - - def getTableCreationTime(databaseName: String, tableName: String): Long = { - val tableMeta = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(databaseName) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) - val tableCreationTime = tableMeta.head.carbonTable.getTableLastUpdatedTime - tableCreationTime - } - - def lookupRelation1(dbName: Option[String], - tableName: String)(sqlContext: SQLContext): LogicalPlan = { - lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext) - } - - def lookupRelation1(tableIdentifier: TableIdentifier, - alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = { - checkSchemasModifiedTimeAndReloadTables() - val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext)) - val tables = getTableFromMetadata(database, tableIdentifier.table) - tables match { - case Some(t) => - CarbonRelation(database, tableIdentifier.table, - CarbonSparkUtil.createSparkMeta(tables.head.carbonTable), tables.head, alias)(sqlContext) - case None => - LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}") - throw new NoSuchTableException - } - } - - /** - * This method will search for a table in the catalog metadata - * - * @param database - * @param tableName - * @return - */ - def getTableFromMetadata(database: String, - tableName: String): Option[TableMeta] = { - metadata.tablesMeta - .find(c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName)) - } - - def tableExists(identifier: TableIdentifier)(sqlContext: SQLContext): Boolean = { - checkSchemasModifiedTimeAndReloadTables() - val database = identifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext)) - val tables = metadata.tablesMeta.filter( - c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) && - c.carbonTableIdentifier.getTableName.equalsIgnoreCase(identifier.table)) - tables.nonEmpty - } - - def loadMetadata(metadataPath: String): MetaData = { - val recorder = CarbonTimeStatisticsFactory.createDriverRecorder() - val statistic = new QueryStatistic() - // creating zookeeper instance once. - // if zookeeper is configured as carbon lock type. 
- val zookeeperurl = hiveContext.getConf(CarbonCommonConstants.ZOOKEEPER_URL, null) - if (null != zookeeperurl) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl) - } - if (metadataPath == null) { - return null - } - // if no locktype is configured and store type is HDFS set HDFS lock as default - if (null == CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.LOCK_TYPE) && - FileType.HDFS == FileFactory.getFileType(metadataPath)) { - CarbonProperties.getInstance - .addProperty(CarbonCommonConstants.LOCK_TYPE, - CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS - ) - LOGGER.info("Default lock type HDFSLOCK is configured") - } - val fileType = FileFactory.getFileType(metadataPath) - val metaDataBuffer = new ArrayBuffer[TableMeta] - fillMetaData(metadataPath, fileType, metaDataBuffer) - updateSchemasUpdatedTime(readSchemaFileSystemTime("", "")) - statistic.addStatistics(QueryStatisticsConstants.LOAD_META, - System.currentTimeMillis()) - recorder.recordStatisticsForDriver(statistic, queryId) - MetaData(metaDataBuffer) - } - - private def fillMetaData(basePath: String, fileType: FileType, - metaDataBuffer: ArrayBuffer[TableMeta]): Unit = { - val databasePath = basePath // + "/schemas" - try { - if (FileFactory.isFileExist(databasePath, fileType)) { - val file = FileFactory.getCarbonFile(databasePath, fileType) - val databaseFolders = file.listFiles() - - databaseFolders.foreach(databaseFolder => { - if (databaseFolder.isDirectory) { - val dbName = databaseFolder.getName - val tableFolders = databaseFolder.listFiles() - - tableFolders.foreach(tableFolder => { - if (tableFolder.isDirectory) { - val carbonTableIdentifier = new CarbonTableIdentifier(databaseFolder.getName, - tableFolder.getName, UUID.randomUUID().toString) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(basePath, - carbonTableIdentifier) - val tableMetadataFile = carbonTablePath.getSchemaFilePath - - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val tableName = tableFolder.getName - val tableUniqueName = databaseFolder.getName + "_" + tableFolder.getName - - - val createTBase = new ThriftReader.TBaseCreator() { - override def create(): org.apache.thrift.TBase[TableInfo, TableInfo._Fields] = { - new TableInfo() - } - } - val thriftReader = new ThriftReader(tableMetadataFile, createTBase) - thriftReader.open() - val tableInfo: TableInfo = thriftReader.read().asInstanceOf[TableInfo] - thriftReader.close() - - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, basePath) - val schemaFilePath = CarbonStorePath - .getCarbonTablePath(storePath, carbonTableIdentifier).getSchemaFilePath - wrapperTableInfo.setStorePath(storePath) - wrapperTableInfo - .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) - metaDataBuffer += new TableMeta(carbonTable.getCarbonTableIdentifier, storePath, - null, carbonTable) - } - } - }) - } - }) - } else { - // Create folders and files. - FileFactory.mkdirs(databasePath, fileType) - } - } catch { - case s: java.io.FileNotFoundException => - // Create folders and files. - FileFactory.mkdirs(databasePath, fileType) - } - } - - /** - * - * Prepare Thrift Schema from wrapper TableInfo and write to Schema file. 
- * Load CarbonTable from wrapper tableinfo - * - */ - def createTableFromThrift( - tableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo, - dbName: String, tableName: String, partitioner: Partitioner) - (sqlContext: SQLContext): String = { - if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) { - sys.error(s"Table [$tableName] already exists under Database [$dbName]") - } - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val thriftTableInfo = schemaConverter - .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName) - val schemaEvolutionEntry = new SchemaEvolutionEntry(tableInfo.getLastUpdatedTime) - thriftTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history - .add(schemaEvolutionEntry) - - val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, - tableInfo.getFactTable.getTableId) - val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier) - val schemaFilePath = carbonTablePath.getSchemaFilePath - val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath) - tableInfo.setMetaDataFilepath(schemaMetadataPath) - tableInfo.setStorePath(storePath) - CarbonMetadata.getInstance().loadTableMetadata(tableInfo) - val tableMeta = new TableMeta(carbonTableIdentifier, storePath, null, - CarbonMetadata.getInstance().getCarbonTable(dbName + "_" + tableName)) - - val fileType = FileFactory.getFileType(schemaMetadataPath) - if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) { - FileFactory.mkdirs(schemaMetadataPath, fileType) - } - - val thriftWriter = new ThriftWriter(schemaFilePath, false) - thriftWriter.open() - thriftWriter.write(thriftTableInfo) - thriftWriter.close() - - metadata.tablesMeta += tableMeta - logInfo(s"Table $tableName for Database $dbName created successfully.") - LOGGER.info(s"Table $tableName for Database $dbName created successfully.") - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - carbonTablePath.getPath - } - - private def updateMetadataByWrapperTable( - wrapperTableInfo: org.apache.carbondata.core.metadata.schema.table.TableInfo): Unit = { - - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - val carbonTable = CarbonMetadata.getInstance().getCarbonTable( - wrapperTableInfo.getTableUniqueName) - for (i <- metadata.tablesMeta.indices) { - if (wrapperTableInfo.getTableUniqueName.equals( - metadata.tablesMeta(i).carbonTableIdentifier.getTableUniqueName)) { - metadata.tablesMeta(i).carbonTable = carbonTable - } - } - } - - def updateMetadataByThriftTable(schemaFilePath: String, - tableInfo: TableInfo, dbName: String, tableName: String, storePath: String): Unit = { - - tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) - .setTime_stamp(System.currentTimeMillis()) - val schemaConverter = new ThriftWrapperSchemaConverterImpl - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, storePath) - wrapperTableInfo - .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath)) - wrapperTableInfo.setStorePath(storePath) - updateMetadataByWrapperTable(wrapperTableInfo) - } - - /** - * Shows all tables for given schema. 
- */ - def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = { - - val dbName = - databaseName.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase) - checkSchemasModifiedTimeAndReloadTables() - metadata.tablesMeta.filter { c => - c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) - }.map { c => (c.carbonTableIdentifier.getTableName, false) } - } - - def isTablePathExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = { - val dbName = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext)) - val tableName = tableIdentifier.table - - val tablePath = CarbonStorePath.getCarbonTablePath(this.storePath, - new CarbonTableIdentifier(dbName, tableName, "")).getPath - - val fileType = FileFactory.getFileType(tablePath) - FileFactory.isFileExist(tablePath, fileType) - } - - def dropTable(tableStorePath: String, tableIdentifier: TableIdentifier) - (sqlContext: SQLContext) { - val dbName = tableIdentifier.database.get - val tableName = tableIdentifier.table - - val metadataFilePath = CarbonStorePath.getCarbonTablePath(tableStorePath, - new CarbonTableIdentifier(dbName, tableName, "")).getMetadataDirectoryPath - - val fileType = FileFactory.getFileType(metadataFilePath) - - if (FileFactory.isFileExist(metadataFilePath, fileType)) { - // While dropping, refresh the schema modified time so that any change made - // from another beeline session is picked up. - checkSchemasModifiedTimeAndReloadTables() - val file = FileFactory.getCarbonFile(metadataFilePath, fileType) - CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile) - val metadataToBeRemoved: Option[TableMeta] = getTableFromMetadata(dbName, - tableIdentifier.table) - metadataToBeRemoved match { - case Some(tableMeta) => - metadata.tablesMeta -= tableMeta - CarbonMetadata.getInstance.removeTable(dbName + "_" + tableName) - updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName)) - case None => - logInfo(s"Metadata does not contain entry for table $tableName in database $dbName") - } - CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sqlContext) - // discard cached table info in cachedDataSourceTables - sqlContext.catalog.refreshTable(tableIdentifier) - DataMapStoreManager.getInstance().
- clearDataMap(AbsoluteTableIdentifier.from(storePath, dbName, tableName)) - } - } - - private def getTimestampFileAndType(databaseName: String, tableName: String) = { - val timestampFile = storePath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE - val timestampFileType = FileFactory.getFileType(timestampFile) - (timestampFile, timestampFileType) - } - - /** - * This method will put the updated timestamp of schema file in the table modified time store map - * - * @param timeStamp - */ - def updateSchemasUpdatedTime(timeStamp: Long) { - tableModifiedTimeStore.put("default", timeStamp) - } - - /** - * This method will read the timestamp of empty schema file - * - * @param databaseName - * @param tableName - * @return - */ - def readSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) - if (FileFactory.isFileExist(timestampFile, timestampFileType)) { - FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime - } else { - System.currentTimeMillis() - } - } - - /** - * This method will check and create an empty schema timestamp file - * - * @param databaseName - * @param tableName - * @return - */ - def touchSchemaFileSystemTime(databaseName: String, tableName: String): Long = { - val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) - if (!FileFactory.isFileExist(timestampFile, timestampFileType)) { - LOGGER.audit(s"Creating timestamp file for $databaseName.$tableName") - FileFactory.createNewFile(timestampFile, timestampFileType) - } - val systemTime = System.currentTimeMillis() - FileFactory.getCarbonFile(timestampFile, timestampFileType) - .setLastModifiedTime(systemTime) - systemTime - } - - def checkSchemasModifiedTimeAndReloadTables() { - val (timestampFile, timestampFileType) = getTimestampFileAndType("", "") - if (FileFactory.isFileExist(timestampFile, timestampFileType)) { - if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType). 
- getLastModifiedTime == - tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) { - refreshCache() - } - } - } - - def refreshCache() { - metadata.tablesMeta = loadMetadata(storePath).tablesMeta - } - - def getSchemaLastUpdatedTime(databaseName: String, tableName: String): Long = { - var schemaLastUpdatedTime = System.currentTimeMillis - val (timestampFile, timestampFileType) = getTimestampFileAndType(databaseName, tableName) - if (FileFactory.isFileExist(timestampFile, timestampFileType)) { - schemaLastUpdatedTime = FileFactory.getCarbonFile(timestampFile, timestampFileType) - .getLastModifiedTime - } - schemaLastUpdatedTime - } - - def createDatabaseDirectory(dbName: String) { - val databasePath = storePath + File.separator + dbName - val fileType = FileFactory.getFileType(databasePath) - FileFactory.mkdirs(databasePath, fileType) - } - - def dropDatabaseDirectory(dbName: String) { - val databasePath = storePath + File.separator + dbName - val fileType = FileFactory.getFileType(databasePath) - if (FileFactory.isFileExist(databasePath, fileType)) { - val dbPath = FileFactory.getCarbonFile(databasePath, fileType) - CarbonUtil.deleteFoldersAndFiles(dbPath) - } - } - -} - - -object CarbonMetastoreTypes extends RegexParsers { - protected lazy val primitiveType: Parser[DataType] = - "string" ^^^ StringType | - "float" ^^^ FloatType | - "int" ^^^ IntegerType | - "tinyint" ^^^ ShortType | - "short" ^^^ ShortType | - "double" ^^^ DoubleType | - "long" ^^^ LongType | - "binary" ^^^ BinaryType | - "boolean" ^^^ BooleanType | - fixedDecimalType | - "decimal" ^^^ "decimal" ^^^ DecimalType(18, 2) | - "varchar\\((\\d+)\\)".r ^^^ StringType | - "timestamp" ^^^ TimestampType | - "date" ^^^ DateType | - "char\\((\\d+)\\)".r ^^^ StringType - - protected lazy val fixedDecimalType: Parser[DataType] = - "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ { - case precision ~ scale => - DecimalType(precision.toInt, scale.toInt) - } - - protected lazy val arrayType: Parser[DataType] = - "array" ~> "<" ~> dataType <~ ">" ^^ { - case tpe => ArrayType(tpe) - } - - protected lazy val mapType: Parser[DataType] = - "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { - case t1 ~ _ ~ t2 => MapType(t1, t2) - } - - protected lazy val structField: Parser[StructField] = - "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ { - case name ~ _ ~ tpe => StructField(name, tpe, nullable = true) - } - - protected lazy val structType: Parser[DataType] = - "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ { - case fields => StructType(fields) - } - - protected lazy val dataType: Parser[DataType] = - arrayType | - mapType | - structType | - primitiveType - - def toDataType(metastoreType: String): DataType = { - parseAll(dataType, metastoreType) match { - case Success(result, _) => result - case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType") - } - } - - def toMetastoreType(dt: DataType): String = { - dt match { - case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>" - case StructType(fields) => - s"struct<${ - fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }") - .mkString(",") - }>" - case StringType => "string" - case FloatType => "float" - case IntegerType => "int" - case ShortType => "tinyint" - case DoubleType => "double" - case LongType => "bigint" - case BinaryType => "binary" - case BooleanType => "boolean" - case DecimalType() => "decimal" - case DateType => "date" - case TimestampType => "timestamp" - } - } -} 
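As a reference point, a small usage sketch of how the RegexParsers-based CarbonMetastoreTypes mapping above round-trips type strings. The type strings are illustrative; it assumes the deleted object and the Spark 1.x sql types are on the classpath.

import org.apache.spark.sql.types._

// Parse a Hive metastore type string into a Spark DataType.
val dt = CarbonMetastoreTypes.toDataType("struct<name:string,scores:array<double>>")
// dt == StructType(Seq(
//   StructField("name", StringType, nullable = true),
//   StructField("scores", ArrayType(DoubleType), nullable = true)))

// Render it back to a metastore type string.
val s = CarbonMetastoreTypes.toMetastoreType(dt)
// s == "struct<name:string,scores:array<double>>"

// Fixed-precision decimals keep precision and scale.
val dec = CarbonMetastoreTypes.toDataType("decimal(10,2)") // DecimalType(10, 2)

Note that the round-trip is not exact for every alias: both "short" and "tinyint" parse to ShortType, which renders back as "tinyint", and LongType renders as "bigint" even though only "long" is accepted by the parser.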
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala deleted file mode 100644 index 82c5f7f..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.CarbonSqlParser -import org.apache.spark.sql.catalyst.ParserDialect -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan - -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - -private[spark] class CarbonSQLDialect(hiveContext: HiveContext) extends ParserDialect { - - @transient - protected val sqlParser = new CarbonSqlParser - - override def parse(sqlText: String): LogicalPlan = { - - try { - sqlParser.parse(sqlText) - } catch { - // MalformedCarbonCommandException needs to be thrown directly - // because Hive cannot parse Carbon commands - case ce: MalformedCarbonCommandException => - throw ce - case _: Throwable => - HiveQl.parseSql(sqlText) - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala deleted file mode 100644 index d3d699a..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala +++ /dev/null @@ -1,370 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.spark.sql.hive - -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ScalaUDF, _} -import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} -import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan} -import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan} -import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation} -import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand} -import org.apache.spark.sql.hive.execution.command._ -import org.apache.spark.sql.optimizer.CarbonDecoderRelation -import org.apache.spark.sql.types.{IntegerType, StringType} - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.spark.CarbonAliasDecoderRelation -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - -class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] { - - override def strategies: Seq[Strategy] = getStrategies - - val LOGGER = LogServiceFactory.getLogService("CarbonStrategies") - - def getStrategies: Seq[Strategy] = { - val total = sqlContext.planner.strategies :+ CarbonTableScan - total - } - - /** - * Carbon strategies for performing late materialization (decoding dictionary keys - * as late as possible) - */ - private[sql] object CarbonTableScan extends Strategy { - - def apply(plan: LogicalPlan): Seq[SparkPlan] = { - plan match { - case PhysicalOperation(projectList, predicates, l: LogicalRelation) - if l.relation.isInstanceOf[CarbonDatasourceRelation] => - if (isStarQuery(plan)) { - carbonRawScanForStarQuery(projectList, predicates, l)(sqlContext) :: Nil - } else { - carbonRawScan(projectList, predicates, l)(sqlContext) :: Nil - } - case InsertIntoCarbonTable(relation: CarbonDatasourceRelation, - _, child: LogicalPlan, overwrite, _) => - ExecutedCommand(LoadTableByInsert(relation, child, overwrite)) :: Nil - case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) => - CarbonDictionaryDecoder(relations, - profile, - aliasMap, - planLater(child))(sqlContext) :: Nil - case _ => - Nil - } - } - - /** - * Create carbon scan - */ - private def carbonRawScan(projectListRaw: Seq[NamedExpression], - predicates: Seq[Expression], - logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = { - - val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation] - val tableName: String = - relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase - // Check whether any expressions are present in the project list; if so, they need to - // be decoded as well.
- - val projectList = projectListRaw.map {p => - p.transform { - case CustomDeterministicExpression(exp) => exp - } - }.asInstanceOf[Seq[NamedExpression]] - val newProjectList = projectList.map { - case a@Alias(s: ScalaUDF, name) - if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) || - name.equalsIgnoreCase( - CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) => - AttributeReference(name, StringType, true)().withExprId(a.exprId) - case a@Alias(s: ScalaUDF, name) - if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) => - val reference = - AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, - StringType, true)().withExprId(a.exprId) - val alias = a.transform { - case s: ScalaUDF => - ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes) - }.asInstanceOf[Alias] - Alias(alias.child, alias.name)(alias.exprId, alias.qualifiers, alias.explicitMetadata) - case other => other - } - val projectSet = AttributeSet(newProjectList.flatMap(_.references)) - val filterSet = AttributeSet(predicates.flatMap(_.references)) - val scan = CarbonScan(projectSet.toSeq, - relation.carbonRelation, - predicates)(sqlContext) - newProjectList.map { - case attr: AttributeReference => - case Alias(attr: AttributeReference, _) => - case others => - others.references.map { f => - val dictionary = relation.carbonRelation.metaData.dictionaryMap.get(f.name) - if (dictionary.isDefined && dictionary.get) { - scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]) - } - } - } - val scanWithDecoder = - if (scan.attributesNeedToDecode.size() > 0) { - val decoder = getCarbonDecoder(logicalRelation, - sc, - tableName, - scan.attributesNeedToDecode.asScala.toSeq, - scan) - if (scan.unprocessedExprs.nonEmpty) { - val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) - filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder) - } else { - decoder - } - } else { - scan - } - - if (projectList.map(_.toAttribute) == scan.columnProjection && - projectSet.size == projectList.size && - filterSet.subsetOf(projectSet)) { - // copied from spark pruneFilterProjectRaw - // When it is possible to just use column pruning to get the right projection and - // when the columns of this projection are enough to evaluate all filter conditions, - // just do a scan with no extra project. - scanWithDecoder - } else { - Project(newProjectList, scanWithDecoder) - } - } - - /** - * Create carbon scan for star query - */ - private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression], - predicates: Seq[Expression], - logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = { - val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation] - val tableName: String = - relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase - // Check whether any expressions are present in the project list; if so, they need to - // be decoded as well.
- val projectExprsNeedToDecode = new java.util.HashSet[Attribute]() - val scan = CarbonScan(projectList.map(_.toAttribute), - relation.carbonRelation, - predicates, - useUnsafeCoversion = false)(sqlContext) - projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode) - val updatedAttrs = scan.columnProjection.map(attr => - updateDataType(attr.asInstanceOf[AttributeReference], relation, projectExprsNeedToDecode)) - scan.columnProjection = updatedAttrs - if (projectExprsNeedToDecode.size() > 0 - && isDictionaryEncoded(projectExprsNeedToDecode.asScala.toSeq, relation)) { - val decoder = getCarbonDecoder(logicalRelation, - sc, - tableName, - projectExprsNeedToDecode.asScala.toSeq, - scan) - if (scan.unprocessedExprs.nonEmpty) { - val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) - filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder) - } else { - decoder - } - } else { - if (scan.unprocessedExprs.nonEmpty) { - val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And) - filterCondToAdd.map(Filter(_, scan)).getOrElse(scan) - } else { - scan - } - } - } - - def getCarbonDecoder(logicalRelation: LogicalRelation, - sc: SQLContext, - tableName: String, - projectExprsNeedToDecode: Seq[Attribute], - scan: CarbonScan): CarbonDictionaryDecoder = { - val relation = CarbonDecoderRelation(logicalRelation.attributeMap, - logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation]) - val attrs = projectExprsNeedToDecode.map { attr => - val newAttr = AttributeReference(attr.name, - attr.dataType, - attr.nullable, - attr.metadata)(attr.exprId, Seq(tableName)) - relation.addAttribute(newAttr) - newAttr - } - CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs), - CarbonAliasDecoderRelation(), scan)(sc) - } - - def isDictionaryEncoded(projectExprsNeedToDecode: Seq[Attribute], - relation: CarbonDatasourceRelation): Boolean = { - var isEncoded = false - projectExprsNeedToDecode.foreach { attr => - if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false)) { - isEncoded = true - } - } - isEncoded - } - - def updateDataType(attr: AttributeReference, - relation: CarbonDatasourceRelation, - allAttrsNotDecode: util.Set[Attribute]): AttributeReference = { - if (relation.carbonRelation.metaData.dictionaryMap.get(attr.name).getOrElse(false) && - !allAttrsNotDecode.asScala.exists(p => p.name.equals(attr.name))) { - AttributeReference(attr.name, - IntegerType, - attr.nullable, - attr.metadata)(attr.exprId, attr.qualifiers) - } else { - attr - } - } - - private def isStarQuery(plan: LogicalPlan) = { - plan match { - case LogicalFilter(condition, l: LogicalRelation) - if l.relation.isInstanceOf[CarbonDatasourceRelation] => - true - case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] => true - case _ => false - } - } - } - - object DDLStrategies extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case DropTable(tableName, ifNotExists) - if CarbonEnv.get.carbonMetastore - .isTablePathExists(toTableIdentifier(tableName.toLowerCase))(sqlContext) => - val identifier = toTableIdentifier(tableName.toLowerCase) - ExecutedCommand(DropTableCommand(ifNotExists, identifier.database, identifier.table)) :: Nil - case ShowLoadsCommand(databaseName, table, limit) => - ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil - case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath, - options, isOverwriteExist, inputSqlString, dataFrame, _) => - val isCarbonTable 
= CarbonEnv.get.carbonMetastore - .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext) - if (isCarbonTable || options.nonEmpty) { - ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath, - options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil - } else { - ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil - } - case alterTable@AlterTableCompaction(altertablemodel) => - val isCarbonTable = CarbonEnv.get.carbonMetastore - .tableExists(TableIdentifier(altertablemodel.tableName, - altertablemodel.dbName))(sqlContext) - if (isCarbonTable) { - if (altertablemodel.compactionType.equalsIgnoreCase("minor") || - altertablemodel.compactionType.equalsIgnoreCase("major")) { - ExecutedCommand(alterTable) :: Nil - } else { - throw new MalformedCarbonCommandException( - "Unsupported alter operation on carbon table") - } - } else { - ExecutedCommand(HiveNativeCommand(altertablemodel.alterSql)) :: Nil - } - case CreateDatabase(dbName, sql) => - ExecutedCommand(CreateDatabaseCommand(dbName, HiveNativeCommand(sql))) :: Nil - case DropDatabase(dbName, isCascade, sql) => - if (isCascade) { - ExecutedCommand(DropDatabaseCascadeCommand(dbName, HiveNativeCommand(sql))) :: Nil - } else { - ExecutedCommand(DropDatabaseCommand(dbName, HiveNativeCommand(sql))) :: Nil - } - case UseDatabase(sql) => - ExecutedCommand(HiveNativeCommand(sql)) :: Nil - case d: HiveNativeCommand => - try { - val resolvedTable = sqlContext.executePlan(CarbonHiveSyntax.parse(d.sql)).optimizedPlan - planLater(resolvedTable) :: Nil - } catch { - case ce: MalformedCarbonCommandException => - throw ce - case ae: AnalysisException => - throw ae - case e: Exception => ExecutedCommand(d) :: Nil - } - case DescribeFormattedCommand(sql, tblIdentifier) => - val isTable = CarbonEnv.get.carbonMetastore - .tableExists(tblIdentifier)(sqlContext) - if (isTable) { - val describe = - LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false) - val resolvedTable = sqlContext.executePlan(describe.table).analyzed - val resultPlan = sqlContext.executePlan(resolvedTable).executedPlan - ExecutedCommand(DescribeCommandFormatted(resultPlan, plan.output, tblIdentifier)) :: Nil - } else { - ExecutedCommand(HiveNativeCommand(sql)) :: Nil - } - case ShowPartitions(t) => - val isCarbonTable = CarbonEnv.get.carbonMetastore - .tableExists(t)(sqlContext) - if (isCarbonTable) { - ExecutedCommand(ShowCarbonPartitionsCommand(t)) :: Nil - } else { - var tableName = t.table - var database = t.database - var sql: String = null - if (database.isEmpty) { - sql = s"show partitions $tableName" - } else { - sql = s"show partitions $database.$tableName" - } - ExecutedCommand(HiveNativeCommand(sql)) :: Nil - } - case _ => - Nil - } - - def toTableIdentifier(name: String): TableIdentifier = { - val identifier = name.split("\\.") - identifier match { - case Array(tableName) => TableIdentifier(tableName, None) - case Array(dbName, tableName) => TableIdentifier(tableName, Some(dbName)) - } - } - } - -} - -object CarbonHiveSyntax { - - @transient - protected val sqlParser = new CarbonSqlParser - - def parse(sqlText: String): LogicalPlan = { - sqlParser.parse(sqlText) - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala deleted file mode 100644 index 6b244f9..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/HiveQlWrapper.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.hadoop.hive.ql.parse.ASTNode - -/** - * Wrapper class for using the hiveQl class of hive. - */ -object HiveQlWrapper { - - def getAst(sql: String): ASTNode = { - - HiveQl.getAst(sql) - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala deleted file mode 100644 index ad70027..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/cli/CarbonSQLCLIDriver.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.hive.cli - -import java.io.File - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.scheduler.StatsReportListener -import org.apache.spark.sql.CarbonContext -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.hive.thriftserver.{SparkSQLCLIDriver, SparkSQLEnv} -import org.apache.spark.util.Utils - -import org.apache.carbondata.common.logging.LogServiceFactory - -object CarbonSQLCLIDriver { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - var hiveContext: HiveContext = _ - var sparkContext: SparkContext = _ - - def main(args: Array[String]): Unit = { - init() - SparkSQLEnv.sparkContext = sparkContext - SparkSQLEnv.hiveContext = hiveContext - SparkSQLCLIDriver.installSignalHandler() - SparkSQLCLIDriver.main(args) - } - - def init() { - if (hiveContext == null) { - val sparkConf = new SparkConf(loadDefaults = true) - val maybeSerializer = sparkConf.getOption("spark.serializer") - val maybeKryoReferenceTracking = sparkConf.getOption("spark.kryo.referenceTracking") - // If user doesn't specify the appName, we want to get [SparkSQL::localHostName] instead of - // the default appName [CarbonSQLCLIDriver] in cli or beeline. - val maybeAppName = sparkConf - .getOption("spark.app.name") - .filterNot(_ == classOf[SparkSQLCLIDriver].getName) - val maybeStorePath = sparkConf.getOption("spark.carbon.storepath") - - sparkConf - .setAppName(maybeAppName.getOrElse(s"CarbonSparkSQL::${ Utils.localHostName() }")) - .set( - "spark.serializer", - maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer")) - .set( - "spark.kryo.referenceTracking", - maybeKryoReferenceTracking.getOrElse("false")) - - sparkContext = new SparkContext(sparkConf) - sparkContext.addSparkListener(new StatsReportListener()) - val path = System.getenv("CARBON_HOME") + "/bin/carbonsqlclistore" - val store = new File(path) - store.mkdirs() - hiveContext = new CarbonContext(sparkContext, - maybeStorePath.getOrElse(store.getCanonicalPath), - store.getCanonicalPath) - - hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) - hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) => - LOGGER.debug(s"HiveConf var: $k=$v") - } - } - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala deleted file mode 100644 index 0f42940..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution.command - -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.execution.command.DropTableCommand -import org.apache.spark.sql.hive.execution.HiveNativeCommand - -private[hive] case class CreateDatabaseCommand(dbName: String, - command: HiveNativeCommand) extends RunnableCommand { - def run(sqlContext: SQLContext): Seq[Row] = { - val rows = command.run(sqlContext) - CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName) - rows - } -} - -private[hive] case class DropDatabaseCommand(dbName: String, - command: HiveNativeCommand) extends RunnableCommand { - def run(sqlContext: SQLContext): Seq[Row] = { - val rows = command.run(sqlContext) - CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName) - rows - } -} - -private[hive] case class DropDatabaseCascadeCommand(dbName: String, - command: HiveNativeCommand) extends RunnableCommand { - def run(sqlContext: SQLContext): Seq[Row] = { - val tablesInDB = CarbonEnv.get.carbonMetastore - .getTables(Some(dbName))(sqlContext).map(x => x._1) - val rows = command.run(sqlContext) - tablesInDB.foreach{tableName => - DropTableCommand(true, Some(dbName), tableName).run(sqlContext) - } - CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName) - rows - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala deleted file mode 100644 index 2e45954..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal, StartsWith, _} -import org.apache.spark.sql.optimizer.AttributeReferenceWrapper -import org.apache.spark.sql.sources -import org.apache.spark.sql.types._ - -import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes} -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn -import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} -import org.apache.carbondata.core.scan.expression.conditional.{GreaterThanEqualToExpression, LessThanExpression, _} -import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} -import org.apache.carbondata.spark.util.CarbonScalaUtil - -/** - * All filter conversions are done here. - */ -object CarbonFilters { - - /** - * Converts data sources filters to carbon filter predicates. - */ - def createCarbonFilter(schema: StructType, - predicate: sources.Filter): Option[CarbonExpression] = { - val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap - - def createFilter(predicate: sources.Filter): Option[CarbonExpression] = { - predicate match { - - case sources.EqualTo(name, value) => - Some(new EqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.Not(sources.EqualTo(name, value)) => - Some(new NotEqualsExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - - case sources.EqualNullSafe(name, value) => - Some(new EqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.Not(sources.EqualNullSafe(name, value)) => - Some(new NotEqualsExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - - case sources.GreaterThan(name, value) => - Some(new GreaterThanExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.LessThan(name, value) => - Some(new LessThanExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.GreaterThanOrEqual(name, value) => - Some(new GreaterThanEqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.LessThanOrEqual(name, value) => - Some(new LessThanEqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - - case sources.In(name, values) => - Some(new InExpression(getCarbonExpression(name), - new ListExpression( - convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList)))) - case sources.Not(sources.In(name, values)) => - Some(new NotInExpression(getCarbonExpression(name), - new ListExpression( - convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList)))) - - case sources.IsNull(name) => - Some(new EqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, null), true)) - case sources.IsNotNull(name) => - Some(new NotEqualsExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, null), true)) - - case sources.And(lhs, rhs) => - (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _)) - - case sources.Or(lhs, rhs) => - for { - lhsFilter <- createFilter(lhs) - rhsFilter <- createFilter(rhs) - } 
yield { - new OrExpression(lhsFilter, rhsFilter) - } - case sources.StringStartsWith(name, value) if value.length > 0 => - val l = new GreaterThanEqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value)) - val maxValueLimit = value.substring(0, value.length - 1) + - (value.charAt(value.length - 1).toInt + 1).toChar - val r = new LessThanExpression( - getCarbonExpression(name), getCarbonLiteralExpression(name, maxValueLimit)) - Some(new AndExpression(l, r)) - case _ => None - } - } - - def getCarbonExpression(name: String) = { - new CarbonColumnExpression(name, - CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name))) - } - - def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = { - val dataTypeOfAttribute = CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)) - val dataType = if (Option(value).isDefined - && dataTypeOfAttribute == CarbonDataTypes.STRING - && value.isInstanceOf[Double]) { - CarbonDataTypes.DOUBLE - } else { - dataTypeOfAttribute - } - new CarbonLiteralExpression(value, dataType) - } - - createFilter(predicate) - } - - - // Check out which filters can be pushed down to carbon, remaining can be handled in spark layer. - // Mostly dimension filters are only pushed down since it is faster in carbon. - // TODO - The Filters are first converted Intermediate sources filters expression and then these - // expressions are again converted back to CarbonExpression. Instead of two step process of - // evaluating the filters it can be merged into a single one. - def selectFilters(filters: Seq[Expression], - attrList: java.util.HashSet[AttributeReferenceWrapper], - aliasMap: CarbonAliasDecoderRelation): Unit = { - def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = { - expr match { - case or@ Or(left, right) => - - val leftFilter = translate(left, or = true) - val rightFilter = translate(right, or = true) - if (leftFilter.isDefined && rightFilter.isDefined) { - Some( sources.Or(leftFilter.get, rightFilter.get)) - } else { - or.collect { - case attr: AttributeReference => - attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - None - } - - case And(left, right) => - val leftFilter = translate(left, or) - val rightFilter = translate(right, or) - if (or) { - if (leftFilter.isDefined && rightFilter.isDefined) { - (leftFilter ++ rightFilter).reduceOption(sources.And) - } else { - None - } - } else { - (leftFilter ++ rightFilter).reduceOption(sources.And) - } - - case EqualTo(a: Attribute, Literal(v, t)) => - Some(sources.EqualTo(a.name, v)) - case EqualTo(l@Literal(v, t), a: Attribute) => - Some(sources.EqualTo(a.name, v)) - case Not(EqualTo(a: Attribute, Literal(v, t))) => - Some(sources.Not(sources.EqualTo(a.name, v))) - case Not(EqualTo(Literal(v, t), a: Attribute)) => - Some(sources.Not(sources.EqualTo(a.name, v))) - case IsNotNull(a: Attribute) => - Some(sources.IsNotNull(a.name)) - case IsNull(a: Attribute) => - Some(sources.IsNull(a.name)) - case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) => - val hSet = list.map(e => e.eval(EmptyRow)) - Some(sources.Not(sources.In(a.name, hSet.toArray))) - case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) => - val hSet = list.map(e => e.eval(EmptyRow)) - Some(sources.In(a.name, hSet.toArray)) - case GreaterThan(a: Attribute, Literal(v, t)) => - Some(sources.GreaterThan(a.name, v)) - case GreaterThan(Literal(v, t), a: Attribute) => - Some(sources.LessThan(a.name, v)) - case LessThan(a: 
-
-
-  // Check which filters can be pushed down to carbon; the rest are handled in the spark layer.
-  // Mostly only dimension filters are pushed down, since they are faster in carbon.
-  // TODO - The filters are first converted to intermediate sources.Filter expressions and these
-  // expressions are then converted back to CarbonExpression. Instead of this two-step process,
-  // the evaluation of the filters can be merged into a single step.
-  def selectFilters(filters: Seq[Expression],
-      attrList: java.util.HashSet[AttributeReferenceWrapper],
-      aliasMap: CarbonAliasDecoderRelation): Unit = {
-    def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
-      expr match {
-        case or@ Or(left, right) =>
-          val leftFilter = translate(left, or = true)
-          val rightFilter = translate(right, or = true)
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            Some(sources.Or(leftFilter.get, rightFilter.get))
-          } else {
-            or.collect {
-              case attr: AttributeReference =>
-                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-            None
-          }
-
-        case And(left, right) =>
-          val leftFilter = translate(left, or)
-          val rightFilter = translate(right, or)
-          if (or) {
-            if (leftFilter.isDefined && rightFilter.isDefined) {
-              (leftFilter ++ rightFilter).reduceOption(sources.And)
-            } else {
-              None
-            }
-          } else {
-            (leftFilter ++ rightFilter).reduceOption(sources.And)
-          }
-
-        case EqualTo(a: Attribute, Literal(v, t)) =>
-          Some(sources.EqualTo(a.name, v))
-        case EqualTo(l@Literal(v, t), a: Attribute) =>
-          Some(sources.EqualTo(a.name, v))
-        case Not(EqualTo(a: Attribute, Literal(v, t))) =>
-          Some(sources.Not(sources.EqualTo(a.name, v)))
-        case Not(EqualTo(Literal(v, t), a: Attribute)) =>
-          Some(sources.Not(sources.EqualTo(a.name, v)))
-        case IsNotNull(a: Attribute) =>
-          Some(sources.IsNotNull(a.name))
-        case IsNull(a: Attribute) =>
-          Some(sources.IsNull(a.name))
-        case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
-          val hSet = list.map(e => e.eval(EmptyRow))
-          Some(sources.Not(sources.In(a.name, hSet.toArray)))
-        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-          val hSet = list.map(e => e.eval(EmptyRow))
-          Some(sources.In(a.name, hSet.toArray))
-        case GreaterThan(a: Attribute, Literal(v, t)) =>
-          Some(sources.GreaterThan(a.name, v))
-        case GreaterThan(Literal(v, t), a: Attribute) =>
-          Some(sources.LessThan(a.name, v))
-        case LessThan(a: Attribute, Literal(v, t)) =>
-          Some(sources.LessThan(a.name, v))
-        case LessThan(Literal(v, t), a: Attribute) =>
-          Some(sources.GreaterThan(a.name, v))
-        case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-        case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-        case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
-          Some(sources.LessThanOrEqual(a.name, v))
-        case LessThanOrEqual(Literal(v, t), a: Attribute) =>
-          Some(sources.GreaterThanOrEqual(a.name, v))
-        case StartsWith(a: Attribute, Literal(v, t)) =>
-          Some(sources.StringStartsWith(a.name, v.toString))
-
-        case others =>
-          if (!or) {
-            others.collect {
-              case attr: AttributeReference =>
-                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-            }
-          }
-          None
-      }
-    }
-    filters.flatMap(translate(_, false)).toArray
-  }
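The asymmetry between the Or and And cases of translate is the usual partial-pushdown rule: one conjunct of an AND may be pushed alone, but a disjunct of an OR may not, because pushing only half of an OR would change what the filter means; when a side fails to translate, its attributes are recorded in attrList so the spark layer can decode and evaluate them. A toy model of the rule, using a made-up four-case ADT rather than Spark expressions:

// Toy model of the rule in translate: under AND keep whichever children
// convert; under OR both children must convert or the whole node is dropped.
sealed trait Expr
case class Pushable(name: String) extends Expr
case class NotPushable(name: String) extends Expr
case class And(l: Expr, r: Expr) extends Expr
case class Or(l: Expr, r: Expr) extends Expr

object PushdownSketch {
  def translate(e: Expr, underOr: Boolean = false): Option[String] = e match {
    case Pushable(n)    => Some(n)
    case NotPushable(_) => None
    case Or(l, r) =>
      for {
        lf <- translate(l, underOr = true)
        rf <- translate(r, underOr = true)
      } yield s"($lf OR $rf)"
    case And(l, r) =>
      val parts = (translate(l, underOr) ++ translate(r, underOr)).toSeq
      // A partially translated AND is only safe at the top level; inside an
      // OR it would widen the disjunct, so both halves are required there.
      if (underOr && parts.size != 2) None
      else parts.reduceOption((a, b) => s"($a AND $b)")
  }

  def main(args: Array[String]): Unit = {
    println(translate(And(Pushable("a"), NotPushable("b"))))  // Some(a)
    println(translate(Or(Pushable("a"), NotPushable("b"))))   // None
    println(translate(Or(Pushable("a"), Pushable("b"))))      // Some((a OR b))
  }
}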
-
-  def isCarbonSupportedDataTypes(expr: Expression): Boolean = {
-    expr.dataType match {
-      case StringType => true
-      case IntegerType => true
-      case LongType => true
-      case DoubleType => true
-      case FloatType => true
-      case BooleanType => true
-      case TimestampType => true
-      case ArrayType(_, _) => true
-      case StructType(_) => true
-      case DecimalType() => true
-      case _ => false
-    }
-  }
-
-  def processExpression(exprs: Seq[Expression],
-      attributesNeedToDecode: java.util.HashSet[AttributeReference],
-      unprocessedExprs: ArrayBuffer[Expression],
-      carbonTable: CarbonTable): Option[CarbonExpression] = {
-    def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
-      expr match {
-        case orFilter@ Or(left, right)
-          if isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
-          val leftFilter = transformExpression(left, or = true)
-          val rightFilter = transformExpression(right, or = true)
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            Some(new OrExpression(leftFilter.get, rightFilter.get))
-          } else {
-            if (!or) {
-              orFilter.collect {
-                case attr: AttributeReference => attributesNeedToDecode.add(attr)
-              }
-              unprocessedExprs += orFilter
-            }
-            None
-          }
-
-        case And(left, right)
-          if isCarbonSupportedDataTypes(left) && isCarbonSupportedDataTypes(right) =>
-          val leftFilter = transformExpression(left, or)
-          val rightFilter = transformExpression(right, or)
-          if (or) {
-            if (leftFilter.isDefined && rightFilter.isDefined) {
-              (leftFilter ++ rightFilter).reduceOption(new AndExpression(_, _))
-            } else {
-              None
-            }
-          } else {
-            (leftFilter ++ rightFilter).reduceOption(new AndExpression(_, _))
-          }
-
-        case EqualTo(a: Attribute, l@Literal(v, t))
-          if isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l) =>
-          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l@Literal(v, t), a: Attribute)
-          if isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a) =>
-          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case Not(EqualTo(a: Attribute, l@Literal(v, t)))
-          if isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l) =>
-          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l@Literal(v, t), a: Attribute))
-          if isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a) =>
-          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case IsNotNull(child: Attribute) if isCarbonSupportedDataTypes(child) =>
-          Some(new NotEqualsExpression(transformExpression(child).get,
-            transformExpression(Literal(null)).get, true))
-        case IsNull(child: Attribute) if isCarbonSupportedDataTypes(child) =>
-          Some(new EqualToExpression(transformExpression(child).get,
-            transformExpression(Literal(null)).get, true))
-
-        case Not(In(a: Attribute, list))
-          if !list.exists(!_.isInstanceOf[Literal]) && isCarbonSupportedDataTypes(a) =>
-          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
-            Some(new FalseExpression(transformExpression(a).get))
-          } else {
-            Some(new NotInExpression(transformExpression(a).get,
-              new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-          }
-        case In(a: Attribute, list)
-          if !list.exists(!_.isInstanceOf[Literal]) && isCarbonSupportedDataTypes(a) =>
-          Some(new InExpression(transformExpression(a).get,
-            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-
-        case GreaterThan(a: Attribute, l@Literal(v, t))
-          if isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(l@Literal(v, t), a: Attribute)
-          if isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case LessThan(a: Attribute, l@Literal(v, t))
-          if isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(l@Literal(v, t), a: Attribute)
-          if isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case GreaterThanOrEqual(a: Attribute, l@Literal(v, t))
-          if isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(l@Literal(v, t), a: Attribute)
-          if isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        case LessThanOrEqual(a: Attribute, l@Literal(v, t))
-          if isCarbonSupportedDataTypes(a) && isCarbonSupportedDataTypes(l) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(l@Literal(v, t), a: Attribute)
-          if isCarbonSupportedDataTypes(l) && isCarbonSupportedDataTypes(a) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        case AttributeReference(name, dataType, _, _) =>
-          Some(new CarbonColumnExpression(name,
-            CarbonScalaUtil.convertSparkToCarbonDataType(
-              getActualCarbonDataType(name, carbonTable))))
-        case Literal(name, dataType) =>
-          Some(new CarbonLiteralExpression(name,
-            CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
-
-        case StartsWith(left: Attribute, right@Literal(pattern, dataType))
-          if pattern.toString.nonEmpty && isCarbonSupportedDataTypes(left) &&
-             isCarbonSupportedDataTypes(right) =>
-          val l = new GreaterThanEqualToExpression(transformExpression(left).get,
-            transformExpression(right).get)
-          val maxValueLimit = pattern.toString.substring(0, pattern.toString.length - 1) +
-                              (pattern.toString.charAt(pattern.toString.length - 1).toInt + 1).toChar
-          val r = new LessThanExpression(
-            transformExpression(left).get,
-            new CarbonLiteralExpression(maxValueLimit,
-              CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
-          Some(new AndExpression(l, r))
-
-        case others =>
-          if (!or) {
-            others.collect {
-              case attr: AttributeReference => attributesNeedToDecode.add(attr)
-            }
-            unprocessedExprs += others
-          }
-          None
-      }
-    }
-    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
-  }
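The FalseExpression branch in the Not(In(...)) case encodes SQL's three-valued logic: x NOT IN (v1, ..., NULL) expands to (x <> v1) AND ... AND (x <> NULL), and the last conjunct is UNKNOWN, so the predicate can never be TRUE and the filter keeps no rows. A small sketch of that rule with a hand-rolled three-valued evaluator (illustrative only, not Carbon's evaluation path):

// Three-valued logic behind the Not(In(...)) => FalseExpression rewrite.
object NotInNullSketch {
  sealed trait TV
  case object True extends TV
  case object False extends TV
  case object Unknown extends TV

  // x <> v is UNKNOWN when either side is NULL.
  def notEq(x: Any, v: Any): TV =
    if (x == null || v == null) Unknown else if (x != v) True else False

  def and(a: TV, b: TV): TV = (a, b) match {
    case (False, _) | (_, False) => False
    case (Unknown, _) | (_, Unknown) => Unknown
    case _ => True
  }

  def notIn(x: Any, list: Seq[Any]): TV =
    list.map(v => notEq(x, v)).foldLeft(True: TV)(and)

  def main(args: Array[String]): Unit = {
    println(notIn(3, Seq(1, 2)))     // True    -> row survives
    println(notIn(3, Seq(1, null)))  // Unknown -> filtered out
    println(notIn(1, Seq(1, null)))  // False   -> filtered out
    // A filter keeps a row only when the predicate is True, so a NOT IN
    // containing any NULL element keeps nothing: hence FalseExpression.
  }
}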
-
-  private def isNullLiteral(exp: Expression): Boolean = {
-    if (null != exp
-        && exp.isInstanceOf[Literal]
-        && (exp.asInstanceOf[Literal].dataType == org.apache.spark.sql.types.DataTypes.NullType)
-        || (exp.asInstanceOf[Literal].value == null)) {
-      true
-    } else {
-      false
-    }
-  }
-
-  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
-    var carbonColumn: CarbonColumn =
-      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
-    val dataType = if (carbonColumn != null) {
-      carbonColumn.getDataType
-    } else {
-      carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
-      carbonColumn.getDataType match {
-        case CarbonDataTypes.INT => CarbonDataTypes.INT
-        case CarbonDataTypes.SHORT => CarbonDataTypes.SHORT
-        case CarbonDataTypes.LONG => CarbonDataTypes.LONG
-        case CarbonDataTypes.DECIMAL => CarbonDataTypes.DECIMAL
-        case _ => CarbonDataTypes.DOUBLE
-      }
-    }
-    CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
-  }
-
-  // Convert a scala list to a java list. Cannot use scalaList.asJava, because on deserialization
-  // the classes inside the scala wrapper cannot be found, giving a ClassNotFoundException.
-  private def convertToJavaList(
-      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
-    val javaList = new java.util.ArrayList[CarbonExpression]()
-    scalaList.foreach(javaList.add)
-    javaList
-  }
-}
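The hand-written copy in convertToJavaList sidesteps the asJava wrapper: asJava would put a Scala wrapper class into the serialized filter tree, and the comment records that this class failed to resolve on deserialization. Copying into a plain java.util.ArrayList keeps only JDK classes in the serialized object graph. A sketch of the copy plus the Java-serialization round trip it is meant to survive (object and method names are ours):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Why convertToJavaList copies by hand: a plain java.util.ArrayList keeps
// only JDK classes in the serialized graph, while an asJava wrapper would
// also have to be resolvable at deserialization time.
object JavaListSketch {
  def toJavaList[A](xs: Seq[A]): java.util.List[A] = {
    val out = new java.util.ArrayList[A]()
    xs.foreach(out.add)
    out
  }

  // Round-trip through Java serialization, as a filter tree would be when
  // shipped to executors.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(value)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    ois.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(toJavaList(Seq("a", "b", "c")))
    println(restored)  // [a, b, c] -- only java.util.ArrayList in the graph
  }
}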