Repository: incubator-carbondata Updated Branches: refs/heads/12-dev dccd2ad0d -> 914d61e32 (forced update)
Insert Select Into Same Table Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f527d3d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f527d3d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f527d3d2 Branch: refs/heads/12-dev Commit: f527d3d2e460076705834393182199c589ed318e Parents: 950a6d0 Author: sounakr <soun...@gmail.com> Authored: Mon Apr 3 12:48:20 2017 +0530 Committer: sounakr <soun...@gmail.com> Committed: Wed Apr 5 23:52:20 2017 +0530 ---------------------------------------------------------------------- .../InsertIntoCarbonTableTestCase.scala | 28 +++++++++ .../spark/sql/hive/CarbonAnalysisRules.scala | 43 +++++++------- .../spark/sql/CarbonCatalystOperators.scala | 19 +++++++ .../sql/CarbonDatasourceHadoopRelation.scala | 13 +---- .../sql/execution/command/DDLStrategy.scala | 5 +- .../execution/command/carbonTableSchema.scala | 3 +- .../spark/sql/hive/CarbonAnalysisRules.scala | 60 ++++++++++++++++++++ .../spark/sql/hive/CarbonSessionState.scala | 30 +++++++++- 8 files changed, 163 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala index e84e62a..0b491bf 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala @@ -210,6 +210,32 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { ) } + test("insert select from same table") { + val timeStampPropOrig = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + + sql("drop table if exists CarbonDest") + sql("create table CarbonDest (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer
string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'") + + sql("drop table if exists HiveDest") + sql("create table HiveDest (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','") + + sql("insert into 
CarbonDest select * from THive") + sql("insert into CarbonDest select * from CarbonDest") + + sql("insert into HiveDest select * from THive") + sql("insert into HiveDest select * from HiveDest") + + checkAnswer( + sql("select imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from HiveDest order by
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription"), + sql("select
imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription from CarbonDest order by imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription")
+ ) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, timeStampPropOrig) + } + + + override def afterAll { sql("drop table if exists load") sql("drop table if exists inser") @@ -219,5 +245,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("drop table if exists TCarbonSource") sql("drop table if exists loadtable") sql("drop table if exists insertTable") + sql("drop table if exists CarbonDest") + sql("drop table if exists HiveDest") } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index f22e958..d23b18f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -33,31 +33,32 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants * Insert into carbon table from other source */ object CarbonPreInsertionCasts extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan.transform { - // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p + def apply(plan: LogicalPlan): LogicalPlan = plan.transform { + // Wait until children are resolved.
+ case p: LogicalPlan if !p.childrenResolved => p - case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _) - if relation.relation.isInstanceOf[CarbonDatasourceRelation] => - castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child) - } + case p @ InsertIntoTable(relation: LogicalRelation, _, child, _, _) + if relation.relation.isInstanceOf[CarbonDatasourceRelation] => + castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceRelation], child) + } - def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan) - : LogicalPlan = { - if (relation.carbonRelation.output.size > CarbonCommonConstants - .DEFAULT_MAX_NUMBER_OF_COLUMNS) { - sys - .error("Maximum supported column by carbon is:" + CarbonCommonConstants - .DEFAULT_MAX_NUMBER_OF_COLUMNS - ) - } - if (child.output.size >= relation.carbonRelation.output.size ) { - InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists) - } else { - sys.error("Cannot insert into target table because column number are different") - } + def castChildOutput(p: InsertIntoTable, relation: CarbonDatasourceRelation, child: LogicalPlan) + : LogicalPlan = { + if (relation.carbonRelation.output.size > CarbonCommonConstants + .DEFAULT_MAX_NUMBER_OF_COLUMNS) { + sys + .error("Maximum supported column by carbon is:" + CarbonCommonConstants + .DEFAULT_MAX_NUMBER_OF_COLUMNS + ) + } + if (child.output.size >= relation.carbonRelation.output.size ) { + InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists) + } else { + sys.error("Cannot insert into target table because column number are different") } } +} + object CarbonIUDAnalysisRule extends Rule[LogicalPlan] { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala index d94489b..4070088 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala @@ -75,3 +75,22 @@ case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier) override def output: Seq[AttributeReference] = Seq(AttributeReference("result", StringType, nullable = false)()) } + +/** + * A logical plan representing insertion into Hive table + * This plan ignores nullability of ArrayType, MapType, StructType unlike InsertIntoTable + * because Hive Table doesn't have nullability for ARRAY, MAP,STRUCT types. + */ +case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation, + partition: Map[String, Option[String]], + child: LogicalPlan, + overwrite: OverwriteOptions, + ifNotExists: Boolean) + extends Command { + + override def output: Seq[Attribute] = Seq.empty + + // This is the expected schema of the table prepared to be inserted into + // including dynamic partition columns. 
+ val tableOutput = table.carbonRelation.output +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 2743e7e..4169ac3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -43,7 +43,7 @@ case class CarbonDatasourceHadoopRelation( parameters: Map[String, String], tableSchema: Option[StructType], isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]()) - extends BaseRelation with InsertableRelation { + extends BaseRelation { lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head) lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier) @@ -74,15 +74,4 @@ case class CarbonDatasourceHadoopRelation( } override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0) - override def insert(data: DataFrame, overwrite: Boolean): Unit = { - if (carbonRelation.output.size > CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS) { - sys.error("Maximum supported column by carbon is:" + - CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS) - } - if(data.logicalPlan.output.size >= carbonRelation.output.size) { - LoadTableByInsert(this, data.logicalPlan).run(sparkSession) - } else { - sys.error("Cannot insert into target table because column number are different") - } - } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala index 2916a9f..55148d2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.{CarbonEnv, ShowLoadsCommand, SparkSession} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -54,6 +54,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { identifier.table.toLowerCase)) :: Nil case ShowLoadsCommand(databaseName, table, limit) => ExecutedCommandExec(ShowLoads(databaseName, table.toLowerCase, limit, plan.output)) :: Nil + case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation, + _, child: LogicalPlan, _, _) => + ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) => CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName) ExecutedCommandExec(createDb) :: Nil http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 4bd0564..77a0d90 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -280,7 +280,8 @@ object LoadTable { } -case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan) { +case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan) + extends RunnableCommand { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) def run(sparkSession: SparkSession): Seq[Row] = { val df = Dataset.ofRows(sparkSession, child) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala new file mode 100644 index 0000000..45accac --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.execution.datasources.LogicalRelation + +import org.apache.carbondata.core.constants.CarbonCommonConstants + + +/** + * Insert into carbon table from other source + */ +object CarbonPreInsertionCasts extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + plan.transform { + // Wait until children are resolved. + case p: LogicalPlan if !p.childrenResolved => p + + case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _) + if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child) + } + } + + def castChildOutput(p: InsertIntoTable, + relation: CarbonDatasourceHadoopRelation, + child: LogicalPlan) + : LogicalPlan = { + if (relation.carbonRelation.output.size > CarbonCommonConstants + .DEFAULT_MAX_NUMBER_OF_COLUMNS) { + sys + .error("Maximum supported column by carbon is:" + CarbonCommonConstants + .DEFAULT_MAX_NUMBER_OF_COLUMNS + ) + } + if (child.output.size >= relation.carbonRelation.output.size) { + InsertIntoCarbonTable(relation, p.partition, p.child, p.overwrite, p.ifNotExists) + } else { + sys.error("Cannot insert into target table because column number are different") + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f527d3d2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala index d81fc09..38c7f34 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -16,18 +16,22 @@ */ package org.apache.spark.sql.hive -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods, SparkSession} +import org.apache.spark.sql.CarbonDatasourceHadoopRelation +import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} -import org.apache.spark.sql.execution.{CarbonLateDecodeStrategy, SparkOptimizer} +import org.apache.spark.sql.execution.CarbonLateDecodeStrategy import org.apache.spark.sql.execution.command.DDLStrategy -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.SparkOptimizer +import org.apache.spark.sql.ExperimentalMethods import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import org.apache.spark.sql.parser.CarbonSparkSqlParser +import org.apache.spark.sql.SparkSession /** * Session state implementation to override sql parser and adding strategies @@ -42,6 +46,26 @@ class CarbonSessionState(sparkSession: 
SparkSession) extends HiveSessionState(sp experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule) override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) + + override lazy val analyzer: Analyzer = { + new Analyzer(catalog, conf) { + override val extendedResolutionRules = + catalog.ParquetConversions :: + catalog.OrcConversions :: + CarbonPreInsertionCasts :: + AnalyzeCreateTable(sparkSession) :: + PreprocessTableInsertion(conf) :: + DataSourceAnalysis(conf) :: + (if (conf.runSQLonFile) { + new ResolveDataSource(sparkSession) :: Nil + } else { + Nil + }) + + override val extendedCheckRules = Seq( + PreWriteCheck(conf, catalog)) + } + } } class CarbonOptimizer(