[CARBONDATA-2088][CARBONDATA-1516] Optimize syntax for creating timeseries pre-aggregate table
Use 'timeseries' instead of 'preaggregate' as the USING provider when creating a timeseries pre-aggregate table. Replace the 'timeseries.eventTime' and 'timeseries.hierarchy' DMPROPERTIES with 'EVENT_TIME' and per-level properties such as 'HOUR_GRANULARITY'; only one granularity level may be defined per datamap, and its value must be 1. Throw an exception (MalformedDataMapCommandException) if a datamap is created with a provider other than 'timeseries' or 'preaggregate'. This closes #1865 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/181c280b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/181c280b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/181c280b Branch: refs/heads/fgdatamap Commit: 181c280b7d33ac5e4029bd935d6260b0fe79a2bf Parents: b421c24 Author: xubo245 <601450...@qq.com> Authored: Fri Jan 26 17:47:46 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jan 30 17:25:19 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 4 - .../schema/datamap/DataMapProvider.java | 32 +++ .../metadata/schema/datamap/Granularity.java | 46 +++ .../preaggregate/TestPreAggCreateCommand.scala | 60 +++- .../TestPreAggregateTableSelection.scala | 26 +- .../timeseries/TestTimeSeriesCreateTable.scala | 280 ++++++++++++++++--- .../timeseries/TestTimeseriesCompaction.scala | 67 ++++- .../timeseries/TestTimeseriesDataLoad.scala | 142 +++++++++- .../TestTimeseriesTableSelection.scala | 114 +++++++- .../testsuite/datamap/TestDataMapCommand.scala | 155 +++++----- .../MalformedDataMapCommandException.java | 32 +++ .../datamap/CarbonCreateDataMapCommand.scala | 72 ++--- .../CreatePreAggregateTableCommand.scala | 4 +- .../command/timeseries/TimeSeriesUtil.scala | 83 +++++- .../carbondata/CarbonDataSourceSuite.scala | 8 + 15 files changed, 943 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index f46feef..cf95dd9 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1543,10 +1543,6 @@ public final class CarbonCommonConstants { */ public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024; - public static final String TIMESERIES_EVENTTIME = "timeseries.eventtime"; - - public static final String TIMESERIES_HIERARCHY = "timeseries.hierarchy"; - /** * It allows queries on hive metastore directly along with filter information, otherwise first * fetches all partitions from hive and apply filters on it. http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java new file mode 100644 index 0000000..65578b1 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.metadata.schema.datamap; + +/** + * type for create datamap + * The syntax of datamap creation is as follows. + * CREATE DATAMAP IF NOT EXISTS dataMapName ON TABLE tableName USING 'DataMapProvider' + * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName + * + * Please refer {{org.apache.spark.sql.parser.CarbonSpark2SqlParser}} + */ + +public enum DataMapProvider { + PREAGGREGATE, + TIMESERIES; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java new file mode 100644 index 0000000..d6aefb6 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.metadata.schema.datamap; + +/** + * type for create datamap + * The syntax of datamap creation is as follows. + * CREATE DATAMAP IF NOT EXISTS dataMapName ON TABLE tableName USING 'DataMapProvider' + * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName + * + * Please refer {{org.apache.spark.sql.parser.CarbonSpark2SqlParser}} + */ + +public enum Granularity { + YEAR("year_granularity"), + MONTH("month_granularity"), + DAY("day_granularity"), + HOUR("hour_granularity"), + MINUTE("minute_granularity"), + SECOND("second_granularity"); + private String name; + + Granularity(String name) { + this.name = name; + } + + public String getName() { + return name; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index 755a449..d3f0ff8 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -1,15 +1,18 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate +import scala.collection.JavaConverters._ + import org.apache.spark.sql.CarbonDatasourceHadoopRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import scala.collection.JavaConverters._ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { @@ -212,6 +215,60 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("drop datamap agg0 on table maintable") } + val timeSeries = TIMESERIES.toString + + test("test PreAggregate table selection: create with preaggregate and hierarchy") { + sql("DROP TABLE IF EXISTS maintabletime") + sql( + """ + | CREATE TABLE maintabletime(year INT,month INT,name STRING,salary INT,dob STRING) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'SORT_SCOPE'='Global_sort', + | 'TABLE_BLOCKSIZE'='23', + | 'SORT_COLUMNS'='month,year,name') + """.stripMargin) + sql("INSERT INTO maintabletime SELECT 10,11,'x',12,'2014-01-01 00:00:00'") + sql( + s""" + | CREATE DATAMAP agg0 ON TABLE maintabletime + | USING 'preaggregate' + | AS SELECT dob,name FROM maintabletime + | GROUP BY dob,name + """.stripMargin) + val e = 
intercept[MalformedCarbonCommandException] { + sql( + s""" + | CREATE DATAMAP agg1 ON TABLE maintabletime + | USING 'preaggregate' + | DMPROPERTIES ( + | 'EVENT_TIME'='dob', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dob,name FROM maintabletime + | GROUP BY dob,name + """.stripMargin) + } + assert(e.getMessage.contains(s"$timeSeries keyword missing")) + sql("DROP TABLE IF EXISTS maintabletime") + } + + test("test pre agg create table 21: using") { + sql("DROP DATAMAP agg0 ON TABLE maintable") + + val e: Exception = intercept[Exception] { + sql( + """ + | CREATE DATAMAP agg0 ON TABLE mainTable + | USING 'abc' + | AS SELECT column3, SUM(column3),column5, SUM(column5) + | FROM maintable + | GROUP BY column3,column5,column2 + """.stripMargin) + } + assert(e.getMessage.contains( + s"Unknown data map type abc")) + sql("DROP DATAMAP agg0 ON TABLE maintable") + } def getCarbontable(plan: LogicalPlan) : CarbonTable ={ var carbonTable : CarbonTable = null @@ -239,5 +296,6 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { sql("drop table if exists PreAggMain") sql("drop table if exists PreAggMain1") sql("drop table if exists PreAggMain2") + sql("drop table if exists maintabletime") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala index 17d95ef..f9ac354 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Row} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES + class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { override def beforeAll: Unit = { @@ -267,6 +269,8 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll { preAggTableValidator(df.queryExecution.analyzed, "maintable") } + val timeSeries = TIMESERIES.toString + test("test PreAggregate table selection with timeseries and normal together") { sql("drop table if exists maintabletime") sql( @@ -277,17 +281,26 @@ test("test PreAggregate table selection with timeseries and normal together") { sql( "create datamap agg0 on table maintabletime using 'preaggregate' as select dob,name from " + "maintabletime group by dob,name") - sql( - "create datamap agg1 on table maintabletime using 'preaggregate' DMPROPERTIES ('timeseries" + - ".eventTime'='dob', 'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select dob," + - "name from maintabletime group by dob,name") - val df = sql("select timeseries(dob,'year') from maintabletime group by timeseries(dob,'year')") + + sql( + s""" + | CREATE DATAMAP agg1_year ON TABLE maintabletime + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dob', + | 'YEAR_GRANULARITY'='1') + | AS SELECT dob, name FROM maintabletime + | GROUP BY dob,name + """.stripMargin) + + val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY timeseries(dob,'year')") preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year") + sql("DROP TABLE IF EXISTS maintabletime") } test("test table selection when unsupported aggregate function is present") { - sql("drop table if 
exists maintabletime") + sql("DROP TABLE IF EXISTS maintabletime") sql( "create table maintabletime(year int,month int,name string,salary int,dob string) stored" + " by 'carbondata' tblproperties('sort_scope'='Global_sort','table_blocksize'='23'," + @@ -302,6 +315,7 @@ test("test PreAggregate table selection with timeseries and normal together") { override def afterAll: Unit = { sql("drop table if exists mainTable") sql("drop table if exists lineitem") + sql("DROP TABLE IF EXISTS maintabletime") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala index c9041fa..0ca7cb9 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala @@ -17,14 +17,68 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.{BeforeAndAfterAll, Ignore} +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES +import org.apache.carbondata.spark.exception.{MalformedDataMapCommandException, MalformedCarbonCommandException} class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { + val timeSeries = TIMESERIES.toString + override def beforeAll: Unit = { - sql("drop table if exists 
mainTable") + sql("DROP TABLE IF EXISTS mainTable") sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, age int) STORED BY 'org.apache.carbondata.format'") - sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='second=1,hour=1,day=1,month=1,year=1') as select dataTime, sum(age) from mainTable group by dataTime") + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_hour ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'HOUR_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_day ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'day_granularity'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_month ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'month_granularity'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_year ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'year_granularity'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) } test("test timeseries create table Zero") { @@ -53,28 +107,29 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { intercept[Exception] { sql( s""" - | create datamap agg0 on table mainTable - | using 'preaggregate' + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' | 
DMPROPERTIES ( - | 'timeseries.eventTime'='dataTime', - | 'timeseries.hierarchy'='sec=1,hour=1,day=1,month=1,year=1') - | as select dataTime, sum(age) from mainTable - | group by dataTime - """.stripMargin) + | 'EVENT_TIME'='dataTime', + | 'SEC_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) } } test("test timeseries create table Six") { intercept[Exception] { sql( - """ - | create datamap agg0 on table mainTable - | using 'preaggregate' - | DMPROPERTIES ('timeseries.eventTime'='dataTime', 'timeseries.hierarchy'='hour=2') - | as select dataTime, sum(age) from mainTable - | group by dataTime - """.stripMargin) - + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='2') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) } } @@ -82,24 +137,24 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { intercept[Exception] { sql( s""" - | create datamap agg0 on table mainTable - | using 'preaggregate' + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' | DMPROPERTIES ( - | 'timeseries.eventTime'='dataTime', - | 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') - | as select dataTime, sum(age) from mainTable - | group by dataTime - """.stripMargin) + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) sql( s""" - | create datamap agg0 on table mainTable - | using 'preaggregate' + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' | DMPROPERTIES ( - | 'timeseries.eventTime'='dataTime', - | 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') - | as select dataTime, sum(age) from mainTable - | group by dataTime - """.stripMargin) + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) 
FROM mainTable + | GROUP BY dataTime + """.stripMargin) } } @@ -107,12 +162,14 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { intercept[Exception] { sql( s""" - | create datamap agg0 on table mainTable - | using 'preaggregate' - | DMPROPERTIES ('timeseries.eventTime'='name', 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') - | as select name, sum(age) from mainTable - | group by name - """.stripMargin) + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='name', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) } } @@ -120,18 +177,149 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll { intercept[Exception] { sql( s""" - | create datamap agg0 on table mainTable - | using 'preaggregate' + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='name', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + } + + test("test timeseries create table: USING") { + val e: Exception = intercept[MalformedDataMapCommandException] { + sql( + """CREATE DATAMAP agg1 ON TABLE mainTable + | USING 'abc' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.equals("Unknown data map type abc")) + } + + test("test timeseries create table: USING and catch MalformedCarbonCommandException") { + val e: Exception = intercept[MalformedCarbonCommandException] { + sql( + """CREATE DATAMAP agg1 ON TABLE mainTable + | USING 'abc' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.equals("Unknown data map type abc")) 
+ } + + test("test timeseries create table: Only one granularity level can be defined 1") { + val e: Exception = intercept[MalformedCarbonCommandException] { + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='1', + | 'HOUR_GRANULARITY'='1', + | 'DAY_GRANULARITY'='1', + | 'MONTH_GRANULARITY'='1', + | 'YEAR_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.equals("Only one granularity level can be defined")) + } + + test("test timeseries create table: Only one granularity level can be defined 2") { + val e: Exception = intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'SECOND_GRANULARITY'='1', + | 'HOUR_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.equals("Only one granularity level can be defined")) + } + + test("test timeseries create table: Only one granularity level can be defined 3") { + val e: Exception = intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'DAY_GRANULARITY'='1', + | 'HOUR_GRANULARITY'='1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.equals("Only one granularity level can be defined")) + } + + test("test timeseries create table: Granularity only support 1") { + val e = intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'DAY_GRANULARITY'='2') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY 
dataTime + """.stripMargin) + } + assert(e.getMessage.equals("Granularity only support 1")) + } + + test("test timeseries create table: Granularity only support 1 and throw Exception") { + val e = intercept[MalformedCarbonCommandException] { + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'HOUR_GRANULARITY'='2') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.equals("Granularity only support 1")) + } + + test("test timeseries create table: timeSeries should define time granularity") { + val e = intercept[MalformedDataMapCommandException] { + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' | DMPROPERTIES ( - | 'timeseries.eventTime'='dataTime', - | 'timeseries.hierarchy'='hour=1,day=1,year=1,month=1') - | as select name, sum(age) from mainTable - | group by name - """.stripMargin) + | 'EVENT_TIME'='dataTime') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) } + assert(e.getMessage.equals(s"$timeSeries should define time granularity")) } override def afterAll: Unit = { - sql("drop table if exists mainTable") + sql("DROP TABLE IF EXISTS mainTable") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala index a410fe4..d66c402 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala @@ -18,15 +18,18 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil4Test -import org.scalatest.{BeforeAndAfterAll, Ignore} +import org.scalatest.BeforeAndAfterAll import org.scalatest.Matchers._ import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES import org.apache.carbondata.core.util.CarbonProperties class TestTimeseriesCompaction extends QueryTest with BeforeAndAfterAll { var isCompactionEnabled = false + val timeSeries = TIMESERIES.toString + override def beforeAll: Unit = { SparkUtil4Test.createTaskMockUp(sqlContext) CarbonProperties.getInstance() @@ -38,7 +41,67 @@ class TestTimeseriesCompaction extends QueryTest with BeforeAndAfterAll { .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") sql("drop table if exists mainTable") sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'") - sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime") + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_minute ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 
'MINUTE_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_hour ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'HOUR_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_day ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'DAY_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_month ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'MONTH_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_year ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'YEAR_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala index d25710c..8bcdfc9 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala @@ -24,10 +24,13 @@ import org.apache.spark.util.SparkUtil4Test import org.scalatest.{BeforeAndAfterAll, Ignore} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES import org.apache.carbondata.core.util.CarbonProperties class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { + val timeSeries = TIMESERIES.toString + override def beforeAll: Unit = { SparkUtil4Test.createTaskMockUp(sqlContext) CarbonProperties.getInstance() @@ -35,11 +38,131 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { sql("drop table if exists mainTable") sql("drop table if exists table_03") sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'") - sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime") + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_minute ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'minute_granularity'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_hour ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 
'HOUR_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_day ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'DAY_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_month ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'MONTH_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_year ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'year_granularity'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") sql("CREATE TABLE table_03 (imei string,age int,mac string,productdate timestamp,updatedate timestamp,gamePointId double,contractid double ) STORED BY 'org.apache.carbondata.format'") sql(s"LOAD DATA inpath '$resourcesPath/data_sort.csv' INTO table table_03 options ('DELIMITER'=',', 'QUOTECHAR'='','FILEHEADER'='imei,age,mac,productdate,updatedate,gamePointId,contractid')") - sql("create datamap ag1 on table table_03 using 'preaggregate' DMPROPERTIES ( 'timeseries.eventtime'='productdate','timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1')as select productdate,mac,sum(age) from table_03 group by productdate,mac") + + sql( + s""" + | CREATE DATAMAP ag1_second ON TABLE table_03 + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='productdate', + | 'SECOND_GRANULARITY'='1') + | AS SELECT productdate,mac,SUM(age) FROM table_03 + | GROUP BY productdate,mac + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ag1_minute ON TABLE table_03 + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='productdate', + | 
'minute_granularity'='1') + | AS SELECT productdate,mac,SUM(age) FROM table_03 + | GROUP BY productdate,mac + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ag1_hour ON TABLE table_03 + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='productdate', + | 'HOUR_GRANULARITY'='1') + | AS SELECT productdate,mac,SUM(age) FROM table_03 + | GROUP BY productdate,mac + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ag1_day ON TABLE table_03 + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='productdate', + | 'DAY_GRANULARITY'='1') + | AS SELECT productdate,mac,SUM(age) FROM table_03 + | GROUP BY productdate,mac + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ag1_month ON TABLE table_03 + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='productdate', + | 'month_granularity'='1') + | AS SELECT productdate,mac,SUM(age) FROM table_03 + | GROUP BY productdate,mac + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ag1_year ON TABLE table_03 + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='productdate', + | 'year_granularity'='1') + | AS SELECT productdate,mac,SUM(age) FROM table_03 + | GROUP BY productdate,mac + """.stripMargin) } test("test Year level timeseries data validation1 ") { @@ -93,12 +216,21 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll { Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50))) } - test("test if timeseries load is successful on table creation") { + test("test if timeseries load is successful ON TABLE creation") { sql("drop table if exists mainTable") sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'") sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") - sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by 
mytime") - checkAnswer( sql("select * from maintable_agg0_second"), + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + checkAnswer( sql("select * FROM maintable_agg0_second"), Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"),10), Row(Timestamp.valueOf("2016-02-23 01:01:40.0"),20), Row(Timestamp.valueOf("2016-02-23 01:01:50.0"),30), http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala index a9d3965..3065952 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala @@ -24,14 +24,78 @@ import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.util.SparkUtil4Test import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll { + val timeSeries = TIMESERIES.toString + override def beforeAll: Unit = { SparkUtil4Test.createTaskMockUp(sqlContext) sql("drop table if exists mainTable") sql("CREATE TABLE 
mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'") - sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime") + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'SECOND_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_minute ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'minute_granularity'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_hour ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'HOUR_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_day ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'DAY_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_month ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'MONTH_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql( + s""" + | CREATE DATAMAP agg0_year ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='mytime', + | 'YEAR_GRANULARITY'='1') + | AS SELECT mytime, SUM(age) FROM mainTable + | GROUP BY mytime + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable") } @@ -100,6 +164,54 @@ class TestTimeseriesTableSelection extends QueryTest with 
BeforeAndAfterAll { preAggTableValidator(df.queryExecution.analyzed,"maintable") } + test("test timeseries table selection 14: Granularity only support 1 and throw Exception") { + val e = intercept[MalformedCarbonCommandException] { + sql( + s""" + | CREATE DATAMAP agg3_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'HOUR_GRANULARITY'='2') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.contains("Granularity only support 1")) + } + + test("test timeseries table selection 15: Granularity only support 1 and throw Exception") { + val e = intercept[MalformedCarbonCommandException] { + sql( + s""" + | CREATE DATAMAP agg3_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'HOUR_GRANULARITY'='1.5') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.contains("Granularity only support 1")) + } + + test("test timeseries table selection 16: Granularity only support 1 and throw Exception") { + val e = intercept[MalformedCarbonCommandException] { + sql( + s""" + | CREATE DATAMAP agg3_second ON TABLE mainTable + | USING '$timeSeries' + | DMPROPERTIES ( + | 'EVENT_TIME'='dataTime', + | 'HOUR_GRANULARITY'='-1') + | AS SELECT dataTime, SUM(age) FROM mainTable + | GROUP BY dataTime + """.stripMargin) + } + assert(e.getMessage.contains("Granularity only support 1")) + } + def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={ var isValidPlan = false plan.transform { http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index a0ea317..0c38239 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -27,6 +27,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.spark.exception.MalformedDataMapCommandException class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { @@ -39,37 +40,42 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'") } - - test("test datamap create") { - sql("create datamap datamap1 on table datamaptest using 'new.class'") - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 1) - assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1")) - assert(dataMapSchemaList.get(0).getClassName.equals("new.class")) + val newClass = "org.apache.spark.sql.CarbonSource" + + test("test datamap create: don't support using class, only support short name") { + intercept[MalformedDataMapCommandException] { + sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") + assert(table != null) + val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList + 
assert(dataMapSchemaList.size() == 1) + assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1")) + assert(dataMapSchemaList.get(0).getClassName.equals(newClass)) + } } - test("test datamap create with dmproperties") { - sql("create datamap datamap2 on table datamaptest using 'new.class' dmproperties('key'='value')") - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) - assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2")) - assert(dataMapSchemaList.get(1).getClassName.equals("new.class")) - assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value")) + test("test datamap create with dmproperties: don't support using class") { + intercept[MalformedDataMapCommandException] { + sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") + assert(table != null) + val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList + assert(dataMapSchemaList.size() == 2) + assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2")) + assert(dataMapSchemaList.get(1).getClassName.equals(newClass)) + assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value")) + } } - test("test datamap create with existing name") { - intercept[Exception] { + test("test datamap create with existing name: don't support using class") { + intercept[MalformedDataMapCommandException] { sql( - "create datamap datamap2 on table datamaptest using 'new.class' dmproperties('key'='value')") + s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") + val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") + assert(table != null) + val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList + 
assert(dataMapSchemaList.size() == 2) } - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) } test("test datamap create with preagg") { @@ -79,10 +85,10 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") assert(table != null) val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 3) - assert(dataMapSchemaList.get(2).getDataMapName.equals("datamap3")) - assert(dataMapSchemaList.get(2).getProperties.get("key").equals("value")) - assert(dataMapSchemaList.get(2).getChildSchema.getTableName.equals("datamaptest_datamap3")) + assert(dataMapSchemaList.size() == 1) + assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3")) + assert(dataMapSchemaList.get(0).getProperties.get("key").equals("value")) + assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3")) } test("check hivemetastore after drop datamap") { @@ -110,7 +116,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { } } - test("drop the table having pre-aggregate"){ + test("drop the table having pre-aggregate") { try { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, @@ -140,13 +146,24 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { test("test datamap create with preagg with duplicate name") { intercept[Exception] { sql( - "create datamap datamap2 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest") - + s""" + | CREATE DATAMAP datamap2 ON TABLE datamaptest + | USING 'preaggregate' + | DMPROPERTIES('key'='value') + | AS SELECT COUNT(a) FROM datamaptest + """.stripMargin) + sql( + s""" + | CREATE DATAMAP datamap2 ON TABLE 
datamaptest + | USING 'preaggregate' + | DMPROPERTIES('key'='value') + | AS SELECT COUNT(a) FROM datamaptest + """.stripMargin) } val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") assert(table != null) val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 3) + assert(dataMapSchemaList.size() == 2) } test("test datamap drop with preagg") { @@ -157,25 +174,29 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") assert(table != null) val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 3) + assert(dataMapSchemaList.size() == 2) } - test("test show datamap without preaggregate") { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - sql("create datamap datamap1 on table datamapshowtest using 'new.class' dmproperties('key'='value')") - sql("create datamap datamap2 on table datamapshowtest using 'new.class' dmproperties('key'='value')") - checkExistence(sql("show datamap on table datamapshowtest"), true, "datamap1", "datamap2", "(NA)", "new.class") + test("test show datamap without preaggregate: don't support using class") { + intercept[MalformedDataMapCommandException] { + sql("drop table if exists datamapshowtest") + sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") + sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") + sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") + checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass) + } } - test("test show datamap with preaggregate") { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a 
string, b string, c string) stored by 'carbondata'") - sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") - sql("create datamap datamap2 on table datamapshowtest using 'new.class' dmproperties('key'='value')") - val frame = sql("show datamap on table datamapshowtest") - assert(frame.collect().length == 2) - checkExistence(frame, true, "datamap1", "datamap2", "(NA)", "new.class", "default.datamapshowtest_datamap1") + test("test show datamap with preaggregate: don't support using class") { + intercept[MalformedDataMapCommandException] { + sql("drop table if exists datamapshowtest") + sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") + sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") + sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") + val frame = sql("show datamap on table datamapshowtest") + assert(frame.collect().length == 2) + checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1") + } } test("test show datamap with no datamap") { @@ -184,15 +205,17 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { assert(sql("show datamap on table datamapshowtest").collect().length == 0) } - test("test show datamap after dropping datamap") { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") - sql("create datamap datamap2 on table datamapshowtest using 'new.class' dmproperties('key'='value')") - sql("drop datamap datamap1 on table datamapshowtest") - val frame = sql("show datamap on table datamapshowtest") - assert(frame.collect().length == 1) - checkExistence(frame, true, "datamap2", 
"(NA)", "new.class") + test("test show datamap after dropping datamap: don't support using class") { + intercept[MalformedDataMapCommandException] { + sql("drop table if exists datamapshowtest") + sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") + sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") + sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") + sql("drop datamap datamap1 on table datamapshowtest") + val frame = sql("show datamap on table datamapshowtest") + assert(frame.collect().length == 1) + checkExistence(frame, true, "datamap2", "(NA)", newClass) + } } test("test if preaggregate load is successfull for hivemetastore") { @@ -217,7 +240,7 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'") sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1") sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1") - checkAnswer(sql("select * from uniqdata_uniqdata_agg"),Seq(Row(12345678901.1234000000, 12345678901.1234000000))) + checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000))) sql("drop datamap if exists uniqdata_agg on table uniqdata") } @@ -229,16 +252,16 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { 
sql("insert into main select 10,11,'amy',12") sql("insert into main select 10,11,'amy',14") sql("create datamap preagg on table main " + - "using 'preaggregate' " + - s"dmproperties ('path'='$path') " + - "as select name,avg(salary) from main group by name") + "using 'preaggregate' " + + s"dmproperties ('path'='$path') " + + "as select name,avg(salary) from main group by name") assertResult(true)(new File(path).exists()) assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}") - .list(new FilenameFilter { - override def accept(dir: File, name: String): Boolean = { - name.contains(CarbonCommonConstants.FACT_FILE_EXT) - } - }).length > 0) + .list(new FilenameFilter { + override def accept(dir: File, name: String): Boolean = { + name.contains(CarbonCommonConstants.FACT_FILE_EXT) + } + }).length > 0) checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0)) checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2)) sql("drop datamap preagg on table main") http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java new file mode 100644 index 0000000..a05d8e6 --- /dev/null +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.exception; + +/** + * Throw exception when using illegal argument + */ +public class MalformedDataMapCommandException extends MalformedCarbonCommandException { + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; + + public MalformedDataMapCommandException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 8e00635..c4d32b4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -16,18 +16,15 @@ */ package org.apache.spark.sql.execution.command.datamap -import scala.collection.JavaConverters._ - import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ -import 
org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil} +import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._ +import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException} /** * Below command class will be used to create datamap on table @@ -41,70 +38,63 @@ case class CarbonCreateDataMapCommand( queryString: Option[String]) extends AtomicRunnableCommand { - var createPreAggregateTableCommands: Seq[CreatePreAggregateTableCommand] = _ + var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { // since streaming segment does not support building index and pre-aggregate yet, // so streaming table does not support create datamap val carbonTable = - CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession) + CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession) if (carbonTable.isStreamingTable) { throw new MalformedCarbonCommandException("Streaming table does not support creating datamap") } val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") || - dmClassName.equalsIgnoreCase("preaggregate")) { - val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY) - createPreAggregateTableCommands = if (timeHierarchyString.isDefined) { + + if 
(dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || + dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { + TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName) + createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { val details = TimeSeriesUtil - .validateAndGetTimeSeriesHierarchyDetails( - timeHierarchyString.get) - val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY - details.map { f => - CreatePreAggregateTableCommand(dataMapName + '_' + f._1, - tableIdentifier, - dmClassName, - updatedDmProperties, - queryString.get, - Some(f._1)) - }.toSeq + .getTimeSeriesGranularityDetails(dmproperties, dmClassName) + val updatedDmProperties = dmproperties - details._1 + CreatePreAggregateTableCommand(dataMapName, + tableIdentifier, + dmClassName, + updatedDmProperties, + queryString.get, + Some(details._1)) } else { - Seq(CreatePreAggregateTableCommand( + CreatePreAggregateTableCommand( dataMapName, tableIdentifier, dmClassName, dmproperties, queryString.get - )) + ) } - createPreAggregateTableCommands.flatMap(_.processMetadata(sparkSession)) + createPreAggregateTableCommands.processMetadata(sparkSession) } else { - val dataMapSchema = new DataMapSchema(dataMapName, dmClassName) - dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava)) - val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession) - // upadting the parent table about dataschema - PreAggregateUtil.updateMainTable(dbName, tableIdentifier.table, dataMapSchema, sparkSession) + throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName) } - LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${ tableIdentifier.table }") + LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}") Seq.empty } override def processData(sparkSession: SparkSession): Seq[Row] = { - if 
(dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") || - dmClassName.equalsIgnoreCase("preaggregate")) { - createPreAggregateTableCommands.flatMap(_.processData(sparkSession)) + if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || + dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { + createPreAggregateTableCommands.processData(sparkSession) } else { - Seq.empty + throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName) } } override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { - if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") || - dmClassName.equalsIgnoreCase("preaggregate")) { - val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY) - createPreAggregateTableCommands.flatMap(_.undoMetadata(sparkSession, exception)) + if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || + dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { + createPreAggregateTableCommands.undoMetadata(sparkSession, exception) } else { - Seq.empty + throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName) } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index a75a06f..dbbf90c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ 
-104,9 +104,9 @@ case class CreatePreAggregateTableCommand( TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable) TimeSeriesUtil.validateEventTimeColumnExitsInSelect( fieldRelationMap, - dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get) + dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get) TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap, - dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get, + dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get, timeSeriesFunction.get) } tableModel.parentTable = Some(parentTable) http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala index 4fe9df0..987d4fe 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -18,32 +18,33 @@ package org.apache.spark.sql.execution.command.timeseries import org.apache.spark.sql.execution.command.{DataMapField, Field} -import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES +import org.apache.carbondata.core.metadata.schema.datamap.Granularity import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.preagg.TimeSeriesUDF -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import 
org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException} /** * Utility class for time series to keep */ object TimeSeriesUtil { + val TIMESERIES_EVENTTIME = "event_time" + /** * Below method will be used to validate whether column mentioned in time series * is timestamp column or not * - * @param dmproperties - * data map properties - * @param parentTable - * parent table + * @param dmproperties data map properties + * @param parentTable parent table * @return whether time stamp column */ def validateTimeSeriesEventTime(dmproperties: Map[String, String], parentTable: CarbonTable) { - val eventTime = dmproperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME) + val eventTime = dmproperties.get(TIMESERIES_EVENTTIME) if (!eventTime.isDefined) { - throw new MalformedCarbonCommandException("Eventtime not defined in time series") + throw new MalformedCarbonCommandException("event_time not defined in time series") } else { val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, eventTime.get) if (carbonColumn.getDataType != DataTypes.TIMESTAMP) { @@ -55,13 +56,79 @@ object TimeSeriesUtil { } /** + * validate TimeSeries Granularity + * + * @param dmProperties datamap properties + * @param dmClassName datamap class name + * @return whether find only one granularity + */ + def validateTimeSeriesGranularity( + dmProperties: Map[String, String], + dmClassName: String): Boolean = { + var isFound = false + + // 1. granularity only support one + for (granularity <- Granularity.values()) { + if (dmProperties.get(granularity.getName).isDefined) { + if (isFound) { + throw new MalformedDataMapCommandException( + s"Only one granularity level can be defined") + } else { + isFound = true + } + } + } + + // 2. 
check whether timeseries and granularity match + if (isFound && !dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { + throw new MalformedDataMapCommandException( + s"${TIMESERIES.toString} keyword missing") + } else if (!isFound && dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { + throw new MalformedDataMapCommandException( + s"${TIMESERIES.toString} should define time granularity") + } else if (isFound) { + true + } else { + false + } + } + + /** + * get TimeSeries Granularity key and value + * check the value + * + * TODO:we will support value not only equal to 1 in the future + * + * @param dmProperties datamap properties + * @param dmClassName datamap class name + * @return key and value tuple + */ + def getTimeSeriesGranularityDetails( + dmProperties: Map[String, String], + dmClassName: String): (String, String) = { + + val defaultValue = "1" + for (granularity <- Granularity.values()) { + if (dmProperties.get(granularity.getName).isDefined && + dmProperties.get(granularity.getName).get.equalsIgnoreCase(defaultValue)) { + return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName).get) + } + } + + throw new MalformedDataMapCommandException( + s"Granularity only support $defaultValue") + } + + /** * Below method will be used to validate the hierarchy of time series and its value * validation will be done whether hierarchy order is proper or not and hierarchy level * value + * TODO: we should remove this method * * @param timeSeriesHierarchyDetails * time series hierarchy string */ + @deprecated def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[ (String, String)] = { val updatedtimeSeriesHierarchyDetails = timeSeriesHierarchyDetails.toLowerCase http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala index 0f934cb..eb52910 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala @@ -260,4 +260,12 @@ class CarbonDataSourceSuite extends Spark2QueryTest with BeforeAndAfterAll { sql("drop table if exists carbon_test") assert(exception.contains("Table creation failed. Table name cannot contain blank space")) } + + test("test create table: using") { + sql("DROP TABLE IF EXISTS usingTable") + val e: Exception = intercept[ClassNotFoundException] { + sql("CREATE TABLE usingTable(name STRING) USING abc") + } + assert(e.getMessage.contains("Failed to find data source: abc")) + } }