Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1481#discussion_r150487031
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
    @@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil
      * 1. failed to create pre aggregate table.
      * 2. failed to update main table
      *
    - * @param cm
    - * @param dataFrame
    - * @param createDSTable
      * @param queryString
      */
     case class CreatePreAggregateTableCommand(
    -    cm: TableModel,
    -    dataFrame: DataFrame,
    -    createDSTable: Boolean = true,
    -    queryString: String,
    -    fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, 
DataMapField])
    +    dataMapName: String,
    +    parentTableIdentifier: TableIdentifier,
    +    dmClassName: String,
    +    dmproperties: Map[String, String],
    +    queryString: String)
       extends RunnableCommand with SchemaProcessCommand {
     
       override def run(sparkSession: SparkSession): Seq[Row] = {
         processSchema(sparkSession)
       }
     
       override def processSchema(sparkSession: SparkSession): Seq[Row] = {
    -    val storePath = CarbonEnv.getInstance(sparkSession).storePath
    -    CarbonEnv.getInstance(sparkSession).carbonMetastore.
    -      checkSchemasModifiedTimeAndReloadTables(storePath)
    -    val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    -    cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, 
sparkSession)
    -    val tbName = cm.tableName
    -    val dbName = cm.databaseName
    -    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table 
name [$tbName]")
    +    val df = sparkSession.sql(queryString)
    +    val fieldRelationMap = PreAggregateUtil
    +      .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, 
queryString)
    +    val fields = fieldRelationMap.keySet.toSeq
    +    val tableProperties = mutable.Map[String, String]()
    +    dmproperties.foreach(t => tableProperties.put(t._1, t._2))
    +    val tableIdentifier = TableIdentifier(dataMapName, 
parentTableIdentifier.database)
    +    // prepare table model of the collected tokens
    +    val tableModel: TableModel = new 
CarbonSpark2SqlParser().prepareTableModel(false,
    +      new 
CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
    +      tableIdentifier.table.toLowerCase,
    +      fields,
    +      Seq(),
    +      tableProperties,
    +      None,
    +      false,
    +      None)
    +
         // getting the parent table
    -    val parentTable = 
PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
    +    val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
         // getting the table name
         val parentTableName = parentTable.getFactTableName
         // getting the db name of parent table
         val parentDbName = parentTable.getDatabaseName
    +
    +    assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
         // updating the relation identifier, this will be stored in child table
         // which can be used during dropping of pre-aggreate table as parent 
table will
         // also get updated
    -    cm.parentTable = Some(parentTable)
    -    cm.dataMapRelation = Some(fieldRelationMap)
    -    val tableInfo: TableInfo = TableNewProcessor(cm)
    -    // Add validation for sort scope when create table
    -    val sortScope = tableInfo.getFactTable.getTableProperties
    -      .getOrDefault("sort_scope", 
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
    -    if (!CarbonUtil.isValidSortOption(sortScope)) {
    -      throw new InvalidConfigurationException(
    -        s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 
'NO_SORT', 'BATCH_SORT'," +
    -        s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
    -    }
    -
    -    if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
    -      sys.error("No Dimensions found. Table should have at least one 
dimesnion !")
    -    }
    -
    -    if (sparkSession.sessionState.catalog.listTables(dbName)
    -      .exists(_.table.equalsIgnoreCase(tbName))) {
    -      if (!cm.ifNotExistsSet) {
    -        LOGGER.audit(
    -          s"Table creation with Database name [$dbName] and Table name 
[$tbName] failed. " +
    -          s"Table [$tbName] already exists under database [$dbName]")
    -        sys.error(s"Table [$tbName] already exists under database 
[$dbName]")
    -      }
    -    } else {
    -      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, 
dbName, tbName)
    -      // Add Database to catalog and persist
    -      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    -      val tablePath = tableIdentifier.getTablePath
    -      val carbonSchemaString = 
catalog.generateTableSchemaString(tableInfo, tablePath)
    -      if (createDSTable) {
    -        try {
    -          val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
    -          cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
    -          cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
    -          sparkSession.sql(
    -            s"""CREATE TABLE $dbName.$tbName
    -               |(${ fields.map(f => f.rawSchema).mkString(",") })
    -               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
    -            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath 
""".stripMargin +
    -            s""""$tablePath"$carbonSchemaString) """)
    -          // child schema object which will be updated on parent table 
about the
    -          val childSchema = tableInfo.getFactTable
    -            .buildChildSchema("", tableInfo.getDatabaseName, queryString, 
"AGGREGATION")
    -          // upadting the parent table about child table
    -          PreAggregateUtil.updateMainTable(parentDbName, parentTableName, 
childSchema, sparkSession)
    -          val loadAvailable = PreAggregateUtil
    -            .checkMainTableLoad(parentTable)
    -          if (loadAvailable) {
    -            sparkSession.sql(s"insert into ${ cm.databaseName }.${ 
cm.tableName } $queryString")
    -          }
    -        } catch {
    -          case e: Exception =>
    -            val identifier: TableIdentifier = TableIdentifier(tbName, 
Some(dbName))
    -            // call the drop table to delete the created table.
    -            CarbonEnv.getInstance(sparkSession).carbonMetastore
    -              .dropTable(tablePath, identifier)(sparkSession)
    -            LOGGER.audit(s"Table creation with Database name [$dbName] " +
    -                         s"and Table name [$tbName] failed")
    -            throw e
    -        }
    +    tableModel.parentTable = Some(parentTable)
    +    tableModel.dataMapRelation = Some(fieldRelationMap)
    +    CarbonCreateTableCommand(tableModel).run(sparkSession)
    +    try {
    +      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
    +        
lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
    +      val tableInfo = relation.tableMeta.carbonTable.getTableInfo
    +      // child schema object which will be updated on parent table about 
the
    +      val childSchema = tableInfo.getFactTable
    +        .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, 
queryString, "AGGREGATION")
    +      dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
    +      // updating the parent table about child table
    +      PreAggregateUtil.updateMainTable(parentDbName, parentTableName, 
childSchema, sparkSession)
    +      val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
    +      if (loadAvailable) {
    +        sparkSession
    +          .sql(s"insert into ${ tableModel.databaseName }.${ 
tableModel.tableName } $queryString")
           }
    +    } catch {
    +      case e: Exception =>
    +        sparkSession.
    +          sql(s"""DROP TABLE IF EXISTS ${ tableModel.databaseName }.${ 
tableModel.tableName }""")
    --- End diff --
    
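    Move `sql(` up to the previous line (directly after `sparkSession.`), and move its parameter (the `DROP TABLE IF EXISTS` string) down to the next line.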


---

Reply via email to