[CARBONDATA-2078][CARBONDATA-1516] Add 'IF NOT EXISTS' support to CREATE DATAMAP
Add 'if not exists' function for creating datamap This closes #1861 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f9606e9d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f9606e9d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f9606e9d Branch: refs/heads/branch-1.3 Commit: f9606e9d03d55bf57925c2ac176e92553c213d49 Parents: 02eefca Author: xubo245 <601450...@qq.com> Authored: Thu Jan 25 21:27:27 2018 +0800 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Fri Feb 2 12:08:54 2018 +0530 ---------------------------------------------------------------------- .../preaggregate/TestPreAggCreateCommand.scala | 55 ++++++++++- .../preaggregate/TestPreAggregateLoad.scala | 96 ++++++++++++++++++- .../timeseries/TestTimeSeriesCreateTable.scala | 85 +++++++++++++---- .../timeseries/TestTimeseriesDataLoad.scala | 99 +++++++++++++++++++- .../datamap/CarbonCreateDataMapCommand.scala | 38 ++++++-- .../CreatePreAggregateTableCommand.scala | 5 +- .../sql/parser/CarbonSpark2SqlParser.scala | 9 +- 7 files changed, 353 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index f1d7396..0cb1045 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -19,7 +19,7 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate import scala.collection.JavaConverters._ -import org.apache.spark.sql.CarbonDatasourceHadoopRelation +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation @@ -321,7 +321,60 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { checkExistence(sql("show tables"), false, "tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year") } + test("test pre agg create table 21: should support 'if not exists'") { + try { + sql( + """ + | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable + | USING 'preaggregate' + | AS SELECT + | column3, + | sum(column3), + | column5, + | sum(column5) + | FROM maintable + | GROUP BY column3,column5,column2 + """.stripMargin) + + sql( + """ + | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable + | USING 'preaggregate' + | AS SELECT + | column3, + | sum(column3), + | column5, + | sum(column5) + | FROM maintable + | GROUP BY column3,column5,column2 + """.stripMargin) + assert(true) + } catch { + case _: Exception => + assert(false) + } + sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable") + } + test("test pre agg create table 22: don't support 'create datamap if exists'") { + val e: Exception = intercept[AnalysisException] { + sql( + """ + | CREATE DATAMAP IF EXISTS agg0 ON TABLE mainTable + | USING 'preaggregate' + | AS SELECT + | column3, + | sum(column3), + | column5, + | sum(column5) + | FROM maintable + | GROUP BY column3,column5,column2 + """.stripMargin) + assert(true) + } + assert(e.getMessage.contains("identifier matching regex")) + sql("DROP DATAMAP IF EXISTS agg0 ON 
TABLE maintable") + } def getCarbontable(plan: LogicalPlan) : CarbonTable ={ var carbonTable : CarbonTable = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index 4ebf150..b6b7a17 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil4Test import org.scalatest.{BeforeAndAfterAll, Ignore} - import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { @@ -310,5 +310,99 @@ test("check load and select for avg double datatype") { checkAnswer(sql("select name,avg(salary) from maintbl group by name"), rows) } + test("create datamap with 'if not exists' after load data into mainTable and create datamap") { + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql( + s""" + | create datamap preagg_sum + | on 
table maintable + | using 'preaggregate' + | as select id,sum(age) from maintable + | group by id + """.stripMargin) + + sql( + s""" + | create datamap if not exists preagg_sum + | on table maintable + | using 'preaggregate' + | as select id,sum(age) from maintable + | group by id + """.stripMargin) + + checkAnswer(sql(s"select * from maintable_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + sql("drop table if exists maintable") + } + + test("create datamap with 'if not exists' after create datamap and load data into mainTable") { + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql( + s""" + | create datamap preagg_sum + | on table maintable + | using 'preaggregate' + | as select id,sum(age) from maintable + | group by id + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql( + s""" + | create datamap if not exists preagg_sum + | on table maintable + | using 'preaggregate' + | as select id,sum(age) from maintable + | group by id + """.stripMargin) + + checkAnswer(sql(s"select * from maintable_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + sql("drop table if exists maintable") + } + + test("create datamap without 'if not exists' after load data into mainTable and create datamap") { + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + sql( + s""" + | create datamap preagg_sum + | on table maintable + | using 'preaggregate' + | as select id,sum(age) from maintable + | group by id + """.stripMargin) + + val e: Exception = intercept[TableAlreadyExistsException] { + sql( + s""" + | create datamap preagg_sum + | on table maintable + | using 
'preaggregate' + | as select id,sum(age) from maintable + | group by id + """.stripMargin) + } + assert(e.getMessage.contains("already exists in database")) + checkAnswer(sql(s"select * from maintable_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + sql("drop table if exists maintable") + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala index 0ca7cb9..b63fd53 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala @@ -16,6 +16,7 @@ */ package org.apache.carbondata.integration.spark.testsuite.timeseries +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -81,29 +82,29 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { """.stripMargin) } - test("test timeseries create table Zero") { + test("test timeseries create table 1") { checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_second"), true, "maintable_agg0_second") sql("drop datamap agg0_second on table mainTable") } - test("test timeseries create table One") { + test("test timeseries create table 2") { checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_hour"), true, "maintable_agg0_hour") sql("drop datamap agg0_hour on table 
mainTable") } - test("test timeseries create table two") { + test("test timeseries create table 3") { checkExistence(sql("DESCRIBE FORMATTED maintable_agg0_day"), true, "maintable_agg0_day") sql("drop datamap agg0_day on table mainTable") } - test("test timeseries create table three") { + test("test timeseries create table 4") { checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_month"), true, "maintable_agg0_month") sql("drop datamap agg0_month on table mainTable") } - test("test timeseries create table four") { + test("test timeseries create table 5") { checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_year"), true, "maintable_agg0_year") sql("drop datamap agg0_year on table mainTable") } - test("test timeseries create table five") { + test("test timeseries create table 6") { intercept[Exception] { sql( s""" @@ -118,7 +119,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { } } - test("test timeseries create table Six") { + test("test timeseries create table 7") { intercept[Exception] { sql( s""" @@ -133,7 +134,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { } } - test("test timeseries create table seven") { + test("test timeseries create table 8") { intercept[Exception] { sql( s""" @@ -158,7 +159,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { } } - test("test timeseries create table Eight") { + test("test timeseries create table 9") { intercept[Exception] { sql( s""" @@ -173,7 +174,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { } } - test("test timeseries create table Nine") { + test("test timeseries create table 10") { intercept[Exception] { sql( s""" @@ -188,7 +189,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { } } - test("test timeseries create table: USING") { + test("test timeseries create table 11: USING") { val e: Exception = intercept[MalformedDataMapCommandException] { sql( """CREATE 
DATAMAP agg1 ON TABLE mainTable @@ -203,7 +204,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.equals("Unknown data map type abc")) } - test("test timeseries create table: USING and catch MalformedCarbonCommandException") { + test("test timeseries create table 12: USING and catch MalformedCarbonCommandException") { val e: Exception = intercept[MalformedCarbonCommandException] { sql( """CREATE DATAMAP agg1 ON TABLE mainTable @@ -218,7 +219,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.equals("Unknown data map type abc")) } - test("test timeseries create table: Only one granularity level can be defined 1") { + test("test timeseries create table 13: Only one granularity level can be defined 1") { + sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable") val e: Exception = intercept[MalformedCarbonCommandException] { sql( s""" @@ -238,7 +240,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.equals("Only one granularity level can be defined")) } - test("test timeseries create table: Only one granularity level can be defined 2") { + test("test timeseries create table 14: Only one granularity level can be defined 2") { + sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable") val e: Exception = intercept[MalformedDataMapCommandException] { sql( s""" @@ -255,7 +258,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.equals("Only one granularity level can be defined")) } - test("test timeseries create table: Only one granularity level can be defined 3") { + test("test timeseries create table 15: Only one granularity level can be defined 3") { + sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable") val e: Exception = intercept[MalformedDataMapCommandException] { sql( s""" @@ -272,7 +276,8 @@ class TestTimeSeriesCreateTable extends QueryTest with 
BeforeAndAfterAll { assert(e.getMessage.equals("Only one granularity level can be defined")) } - test("test timeseries create table: Granularity only support 1") { + test("test timeseries create table 16: Granularity only support 1") { + sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable") val e = intercept[MalformedDataMapCommandException] { sql( s""" @@ -288,7 +293,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.equals("Granularity only support 1")) } - test("test timeseries create table: Granularity only support 1 and throw Exception") { + test("test timeseries create table 17: Granularity only support 1 and throw Exception") { + sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable") val e = intercept[MalformedCarbonCommandException] { sql( s""" @@ -304,7 +310,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.equals("Granularity only support 1")) } - test("test timeseries create table: timeSeries should define time granularity") { + test("test timeseries create table 18: timeSeries should define time granularity") { + sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable") val e = intercept[MalformedDataMapCommandException] { sql( s""" @@ -319,6 +326,48 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { assert(e.getMessage.equals(s"$timeSeries should define time granularity")) } + test("test timeseries create table 19: should support if not exists") { + sql("DROP DATAMAP IF EXISTS agg1 ON TABLE mainTable") + + sql( + s""" + | CREATE DATAMAP agg1 ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'MONTH_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP IF NOT EXISTS agg1 ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 
'MONTH_GRANULARITY'='1') + |AS SELECT dataTime, SUM(age) FROM mainTable + |GROUP BY dataTime + """.stripMargin) + checkExistence(sql("SHOW DATAMAP ON TABLE mainTable"), true, "agg1") + checkExistence(sql("DESC FORMATTED mainTable_agg1"), true, "maintable_age_sum") + } + + test("test timeseries create table 20: don't support 'create datamap if exists'") { + val e: Exception = intercept[AnalysisException] { + sql( + s"""CREATE DATAMAP IF EXISTS agg2 ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'MONTH_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.contains("identifier matching regex")) + } + override def afterAll: Unit = { sql("DROP TABLE IF EXISTS mainTable") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala index 8bcdfc9..b43b93b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala @@ -19,9 +19,10 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries import java.sql.Timestamp import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil4Test -import 
org.scalatest.{BeforeAndAfterAll, Ignore} +import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES @@ -239,6 +240,102 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50))) } + test("create datamap without 'if not exists' after load data into mainTable and create datamap") { + sql("drop table if exists mainTable") + sql( + """ + | CREATE TABLE mainTable( + | mytime timestamp, + | name string, + | age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") + + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'second_granularity'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + + checkAnswer(sql("select * from maintable_agg0_second"), + Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10), + Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20), + Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30), + Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40), + Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50), + Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50))) + val e: Exception = intercept[TableAlreadyExistsException] { + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'second_granularity'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + } + assert(e.getMessage.contains("already exists in database")) + } + + test("create datamap with 'if not exists' after load data into mainTable and create datamap") { + sql("drop table if exists mainTable") + sql( + """ + | CREATE TABLE mainTable( + | mytime timestamp, 
+ | name string, + | age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") + + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'second_granularity'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + + checkAnswer(sql("select * from maintable_agg0_second"), + Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10), + Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20), + Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30), + Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40), + Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50), + Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50))) + + sql( + s""" + | CREATE DATAMAP IF NOT EXISTS agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'second_granularity'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + + checkAnswer(sql("select * from maintable_agg0_second"), + Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10), + Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20), + Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30), + Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40), + Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50), + Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50))) + } + override def afterAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists table_03") http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index c4d32b4..da20ac5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command.datamap import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand @@ -35,10 +36,12 @@ case class CarbonCreateDataMapCommand( tableIdentifier: TableIdentifier, dmClassName: String, dmproperties: Map[String, String], - queryString: Option[String]) + queryString: Option[String], + ifNotExistsSet: Boolean = false) extends AtomicRunnableCommand { var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _ + var tableIsExists: Boolean = false override def processMetadata(sparkSession: SparkSession): Seq[Row] = { // since streaming segment does not support building index and pre-aggregate yet, @@ -49,10 +52,22 @@ case class CarbonCreateDataMapCommand( throw new MalformedCarbonCommandException("Streaming table does not support creating datamap") } val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val dbName = tableIdentifier.database.getOrElse("default") + val tableName = tableIdentifier.table + "_" + dataMapName - if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || + if (sparkSession.sessionState.catalog.listTables(dbName) + .exists(_.table.equalsIgnoreCase(tableName))) { + LOGGER.audit( + s"Table creation with Database name [$dbName] and Table name [$tableName] failed. 
" + + s"Table [$tableName] already exists under database [$dbName]") + tableIsExists = true + if (!ifNotExistsSet) { + throw new TableAlreadyExistsException(dbName, tableName) + } + } else if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName) + createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { val details = TimeSeriesUtil .getTimeSeriesGranularityDetails(dmproperties, dmClassName) @@ -62,15 +77,16 @@ case class CarbonCreateDataMapCommand( dmClassName, updatedDmProperties, queryString.get, - Some(details._1)) + Some(details._1), + ifNotExistsSet = ifNotExistsSet) } else { CreatePreAggregateTableCommand( dataMapName, tableIdentifier, dmClassName, dmproperties, - queryString.get - ) + queryString.get, + ifNotExistsSet = ifNotExistsSet) } createPreAggregateTableCommands.processMetadata(sparkSession) } else { @@ -83,7 +99,11 @@ case class CarbonCreateDataMapCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - createPreAggregateTableCommands.processData(sparkSession) + if (!tableIsExists) { + createPreAggregateTableCommands.processData(sparkSession) + } else { + Seq.empty + } } else { throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName) } @@ -92,7 +112,11 @@ case class CarbonCreateDataMapCommand( override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - createPreAggregateTableCommands.undoMetadata(sparkSession, exception) + if (!tableIsExists) { + createPreAggregateTableCommands.undoMetadata(sparkSession, exception) + } else { + Seq.empty + } } else { throw new 
MalformedDataMapCommandException("Unknown data map type " + dmClassName) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index 3de75c2..31a3403 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -49,7 +49,8 @@ case class CreatePreAggregateTableCommand( dmClassName: String, dmProperties: Map[String, String], queryString: String, - timeSeriesFunction: Option[String] = None) + timeSeriesFunction: Option[String] = None, + ifNotExistsSet: Boolean = false) extends AtomicRunnableCommand { var parentTable: CarbonTable = _ @@ -86,7 +87,7 @@ case class CreatePreAggregateTableCommand( parentTableIdentifier.database) // prepare table model of the collected tokens val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel( - ifNotExistPresent = false, + ifNotExistPresent = ifNotExistsSet, new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database), tableIdentifier.table.toLowerCase, fields, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 4045478..7addd26 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -142,17 +142,18 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { /** * The syntax of datamap creation is as follows. - * CREATE DATAMAP datamapName ON TABLE tableName USING 'DataMapClassName' + * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapClassName' * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName */ protected lazy val createDataMap: Parser[LogicalPlan] = - CREATE ~> DATAMAP ~> ident ~ (ON ~ TABLE) ~ (ident <~ ".").? ~ ident ~ + CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~ + (ON ~ TABLE) ~ (ident <~ ".").? ~ ident ~ (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~ (AS ~> restInput).? <~ opt(";") ^^ { - case dmname ~ ontable ~ dbName ~ tableName ~ className ~ dmprops ~ query => + case ifnotexists ~ dmname ~ ontable ~ dbName ~ tableName ~ className ~ dmprops ~ query => val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String] CarbonCreateDataMapCommand( - dmname, TableIdentifier(tableName, dbName), className, map, query) + dmname, TableIdentifier(tableName, dbName), className, map, query, ifnotexists.isDefined) } /**