Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1481#discussion_r150490665 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala --- @@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil * 1. failed to create pre aggregate table. * 2. failed to update main table * - * @param cm - * @param dataFrame - * @param createDSTable * @param queryString */ case class CreatePreAggregateTableCommand( - cm: TableModel, - dataFrame: DataFrame, - createDSTable: Boolean = true, - queryString: String, - fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField]) + dataMapName: String, + parentTableIdentifier: TableIdentifier, + dmClassName: String, + dmproperties: Map[String, String], + queryString: String) extends RunnableCommand with SchemaProcessCommand { override def run(sparkSession: SparkSession): Seq[Row] = { processSchema(sparkSession) } override def processSchema(sparkSession: SparkSession): Seq[Row] = { - val storePath = CarbonEnv.getInstance(sparkSession).storePath - CarbonEnv.getInstance(sparkSession).carbonMetastore. - checkSchemasModifiedTimeAndReloadTables(storePath) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession) - val tbName = cm.tableName - val dbName = cm.databaseName - LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]") + val df = sparkSession.sql(queryString) + val fieldRelationMap = PreAggregateUtil + .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString) + val fields = fieldRelationMap.keySet.toSeq + val tableProperties = mutable.Map[String, String]() + dmproperties.foreach(t => tableProperties.put(t._1, t._2)) + val tableIdentifier = TableIdentifier(dataMapName, parentTableIdentifier.database) + // prepare table model of the collected tokens + val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false, + new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database), + tableIdentifier.table.toLowerCase, + fields, + Seq(), + tableProperties, + None, + false, + None) + // getting the parent table - val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan) + val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) // getting the table name val parentTableName = parentTable.getFactTableName // getting the db name of parent table val parentDbName = parentTable.getDatabaseName + + assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table)) // updating the relation identifier, this will be stored in child table // which can be used during dropping of pre-aggreate table as parent table will // also get updated - cm.parentTable = Some(parentTable) - cm.dataMapRelation = Some(fieldRelationMap) - val tableInfo: TableInfo = TableNewProcessor(cm) - // Add validation for sort scope when create table - val sortScope = tableInfo.getFactTable.getTableProperties - .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) - if (!CarbonUtil.isValidSortOption(sortScope)) { - throw new InvalidConfigurationException( - s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," + - s" 'LOCAL_SORT' and 'GLOBAL_SORT' ") - } - - if (tableInfo.getFactTable.getListOfColumns.size <= 0) { - sys.error("No Dimensions found. Table should have at least one dimesnion !") - } - - if (sparkSession.sessionState.catalog.listTables(dbName) - .exists(_.table.equalsIgnoreCase(tbName))) { - if (!cm.ifNotExistsSet) { - LOGGER.audit( - s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " + - s"Table [$tbName] already exists under database [$dbName]") - sys.error(s"Table [$tbName] already exists under database [$dbName]") - } - } else { - val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName) - // Add Database to catalog and persist - val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore - val tablePath = tableIdentifier.getTablePath - val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath) - if (createDSTable) { - try { - val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) - cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f) - cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f) - sparkSession.sql( - s"""CREATE TABLE $dbName.$tbName - |(${ fields.map(f => f.rawSchema).mkString(",") }) - |USING org.apache.spark.sql.CarbonSource""".stripMargin + - s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + - s""""$tablePath"$carbonSchemaString) """) - // child schema object which will be updated on parent table about the - val childSchema = tableInfo.getFactTable - .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION") - // upadting the parent table about child table - PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) - val loadAvailable = PreAggregateUtil - .checkMainTableLoad(parentTable) - if (loadAvailable) { - sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString") - } - } catch { - case e: Exception => - val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) - // call the drop table to delete the created table. - CarbonEnv.getInstance(sparkSession).carbonMetastore - .dropTable(tablePath, identifier)(sparkSession) - LOGGER.audit(s"Table creation with Database name [$dbName] " + - s"and Table name [$tbName] failed") - throw e - } + tableModel.parentTable = Some(parentTable) + tableModel.dataMapRelation = Some(fieldRelationMap) + CarbonCreateTableCommand(tableModel).run(sparkSession) + try { + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore. + lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation] + val tableInfo = relation.tableMeta.carbonTable.getTableInfo + // child schema object which will be updated on parent table about the + val childSchema = tableInfo.getFactTable + .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION") + dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) + // updating the parent table about child table + PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) + val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable) + if (loadAvailable) { + sparkSession + .sql(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") } + } catch { + case e: Exception => + sparkSession. + sql(s"""DROP TABLE IF EXISTS ${ tableModel.databaseName }.${ tableModel.tableName }""") --- End diff -- ok
---