http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala new file mode 100644 index 0000000..96e840f --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.preaaggregate + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil +import org.apache.spark.sql.parser.CarbonSpark2SqlParser + +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider +import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema} +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} + +/** + * Below helper class will be used to create pre-aggregate table + * and updating the parent table about the child table information + * It will be either success or nothing happen in case of failure: + * 1. failed to create pre aggregate table. + * 2. failed to update main table + * + */ +case class PreAggregateTableHelper( + var parentTable: CarbonTable, + dataMapName: String, + dataMapClassName: String, + dataMapProperties: java.util.Map[String, String], + queryString: String, + timeSeriesFunction: String) { + + var loadCommand: CarbonLoadDataCommand = _ + + def initMeta(sparkSession: SparkSession): Seq[Row] = { + val dmProperties = dataMapProperties.asScala + val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) + val df = sparkSession.sql(updatedQuery) + val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes( + df.logicalPlan, queryString) + val fields = fieldRelationMap.keySet.toSeq + val tableProperties = mutable.Map[String, String]() + dmProperties.foreach(t => tableProperties.put(t._1, t._2)) + + val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) + if (!parentTable.getTableName.equalsIgnoreCase(selectTable.getTableName)) { + throw new MalformedDataMapCommandException( + "Parent table name is different in select and create") + } + var neworder = Seq[String]() + val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala + parentOrder.foreach(parentcol => + fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty && + parentcol.equals(fieldRelationMap(col). + columnTableRelationList.get(0).parentColumnName)) + .map(cols => neworder :+= cols.column) + ) + tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(",")) + tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable. + getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants + .LOAD_SORT_SCOPE_DEFAULT)) + tableProperties + .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString) + val tableIdentifier = + TableIdentifier(parentTable.getTableName + "_" + dataMapName, + Some(parentTable.getDatabaseName)) + // prepare table model of the collected tokens + val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel( + ifNotExistPresent = false, + new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database), + tableIdentifier.table.toLowerCase, + fields, + Seq(), + tableProperties, + None, + isAlterFlow = false, + None) + + // updating the relation identifier, this will be stored in child table + // which can be used during dropping of pre-aggreate table as parent table will + // also get updated + if(timeSeriesFunction != null) { + TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties.toMap, parentTable) + TimeSeriesUtil.validateEventTimeColumnExitsInSelect( + fieldRelationMap, + dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME)) + TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap, + dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME), + timeSeriesFunction) + } + tableModel.parentTable = Some(parentTable) + tableModel.dataMapRelation = Some(fieldRelationMap) + val tablePath = if (dmProperties.contains("path")) { + dmProperties("path") + } else { + CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession) + } + CarbonCreateTableCommand(TableNewProcessor(tableModel), + tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession) + + val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession) + val tableInfo = table.getTableInfo + + // child schema object will be saved on parent table schema + val childSchema = tableInfo.getFactTable.buildChildSchema( + dataMapName, + DataMapProvider.PREAGGREGATE.toString, + tableInfo.getDatabaseName, + queryString, + "AGGREGATION") + dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) + + // updating the parent table about child table + PreAggregateUtil.updateMainTable(parentTable, childSchema, sparkSession) + + // After updating the parent carbon table with data map entry extract the latest table object + // to be used in further create process. + parentTable = CarbonEnv.getCarbonTable(Some(parentTable.getDatabaseName), + parentTable.getTableName)(sparkSession) + + val updatedLoadQuery = if (timeSeriesFunction != null) { + val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala + .filter(p => p.getDataMapName.equalsIgnoreCase(dataMapName)) + .head + .asInstanceOf[AggregationDataMapSchema] + PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, + parentTable.getTableName, + parentTable.getDatabaseName) + } else { + queryString + } + val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( + updatedLoadQuery)).drop("preAggLoad") + loadCommand = PreAggregateUtil.createLoadCommandForChild( + tableInfo.getFactTable.getListOfColumns, + tableIdentifier, + dataFrame, + isOverwrite = false, + sparkSession = sparkSession) + loadCommand.processMetadata(sparkSession) + Seq.empty + } + + def initData(sparkSession: SparkSession): Seq[Row] = { + // load child table if parent table has existing segments + // This will be used to check if the parent table has any segments or not. If not then no + // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT + // table. + SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false) + val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) + if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS || + load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) { + throw new UnsupportedOperationException( + "Cannot create pre-aggregate table when insert is in progress on main table") + } else if (loadAvailable.nonEmpty) { + val updatedQuery = if (timeSeriesFunction != null) { + val dataMap = parentTable.getTableInfo.getDataMapSchemaList.asScala + .filter(p => p.getDataMapName + .equalsIgnoreCase(dataMapName)).head + .asInstanceOf[AggregationDataMapSchema] + PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMap.getChildSchema, + parentTable.getTableName, + parentTable.getDatabaseName) + } else { + queryString + } + // Passing segmentToLoad as * because we want to load all the segments into the + // pre-aggregate table even if the user has set some segments on the parent table. + loadCommand.dataFrame = Some(PreAggregateUtil + .getDataFrame(sparkSession, loadCommand.logicalPlan.get)) + PreAggregateUtil.startDataLoadForDataMap( + parentTable, + segmentToLoad = "*", + validateSegments = true, + sparkSession, + loadCommand) + } + Seq.empty + } +} + +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 1073f63..845e30d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field} import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation} import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.types.DataType @@ -405,11 +405,11 @@ object PreAggregateUtil { * Below method will be used to update the main table about the pre aggregate table information * in case of any exception it will throw error so pre aggregate table creation will fail * - * @param childSchema - * @param sparkSession + * @return the existing TableInfo object before updating, it can be used to recover if any + * operation failed later */ def updateMainTable(carbonTable: CarbonTable, - childSchema: DataMapSchema, sparkSession: SparkSession): Unit = { + childSchema: DataMapSchema, sparkSession: SparkSession): TableInfo = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK) @@ -422,7 +422,7 @@ object PreAggregateUtil { locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable) // get the latest carbon table and check for column existence // read the latest schema file - val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable) val schemaConverter = new ThriftWrapperSchemaConverterImpl() val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, @@ -435,11 +435,11 @@ object PreAggregateUtil { throw new MetadataProcessException("DataMap name already exist") } wrapperTableInfo.getDataMapSchemaList.add(childSchema) - val thriftTable = schemaConverter - .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) - updateSchemaInfo(carbonTable, - thriftTable)(sparkSession) + val thriftTable = schemaConverter.fromWrapperToExternalTableInfo( + wrapperTableInfo, dbName, tableName) + updateSchemaInfo(carbonTable, thriftTable)(sparkSession) LOGGER.info(s"Parent table updated is successful for table $dbName.$tableName") + thriftTableInfo } catch { case e: Exception => LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes") @@ -449,7 +449,6 @@ object PreAggregateUtil { // release lock after command execution completion releaseLocks(locks) } - Seq.empty } /** @@ -525,7 +524,7 @@ object PreAggregateUtil { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) carbonTable.getTableLastUpdatedTime - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable) if (thriftTable.dataMapSchemas.size > numberOfChildSchema) { metastore.revertTableSchemaForPreAggCreationFailure( carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession) http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala index 07917d0..07d693b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -63,7 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand( OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext) // get the latest carbon table and check for column existence // read the latest schema file - val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable) val schemaConverter = new ThriftWrapperSchemaConverterImpl() val wrapperTableInfo = schemaConverter .fromExternalToWrapperTableInfo(thriftTableInfo, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala index fa8003e..51c8ec8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala @@ -74,7 +74,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( sys.error(s"Invalid Column: $columnName") } // read the latest schema file - val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable) // maintain the added column for schema evolution history var addColumnSchema: ColumnSchema = null var deletedColumnSchema: ColumnSchema = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala index d848eb5..fcc1095 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -99,7 +99,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand( // read the latest schema file val tableInfo: org.apache.carbondata.format.TableInfo = - metastore.getThriftTableInfo(carbonTable)(sparkSession) + metastore.getThriftTableInfo(carbonTable) // maintain the deleted columns for schema evolution history var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]() val columnSchemaList = tableInfo.fact_table.table_columns.asScala http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index fc780cb..cda78ce 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -108,7 +108,7 @@ private[sql] case class CarbonAlterTableRenameCommand( sparkSession) OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext) val tableInfo: org.apache.carbondata.format.TableInfo = - metastore.getThriftTableInfo(carbonTable)(sparkSession) + metastore.getThriftTableInfo(carbonTable) val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) schemaEvolutionEntry.setTableName(newTableName) timeStamp = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala index 9e0cee5..e67a98f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -63,13 +63,13 @@ object TimeSeriesUtil { * @return whether find only one granularity */ def validateTimeSeriesGranularity( - dmProperties: Map[String, String], + dmProperties: java.util.Map[String, String], dmClassName: String): Boolean = { var isFound = false // 1. granularity only support one for (granularity <- Granularity.values()) { - if (dmProperties.get(granularity.getName).isDefined) { + if (dmProperties.containsKey(granularity.getName)) { if (isFound) { throw new MalformedDataMapCommandException( s"Only one granularity level can be defined") @@ -104,14 +104,14 @@ object TimeSeriesUtil { * @return key and value tuple */ def getTimeSeriesGranularityDetails( - dmProperties: Map[String, String], + dmProperties: java.util.Map[String, String], dmClassName: String): (String, String) = { val defaultValue = "1" for (granularity <- Granularity.values()) { - if (dmProperties.get(granularity.getName).isDefined && - dmProperties.get(granularity.getName).get.equalsIgnoreCase(defaultValue)) { - return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName).get) + if (dmProperties.containsKey(granularity.getName) && + dmProperties.get(granularity.getName).equalsIgnoreCase(defaultValue)) { + return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName)) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index fd09e48..b3438a4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -523,8 +523,7 @@ class CarbonFileMetastore extends CarbonMetaStore { override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = metadata.carbonTables - override def getThriftTableInfo(carbonTable: CarbonTable) - (sparkSession: SparkSession): TableInfo = { + override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = { val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath) CarbonUtil.readSchemaFile(tableMetadataFile) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 44f731e..4c40fee 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -96,8 +96,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { Seq() } - override def getThriftTableInfo(carbonTable: CarbonTable) - (sparkSession: SparkSession): format.TableInfo = { + override def getThriftTableInfo(carbonTable: CarbonTable): format.TableInfo = { val schemaConverter = new ThriftWrapperSchemaConverterImpl schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo, carbonTable.getDatabaseName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index 0645040..c2333f9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -143,7 +143,7 @@ trait CarbonMetaStore { def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] - def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession): TableInfo + def getThriftTableInfo(carbonTable: CarbonTable): TableInfo def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index aaa87a3..9c2c7e7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -192,7 +192,7 @@ object AlterTableUtil { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val fileType = FileFactory.getFileType(tablePath) if (FileFactory.isFileExist(tablePath, fileType)) { - val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession) + val tableInfo = metastore.getThriftTableInfo(oldCarbonTable) val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -221,7 +221,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -246,7 +246,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -277,7 +277,7 @@ object AlterTableUtil { (sparkSession: SparkSession): Unit = { val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) - val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable) val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp if (updatedTime == timeStamp) { @@ -326,7 +326,7 @@ object AlterTableUtil { carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession) // get the latest carbon table // read the latest schema file - val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable) val schemaConverter = new ThriftWrapperSchemaConverterImpl() val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( thriftTableInfo, http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index e817590..f9baab3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -30,13 +30,13 @@ import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter; -import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.processing.store.TablePage; /** - * It is for writing DataMap for one table + * It is for writing IndexDataMap for one table */ public class DataMapWriterListener { @@ -54,7 +54,7 @@ public class DataMapWriterListener { List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable); if (tableDataMaps != null) { for (TableDataMap tableDataMap : tableDataMaps) { - DataMapFactory factory = tableDataMap.getDataMapFactory(); + IndexDataMapFactory factory = tableDataMap.getIndexDataMapFactory(); register(factory, segmentId, dataWritePath); } } @@ -63,7 +63,7 @@ public class DataMapWriterListener { /** * Register a AbstractDataMapWriter */ - private void register(DataMapFactory factory, String segmentId, String dataWritePath) { + private void register(IndexDataMapFactory factory, String segmentId, String dataWritePath) { assert (factory != null); assert (segmentId != null); DataMapMeta meta = factory.getMeta();