Github user xubo245 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2225#discussion_r185489456 --- Diff: integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CreateCarbonSourceTableAsSelectCommand.scala --- @@ -0,0 +1,170 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.catalog.{ + CatalogRelation, CatalogTable, CatalogTableType, + SimpleCatalogRelation +} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.{ + AlterTableRecoverPartitionsCommand, DDLUtils, + RunnableCommand +} +import org.apache.spark.sql.execution.datasources.{DataSource, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.sources.InsertableRelation +import org.apache.spark.sql.types.StructType + +/** + * Create table 'using carbondata' and insert the query result into it. 
+ * + * @param table the Catalog Table + * @param mode SaveMode: Ignore, Overwrite, ErrorIfExists, Append + * @param query the query whose result will be inserted into the new relation + * + */ + +case class CreateCarbonSourceTableAsSelectCommand( + table: CatalogTable, + mode: SaveMode, + query: LogicalPlan) + extends RunnableCommand { + + override protected def innerChildren: Seq[LogicalPlan] = Seq(query) + + override def run(sparkSession: SparkSession): Seq[Row] = { + assert(table.tableType != CatalogTableType.VIEW) + assert(table.provider.isDefined) + assert(table.schema.isEmpty) + + val provider = table.provider.get + val sessionState = sparkSession.sessionState + val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = table.identifier.copy(database = Some(db)) + val tableName = tableIdentWithDB.unquotedString + + var createMetastoreTable = false + var existingSchema = Option.empty[StructType] + if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { + // Check if we need to throw an exception or just return. + mode match { + case SaveMode.ErrorIfExists => + throw new AnalysisException(s"Table $tableName already exists. " + + s"If you are using saveAsTable, you can set SaveMode to " + + s"SaveMode.Append to " + + s"insert data into the table or set SaveMode to SaveMode" + + s".Overwrite to overwrite" + + s"the existing data. " + + s"Or, if you are using SQL CREATE TABLE, you need to drop " + + s"$tableName first.") + case SaveMode.Ignore => + // Since the table already exists and the save mode is Ignore, we will just return. + return Seq.empty[Row] + case SaveMode.Append => + // Check if the specified data source match the data source of the existing table. + val existingProvider = DataSource.lookupDataSource(provider) + // TODO: Check that options from the resolved relation match the relation that we are + // inserting into (i.e. using the same compression). 
+ + // Pass a table identifier with database part, so that `lookupRelation` won't get temp + // views unexpectedly. + EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match { + case l@LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => + // check if the file formats match + l.relation match { + case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider => + throw new AnalysisException( + s"The file format of the existing table $tableName is " + + s"`${ r.fileFormat.getClass.getName }`. It doesn't match the specified " + + s"format `$provider`") + case _ => + } + if (query.schema.size != l.schema.size) { + throw new AnalysisException( + s"The column number of the existing schema[${ l.schema }] " + + s"doesn't match the data schema[${ query.schema }]'s") + } + existingSchema = Some(l.schema) + case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => + existingSchema = Some(s.metadata.schema) + case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) => + throw new AnalysisException("Saving data in the Hive serde table " + + s"${ + c.catalogTable --- End diff -- please keep {} in one line
---