[CARBONDATA-2088][CARBONDATA-1516] Optimize syntax for creating timeseries 
pre-aggregate table

Use 'timeseries' instead of 'preaggregate' as the datamap provider when
creating a timeseries pre-aggregate table.

Replace the timeseries.eventTime and timeseries.hierarchy properties with
EVENT_TIME and per-level granularity properties (e.g. HOUR_GRANULARITY);
only one granularity level may be defined per datamap.

It should throw MalformedDataMapCommandException if neither 'timeseries' nor
'preaggregate' is used as the provider when creating a datamap.

This closes #1865


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/181c280b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/181c280b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/181c280b

Branch: refs/heads/fgdatamap
Commit: 181c280b7d33ac5e4029bd935d6260b0fe79a2bf
Parents: b421c24
Author: xubo245 <601450...@qq.com>
Authored: Fri Jan 26 17:47:46 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Jan 30 17:25:19 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   4 -
 .../schema/datamap/DataMapProvider.java         |  32 +++
 .../metadata/schema/datamap/Granularity.java    |  46 +++
 .../preaggregate/TestPreAggCreateCommand.scala  |  60 +++-
 .../TestPreAggregateTableSelection.scala        |  26 +-
 .../timeseries/TestTimeSeriesCreateTable.scala  | 280 ++++++++++++++++---
 .../timeseries/TestTimeseriesCompaction.scala   |  67 ++++-
 .../timeseries/TestTimeseriesDataLoad.scala     | 142 +++++++++-
 .../TestTimeseriesTableSelection.scala          | 114 +++++++-
 .../testsuite/datamap/TestDataMapCommand.scala  | 155 +++++-----
 .../MalformedDataMapCommandException.java       |  32 +++
 .../datamap/CarbonCreateDataMapCommand.scala    |  72 ++---
 .../CreatePreAggregateTableCommand.scala        |   4 +-
 .../command/timeseries/TimeSeriesUtil.scala     |  83 +++++-
 .../carbondata/CarbonDataSourceSuite.scala      |   8 +
 15 files changed, 943 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f46feef..cf95dd9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1543,10 +1543,6 @@ public final class CarbonCommonConstants {
    */
   public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
 
-  public static final String TIMESERIES_EVENTTIME = "timeseries.eventtime";
-
-  public static final String TIMESERIES_HIERARCHY = "timeseries.hierarchy";
-
   /**
    * It allows queries on hive metastore directly along with filter 
information, otherwise first
    * fetches all partitions from hive and apply filters on it.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
new file mode 100644
index 0000000..65578b1
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.datamap;
+
+/**
+ * type for create datamap
+ * The syntax of datamap creation is as follows.
+ * CREATE DATAMAP IF NOT EXISTS dataMapName ON TABLE tableName USING 
'DataMapProvider'
+ * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
+ *
+ * Please refer {{org.apache.spark.sql.parser.CarbonSpark2SqlParser}}
+ */
+
+public enum DataMapProvider {
+  PREAGGREGATE,
+  TIMESERIES;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
new file mode 100644
index 0000000..d6aefb6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/Granularity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.datamap;
+
+/**
+ * type for create datamap
+ * The syntax of datamap creation is as follows.
+ * CREATE DATAMAP IF NOT EXISTS dataMapName ON TABLE tableName USING 
'DataMapProvider'
+ * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
+ *
+ * Please refer {{org.apache.spark.sql.parser.CarbonSpark2SqlParser}}
+ */
+
+public enum Granularity {
+  YEAR("year_granularity"),
+  MONTH("month_granularity"),
+  DAY("day_granularity"),
+  HOUR("hour_granularity"),
+  MINUTE("minute_granularity"),
+  SECOND("second_granularity");
+  private String name;
+
+  Granularity(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 755a449..d3f0ff8 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -1,15 +1,18 @@
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.CarbonDatasourceHadoopRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
-import scala.collection.JavaConverters._
 
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
 
@@ -212,6 +215,60 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("drop datamap agg0 on table maintable")
   }
 
+  val timeSeries = TIMESERIES.toString
+
+  test("test PreAggregate table selection: create with preaggregate and 
hierarchy") {
+    sql("DROP TABLE IF EXISTS maintabletime")
+    sql(
+      """
+        | CREATE TABLE maintabletime(year INT,month INT,name STRING,salary 
INT,dob STRING)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES(
+        |   'SORT_SCOPE'='Global_sort',
+        |   'TABLE_BLOCKSIZE'='23',
+        |   'SORT_COLUMNS'='month,year,name')
+      """.stripMargin)
+    sql("INSERT INTO maintabletime SELECT 10,11,'x',12,'2014-01-01 00:00:00'")
+    sql(
+      s"""
+         | CREATE DATAMAP agg0 ON TABLE maintabletime
+         | USING 'preaggregate'
+         | AS SELECT dob,name FROM maintabletime
+         | GROUP BY dob,name
+       """.stripMargin)
+    val e = intercept[MalformedCarbonCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg1 ON TABLE maintabletime
+           | USING 'preaggregate'
+           | DMPROPERTIES (
+           |  'EVENT_TIME'='dob',
+           |  'SECOND_GRANULARITY'='1')
+           | AS SELECT dob,name FROM maintabletime
+           | GROUP BY dob,name
+       """.stripMargin)
+    }
+    assert(e.getMessage.contains(s"$timeSeries keyword missing"))
+    sql("DROP TABLE IF EXISTS maintabletime")
+  }
+
+  test("test pre agg create table 21: using") {
+    sql("DROP DATAMAP agg0 ON TABLE maintable")
+
+    val e: Exception = intercept[Exception] {
+      sql(
+        """
+          | CREATE DATAMAP agg0 ON TABLE mainTable
+          | USING 'abc'
+          | AS SELECT column3, SUM(column3),column5, SUM(column5)
+          | FROM maintable
+          | GROUP BY column3,column5,column2
+        """.stripMargin)
+    }
+    assert(e.getMessage.contains(
+      s"Unknown data map type abc"))
+    sql("DROP DATAMAP agg0 ON TABLE maintable")
+  }
 
   def getCarbontable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null
@@ -239,5 +296,6 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
     sql("drop table if exists PreAggMain2")
+    sql("drop table if exists maintabletime")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 17d95ef..f9ac354 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, 
Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+
 class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll: Unit = {
@@ -267,6 +269,8 @@ class TestPreAggregateTableSelection extends QueryTest with 
BeforeAndAfterAll {
     preAggTableValidator(df.queryExecution.analyzed, "maintable")
   }
 
+  val timeSeries = TIMESERIES.toString
+
 test("test PreAggregate table selection with timeseries and normal together") {
     sql("drop table if exists maintabletime")
     sql(
@@ -277,17 +281,26 @@ test("test PreAggregate table selection with timeseries 
and normal together") {
     sql(
       "create datamap agg0 on table maintabletime using 'preaggregate' as 
select dob,name from " +
       "maintabletime group by dob,name")
-    sql(
-      "create datamap agg1 on table maintabletime using 'preaggregate' 
DMPROPERTIES ('timeseries" +
-      ".eventTime'='dob', 
'timeseries.hierarchy'='hour=1,day=1,month=1,year=1') as select dob," +
-      "name from maintabletime group by dob,name")
-    val df = sql("select timeseries(dob,'year') from maintabletime group by 
timeseries(dob,'year')")
+
+  sql(
+    s"""
+       | CREATE DATAMAP agg1_year ON TABLE maintabletime
+       | USING '$timeSeries'
+       | DMPROPERTIES (
+       | 'EVENT_TIME'='dob',
+       | 'YEAR_GRANULARITY'='1')
+       | AS SELECT dob, name FROM maintabletime
+       | GROUP BY dob,name
+       """.stripMargin)
+
+    val df = sql("SELECT timeseries(dob,'year') FROM maintabletime GROUP BY 
timeseries(dob,'year')")
     preAggTableValidator(df.queryExecution.analyzed, "maintabletime_agg1_year")
+  sql("DROP TABLE IF EXISTS maintabletime")
 
   }
 
   test("test table selection when unsupported aggregate function is present") {
-    sql("drop table if exists maintabletime")
+    sql("DROP TABLE IF EXISTS maintabletime")
     sql(
       "create table maintabletime(year int,month int,name string,salary 
int,dob string) stored" +
       " by 'carbondata' 
tblproperties('sort_scope'='Global_sort','table_blocksize'='23'," +
@@ -302,6 +315,7 @@ test("test PreAggregate table selection with timeseries and 
normal together") {
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
     sql("drop table if exists lineitem")
+    sql("DROP TABLE IF EXISTS maintabletime")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index c9041fa..0ca7cb9 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -17,14 +17,68 @@
 package org.apache.carbondata.integration.spark.testsuite.timeseries
 
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
+
+import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import 
org.apache.carbondata.spark.exception.{MalformedDataMapCommandException, 
MalformedCarbonCommandException}
 
 class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
 
+  val timeSeries = TIMESERIES.toString
+
   override def beforeAll: Unit = {
-    sql("drop table if exists mainTable")
+    sql("DROP TABLE IF EXISTS mainTable")
     sql("CREATE TABLE mainTable(dataTime timestamp, name string, city string, 
age int) STORED BY 'org.apache.carbondata.format'")
-    sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='dataTime', 
'timeseries.hierarchy'='second=1,hour=1,day=1,month=1,year=1') as select 
dataTime, sum(age) from mainTable group by dataTime")
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='dataTime',
+         | 'SECOND_GRANULARITY'='1')
+         | AS SELECT dataTime, SUM(age) FROM mainTable
+         | GROUP BY dataTime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_hour ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='dataTime',
+         | 'HOUR_GRANULARITY'='1')
+         | AS SELECT dataTime, SUM(age) FROM mainTable
+         | GROUP BY dataTime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='dataTime',
+         | 'day_granularity'='1')
+         | AS SELECT dataTime, SUM(age) FROM mainTable
+         | GROUP BY dataTime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_month ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='dataTime',
+         | 'month_granularity'='1')
+         | AS SELECT dataTime, SUM(age) FROM mainTable
+         | GROUP BY dataTime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_year ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='dataTime',
+         | 'year_granularity'='1')
+         | AS SELECT dataTime, SUM(age) FROM mainTable
+         | GROUP BY dataTime
+       """.stripMargin)
   }
 
   test("test timeseries create table Zero") {
@@ -53,28 +107,29 @@ class TestTimeSeriesCreateTable extends QueryTest with 
BeforeAndAfterAll {
     intercept[Exception] {
       sql(
         s"""
-           | create datamap agg0 on table mainTable
-           | using 'preaggregate'
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
            | DMPROPERTIES (
-           |  'timeseries.eventTime'='dataTime',
-           |  'timeseries.hierarchy'='sec=1,hour=1,day=1,month=1,year=1')
-           | as select dataTime, sum(age) from mainTable
-           | group by dataTime
-         """.stripMargin)
+           | 'EVENT_TIME'='dataTime',
+           | 'SEC_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+        """.stripMargin)
     }
   }
 
   test("test timeseries create table Six") {
     intercept[Exception] {
       sql(
-        """
-          | create datamap agg0 on table mainTable
-          | using 'preaggregate'
-          | DMPROPERTIES ('timeseries.eventTime'='dataTime', 
'timeseries.hierarchy'='hour=2')
-          | as select dataTime, sum(age) from mainTable
-          | group by dataTime
-        """.stripMargin)
-
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'SECOND_GRANULARITY'='2')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
     }
   }
 
@@ -82,24 +137,24 @@ class TestTimeSeriesCreateTable extends QueryTest with 
BeforeAndAfterAll {
     intercept[Exception] {
       sql(
         s"""
-           | create datamap agg0 on table mainTable
-           | using 'preaggregate'
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
            | DMPROPERTIES (
-           |    'timeseries.eventTime'='dataTime',
-           |    'timeseries.hierarchy'='hour=1,day=1,year=1,month=1')
-           | as select dataTime, sum(age) from mainTable
-           | group by dataTime
-         """.stripMargin)
+           | 'EVENT_TIME'='dataTime',
+           | 'SECOND_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
       sql(
         s"""
-           | create datamap agg0 on table mainTable
-           | using 'preaggregate'
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
            | DMPROPERTIES (
-           |    'timeseries.eventTime'='dataTime',
-           |    'timeseries.hierarchy'='hour=1,day=1,year=1,month=1')
-           | as select dataTime, sum(age) from mainTable
-           | group by dataTime
-         """.stripMargin)
+           | 'EVENT_TIME'='dataTime',
+           | 'SECOND_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
     }
   }
 
@@ -107,12 +162,14 @@ class TestTimeSeriesCreateTable extends QueryTest with 
BeforeAndAfterAll {
     intercept[Exception] {
       sql(
         s"""
-           | create datamap agg0 on table mainTable
-           | using 'preaggregate'
-           | DMPROPERTIES ('timeseries.eventTime'='name', 
'timeseries.hierarchy'='hour=1,day=1,year=1,month=1')
-           | as select name, sum(age) from mainTable
-           | group by name
-         """.stripMargin)
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='name',
+           | 'SECOND_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
     }
   }
 
@@ -120,18 +177,149 @@ class TestTimeSeriesCreateTable extends QueryTest with 
BeforeAndAfterAll {
     intercept[Exception] {
       sql(
         s"""
-           | create datamap agg0 on table mainTable
-           | using 'preaggregate'
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='name',
+           | 'SECOND_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+  }
+
+  test("test timeseries create table: USING") {
+    val e: Exception = intercept[MalformedDataMapCommandException] {
+      sql(
+        """CREATE DATAMAP agg1 ON TABLE mainTable
+          | USING 'abc'
+          | DMPROPERTIES (
+          |   'EVENT_TIME'='dataTime',
+          |   'SECOND_GRANULARITY'='1')
+          | AS SELECT dataTime, SUM(age) FROM mainTable
+          | GROUP BY dataTime
+        """.stripMargin)
+    }
+    assert(e.getMessage.equals("Unknown data map type abc"))
+  }
+
+  test("test timeseries create table: USING and catch 
MalformedCarbonCommandException") {
+    val e: Exception = intercept[MalformedCarbonCommandException] {
+      sql(
+        """CREATE DATAMAP agg1 ON TABLE mainTable
+          | USING 'abc'
+          | DMPROPERTIES (
+          |   'EVENT_TIME'='dataTime',
+          |   'SECOND_GRANULARITY'='1')
+          | AS SELECT dataTime, SUM(age) FROM mainTable
+          | GROUP BY dataTime
+        """.stripMargin)
+    }
+    assert(e.getMessage.equals("Unknown data map type abc"))
+  }
+
+  test("test timeseries create table: Only one granularity level can be 
defined 1") {
+    val e: Exception = intercept[MalformedCarbonCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'SECOND_GRANULARITY'='1',
+           | 'HOUR_GRANULARITY'='1',
+           | 'DAY_GRANULARITY'='1',
+           | 'MONTH_GRANULARITY'='1',
+           | 'YEAR_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.equals("Only one granularity level can be defined"))
+  }
+
+  test("test timeseries create table: Only one granularity level can be 
defined 2") {
+    val e: Exception = intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'SECOND_GRANULARITY'='1',
+           | 'HOUR_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.equals("Only one granularity level can be defined"))
+  }
+
+  test("test timeseries create table: Only one granularity level can be 
defined 3") {
+    val e: Exception = intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'DAY_GRANULARITY'='1',
+           | 'HOUR_GRANULARITY'='1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.equals("Only one granularity level can be defined"))
+  }
+
+  test("test timeseries create table: Granularity only support 1") {
+    val e = intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'DAY_GRANULARITY'='2')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.equals("Granularity only support 1"))
+  }
+
+  test("test timeseries create table: Granularity only support 1 and throw 
Exception") {
+    val e = intercept[MalformedCarbonCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'HOUR_GRANULARITY'='2')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.equals("Granularity only support 1"))
+  }
+
+  test("test timeseries create table: timeSeries should define time 
granularity") {
+    val e = intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
            | DMPROPERTIES (
-           |    'timeseries.eventTime'='dataTime',
-           |    'timeseries.hierarchy'='hour=1,day=1,year=1,month=1')
-           | as select name, sum(age) from mainTable
-           | group by name
-         """.stripMargin)
+           | 'EVENT_TIME'='dataTime')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
     }
+    assert(e.getMessage.equals(s"$timeSeries should define time granularity"))
   }
 
   override def afterAll: Unit = {
-    sql("drop table if exists mainTable")
+    sql("DROP TABLE IF EXISTS mainTable")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
index a410fe4..d66c402 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesCompaction.scala
@@ -18,15 +18,18 @@ package 
org.apache.carbondata.integration.spark.testsuite.timeseries
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
 import org.scalatest.Matchers._
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestTimeseriesCompaction extends QueryTest with BeforeAndAfterAll {
 
   var isCompactionEnabled = false
+  val timeSeries = TIMESERIES.toString
+
   override def beforeAll: Unit = {
     SparkUtil4Test.createTaskMockUp(sqlContext)
     CarbonProperties.getInstance()
@@ -38,7 +41,67 @@ class TestTimeseriesCompaction extends QueryTest with 
BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
     sql("drop table if exists mainTable")
     sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED 
BY 'org.apache.carbondata.format'")
-    sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='mytime', 
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as 
select mytime, sum(age) from mainTable group by mytime")
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'SECOND_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_minute ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'MINUTE_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_hour ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'HOUR_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'DAY_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_month ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'MONTH_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_year ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'YEAR_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
index d25710c..8bcdfc9 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -24,10 +24,13 @@ import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.{BeforeAndAfterAll, Ignore}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
 import org.apache.carbondata.core.util.CarbonProperties
 
 class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
 
+  val timeSeries = TIMESERIES.toString
+
   override def beforeAll: Unit = {
     SparkUtil4Test.createTaskMockUp(sqlContext)
     CarbonProperties.getInstance()
@@ -35,11 +38,131 @@ class TestTimeseriesDataLoad extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists mainTable")
     sql("drop table if exists table_03")
     sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED 
BY 'org.apache.carbondata.format'")
-    sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='mytime', 
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as 
select mytime, sum(age) from mainTable group by mytime")
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'SECOND_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_minute ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'minute_granularity'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_hour ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'HOUR_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'DAY_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_month ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'MONTH_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_year ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'year_granularity'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")
     sql("CREATE TABLE table_03 (imei string,age int,mac string,productdate 
timestamp,updatedate timestamp,gamePointId double,contractid double ) STORED BY 
'org.apache.carbondata.format'")
     sql(s"LOAD DATA inpath '$resourcesPath/data_sort.csv' INTO table table_03 
options ('DELIMITER'=',', 
'QUOTECHAR'='','FILEHEADER'='imei,age,mac,productdate,updatedate,gamePointId,contractid')")
-    sql("create datamap ag1 on table table_03 using 'preaggregate' 
DMPROPERTIES ( 
'timeseries.eventtime'='productdate','timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1')as
 select productdate,mac,sum(age) from table_03 group by productdate,mac")
+
+    sql(
+      s"""
+         | CREATE DATAMAP ag1_second ON TABLE table_03
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |    'EVENT_TIME'='productdate',
+         |    'SECOND_GRANULARITY'='1')
+         | AS SELECT productdate,mac,SUM(age) FROM table_03
+         | GROUP BY productdate,mac
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ag1_minute ON TABLE table_03
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |    'EVENT_TIME'='productdate',
+         |    'minute_granularity'='1')
+         | AS SELECT productdate,mac,SUM(age) FROM table_03
+         | GROUP BY productdate,mac
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ag1_hour ON TABLE table_03
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |   'EVENT_TIME'='productdate',
+         |    'HOUR_GRANULARITY'='1')
+         | AS SELECT productdate,mac,SUM(age) FROM table_03
+         | GROUP BY productdate,mac
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ag1_day ON TABLE table_03
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |    'EVENT_TIME'='productdate',
+         |    'DAY_GRANULARITY'='1')
+         | AS SELECT productdate,mac,SUM(age) FROM table_03
+         | GROUP BY productdate,mac
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ag1_month ON TABLE table_03
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |    'EVENT_TIME'='productdate',
+         |    'month_granularity'='1')
+         | AS SELECT productdate,mac,SUM(age) FROM table_03
+         | GROUP BY productdate,mac
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP ag1_year ON TABLE table_03
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |    'EVENT_TIME'='productdate',
+         |    'year_granularity'='1')
+         | AS SELECT productdate,mac,SUM(age) FROM table_03
+         | GROUP BY productdate,mac
+       """.stripMargin)
 
   }
   test("test Year level timeseries data validation1 ") {
@@ -93,12 +216,21 @@ class TestTimeseriesDataLoad extends QueryTest with 
BeforeAndAfterAll {
         Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50)))
   }
 
-  test("test if timeseries load is successful on table creation") {
+  test("test if timeseries load is successful ON TABLE creation") {
     sql("drop table if exists mainTable")
     sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED 
BY 'org.apache.carbondata.format'")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")
-    sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='mytime', 
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as 
select mytime, sum(age) from mainTable group by mytime")
-    checkAnswer( sql("select * from maintable_agg0_second"),
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'SECOND_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    checkAnswer( sql("select * FROM maintable_agg0_second"),
       Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"),10),
         Row(Timestamp.valueOf("2016-02-23 01:01:40.0"),20),
         Row(Timestamp.valueOf("2016-02-23 01:01:50.0"),30),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index a9d3965..3065952 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -24,14 +24,78 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.BeforeAndAfterAll
 
+import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
 
+  val timeSeries = TIMESERIES.toString
+
   override def beforeAll: Unit = {
     SparkUtil4Test.createTaskMockUp(sqlContext)
     sql("drop table if exists mainTable")
     sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED 
BY 'org.apache.carbondata.format'")
-    sql("create datamap agg0 on table mainTable using 'preaggregate' 
DMPROPERTIES ('timeseries.eventTime'='mytime', 
'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as 
select mytime, sum(age) from mainTable group by mytime")
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'SECOND_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_minute ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'minute_granularity'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_hour ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'HOUR_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'DAY_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_month ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'MONTH_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_year ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'YEAR_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+       """.stripMargin)
+
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into 
table mainTable")
   }
 
@@ -100,6 +164,54 @@ class TestTimeseriesTableSelection extends QueryTest with 
BeforeAndAfterAll {
     preAggTableValidator(df.queryExecution.analyzed,"maintable")
   }
 
+  test("test timeseries table selection 14: Granularity only support 1 and 
throw Exception") {
+    val e = intercept[MalformedCarbonCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg3_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'HOUR_GRANULARITY'='2')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.contains("Granularity only support 1"))
+  }
+
+  test("test timeseries table selection 15: Granularity only support 1 and 
throw Exception") {
+    val e = intercept[MalformedCarbonCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg3_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'HOUR_GRANULARITY'='1.5')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.contains("Granularity only support 1"))
+  }
+
+  test("test timeseries table selection 16: Granularity only support 1 and 
throw Exception") {
+    val e = intercept[MalformedCarbonCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg3_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           | 'EVENT_TIME'='dataTime',
+           | 'HOUR_GRANULARITY'='-1')
+           | AS SELECT dataTime, SUM(age) FROM mainTable
+           | GROUP BY dataTime
+       """.stripMargin)
+    }
+    assert(e.getMessage.contains("Granularity only support 1"))
+  }
+
   def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit 
={
     var isValidPlan = false
     plan.transform {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index a0ea317..0c38239 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -27,6 +27,7 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.exception.MalformedDataMapCommandException
 
 class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 
@@ -39,37 +40,42 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("create table datamaptest (a string, b string, c string) stored by 
'carbondata'")
   }
 
-
-  test("test datamap create") {
-    sql("create datamap datamap1 on table datamaptest using 'new.class'")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 1)
-    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1"))
-    assert(dataMapSchemaList.get(0).getClassName.equals("new.class"))
+  val newClass = "org.apache.spark.sql.CarbonSource"
+
+  test("test datamap create: don't support using class, only support short 
name") {
+    intercept[MalformedDataMapCommandException] {
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
+      val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
+      assert(table != null)
+      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+      assert(dataMapSchemaList.size() == 1)
+      assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap1"))
+      assert(dataMapSchemaList.get(0).getClassName.equals(newClass))
+    }
   }
 
-  test("test datamap create with dmproperties") {
-    sql("create datamap datamap2 on table datamaptest using 'new.class' 
dmproperties('key'='value')")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 2)
-    assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2"))
-    assert(dataMapSchemaList.get(1).getClassName.equals("new.class"))
-    assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value"))
+  test("test datamap create with dmproperties: don't support using class") {
+    intercept[MalformedDataMapCommandException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' 
DMPROPERTIES('key'='value')")
+      val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
+      assert(table != null)
+      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+      assert(dataMapSchemaList.size() == 2)
+      assert(dataMapSchemaList.get(1).getDataMapName.equals("datamap2"))
+      assert(dataMapSchemaList.get(1).getClassName.equals(newClass))
+      assert(dataMapSchemaList.get(1).getProperties.get("key").equals("value"))
+    }
   }
 
-  test("test datamap create with existing name") {
-    intercept[Exception] {
+  test("test datamap create with existing name: don't support using class") {
+    intercept[MalformedDataMapCommandException] {
       sql(
-        "create datamap datamap2 on table datamaptest using 'new.class' 
dmproperties('key'='value')")
+        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' 
DMPROPERTIES('key'='value')")
+      val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
+      assert(table != null)
+      val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+      assert(dataMapSchemaList.size() == 2)
     }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 2)
   }
 
   test("test datamap create with preagg") {
@@ -79,10 +85,10 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
     val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 3)
-    assert(dataMapSchemaList.get(2).getDataMapName.equals("datamap3"))
-    assert(dataMapSchemaList.get(2).getProperties.get("key").equals("value"))
-    
assert(dataMapSchemaList.get(2).getChildSchema.getTableName.equals("datamaptest_datamap3"))
+    assert(dataMapSchemaList.size() == 1)
+    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
+    assert(dataMapSchemaList.get(0).getProperties.get("key").equals("value"))
+    
assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
   }
 
   test("check hivemetastore after drop datamap") {
@@ -110,7 +116,7 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
     }
   }
 
-  test("drop the table having pre-aggregate"){
+  test("drop the table having pre-aggregate") {
     try {
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
@@ -140,13 +146,24 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
   test("test datamap create with preagg with duplicate name") {
     intercept[Exception] {
       sql(
-        "create datamap datamap2 on table datamaptest using 'preaggregate' 
dmproperties('key'='value') as select count(a) from datamaptest")
-
+        s"""
+           | CREATE DATAMAP datamap2 ON TABLE datamaptest
+           | USING 'preaggregate'
+           | DMPROPERTIES('key'='value')
+           | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
+      sql(
+        s"""
+           | CREATE DATAMAP datamap2 ON TABLE datamaptest
+           | USING 'preaggregate'
+           | DMPROPERTIES('key'='value')
+           | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
     }
     val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 3)
+    assert(dataMapSchemaList.size() == 2)
   }
 
   test("test datamap drop with preagg") {
@@ -157,25 +174,29 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
     val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
     assert(table != null)
     val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 3)
+    assert(dataMapSchemaList.size() == 2)
   }
 
-  test("test show datamap without preaggregate") {
-    sql("drop table if exists datamapshowtest")
-    sql("create table datamapshowtest (a string, b string, c string) stored by 
'carbondata'")
-    sql("create datamap datamap1 on table datamapshowtest using 'new.class' 
dmproperties('key'='value')")
-    sql("create datamap datamap2 on table datamapshowtest using 'new.class' 
dmproperties('key'='value')")
-    checkExistence(sql("show datamap on table datamapshowtest"), true, 
"datamap1", "datamap2", "(NA)", "new.class")
+  test("test show datamap without preaggregate: don't support using class") {
+    intercept[MalformedDataMapCommandException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
+      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, 
"datamap1", "datamap2", "(NA)", newClass)
+    }
   }
 
-  test("test show datamap with preaggregate") {
-    sql("drop table if exists datamapshowtest")
-    sql("create table datamapshowtest (a string, b string, c string) stored by 
'carbondata'")
-    sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' 
as select count(a) from datamapshowtest")
-    sql("create datamap datamap2 on table datamapshowtest using 'new.class' 
dmproperties('key'='value')")
-    val frame = sql("show datamap on table datamapshowtest")
-    assert(frame.collect().length == 2)
-    checkExistence(frame, true, "datamap1", "datamap2", "(NA)", "new.class", 
"default.datamapshowtest_datamap1")
+  test("test show datamap with preaggregate: don't support using class") {
+    intercept[MalformedDataMapCommandException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 
'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 2)
+      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, 
"default.datamapshowtest_datamap1")
+    }
   }
 
   test("test show datamap with no datamap") {
@@ -184,15 +205,17 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
     assert(sql("show datamap on table datamapshowtest").collect().length == 0)
   }
 
-  test("test show datamap after dropping datamap") {
-    sql("drop table if exists datamapshowtest")
-    sql("create table datamapshowtest (a string, b string, c string) stored by 
'carbondata'")
-    sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' 
as select count(a) from datamapshowtest")
-    sql("create datamap datamap2 on table datamapshowtest using 'new.class' 
dmproperties('key'='value')")
-    sql("drop datamap datamap1 on table datamapshowtest")
-    val frame = sql("show datamap on table datamapshowtest")
-    assert(frame.collect().length == 1)
-    checkExistence(frame, true, "datamap2", "(NA)", "new.class")
+  test("test show datamap after dropping datamap: don't support using class") {
+    intercept[MalformedDataMapCommandException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 
'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
+      sql("drop datamap datamap1 on table datamapshowtest")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 1)
+      checkExistence(frame, true, "datamap2", "(NA)", newClass)
+    }
   }
 
   test("test if preaggregate load is successfull for hivemetastore") {
@@ -217,7 +240,7 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME 
String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 
int) STORED BY 'org.apache.carbondata.format'")
     sql("insert into uniqdata select 
9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 
01:00:03','1970-01-01 
02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
     sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as 
select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
-    checkAnswer(sql("select * from 
uniqdata_uniqdata_agg"),Seq(Row(12345678901.1234000000, 
12345678901.1234000000)))
+    checkAnswer(sql("select * from uniqdata_uniqdata_agg"), 
Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
     sql("drop datamap if exists uniqdata_agg on table uniqdata")
   }
 
@@ -229,16 +252,16 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
     sql("insert into main select 10,11,'amy',12")
     sql("insert into main select 10,11,'amy',14")
     sql("create datamap preagg on table main " +
-        "using 'preaggregate' " +
-        s"dmproperties ('path'='$path') " +
-        "as select name,avg(salary) from main group by name")
+      "using 'preaggregate' " +
+      s"dmproperties ('path'='$path') " +
+      "as select name,avg(salary) from main group by name")
     assertResult(true)(new File(path).exists())
     assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, 
"0")}")
-                         .list(new FilenameFilter {
-                           override def accept(dir: File, name: String): 
Boolean = {
-                             name.contains(CarbonCommonConstants.FACT_FILE_EXT)
-                           }
-                         }).length > 0)
+      .list(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+        }
+      }).length > 0)
     checkAnswer(sql("select name,avg(salary) from main group by name"), 
Row("amy", 13.0))
     checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
     sql("drop datamap preagg on table main")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
new file mode 100644
index 0000000..a05d8e6
--- /dev/null
+++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.exception;
+
+/**
+ * Throw exception when using illegal argument
+ */
+public class MalformedDataMapCommandException extends 
MalformedCarbonCommandException {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  public MalformedDataMapCommandException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 8e00635..c4d32b4 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -16,18 +16,15 @@
  */
 package org.apache.spark.sql.execution.command.datamap
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import 
org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand,
 PreAggregateUtil}
+import 
org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
+import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 
 /**
  * Below command class will be used to create datamap on table
@@ -41,70 +38,63 @@ case class CarbonCreateDataMapCommand(
     queryString: Option[String])
   extends AtomicRunnableCommand {
 
-  var createPreAggregateTableCommands: Seq[CreatePreAggregateTableCommand] = _
+  var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     // since streaming segment does not support building index and 
pre-aggregate yet,
     // so streaming table does not support create datamap
     val carbonTable =
-      CarbonEnv.getCarbonTable(tableIdentifier.database, 
tableIdentifier.table)(sparkSession)
+    CarbonEnv.getCarbonTable(tableIdentifier.database, 
tableIdentifier.table)(sparkSession)
     if (carbonTable.isStreamingTable) {
       throw new MalformedCarbonCommandException("Streaming table does not 
support creating datamap")
     }
     val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    if 
(dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
-        dmClassName.equalsIgnoreCase("preaggregate")) {
-      val timeHierarchyString = 
dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
-      createPreAggregateTableCommands = if (timeHierarchyString.isDefined) {
+
+    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
+      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+      TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName)
+      createPreAggregateTableCommands = if 
(dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
         val details = TimeSeriesUtil
-          .validateAndGetTimeSeriesHierarchyDetails(
-            timeHierarchyString.get)
-        val updatedDmProperties = dmproperties - 
CarbonCommonConstants.TIMESERIES_HIERARCHY
-        details.map { f =>
-          CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
-            tableIdentifier,
-            dmClassName,
-            updatedDmProperties,
-            queryString.get,
-            Some(f._1))
-        }.toSeq
+          .getTimeSeriesGranularityDetails(dmproperties, dmClassName)
+        val updatedDmProperties = dmproperties - details._1
+        CreatePreAggregateTableCommand(dataMapName,
+          tableIdentifier,
+          dmClassName,
+          updatedDmProperties,
+          queryString.get,
+          Some(details._1))
       } else {
-        Seq(CreatePreAggregateTableCommand(
+        CreatePreAggregateTableCommand(
           dataMapName,
           tableIdentifier,
           dmClassName,
           dmproperties,
           queryString.get
-        ))
+        )
       }
-      createPreAggregateTableCommands.flatMap(_.processMetadata(sparkSession))
+      createPreAggregateTableCommands.processMetadata(sparkSession)
     } else {
-      val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
-      dataMapSchema.setProperties(new java.util.HashMap[String, 
String](dmproperties.asJava))
-      val dbName = 
CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
-      // upadting the parent table about dataschema
-      PreAggregateUtil.updateMainTable(dbName, tableIdentifier.table, 
dataMapSchema, sparkSession)
+      throw new MalformedDataMapCommandException("Unknown data map type " + 
dmClassName)
     }
-    LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${ 
tableIdentifier.table }")
+    LOGGER.audit(s"DataMap $dataMapName successfully added to Table 
${tableIdentifier.table}")
     Seq.empty
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if 
(dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
-        dmClassName.equalsIgnoreCase("preaggregate")) {
-      createPreAggregateTableCommands.flatMap(_.processData(sparkSession))
+    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
+      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+      createPreAggregateTableCommands.processData(sparkSession)
     } else {
-      Seq.empty
+      throw new MalformedDataMapCommandException("Unknown data map type " + 
dmClassName)
     }
   }
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): 
Seq[Row] = {
-    if 
(dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
-        dmClassName.equalsIgnoreCase("preaggregate")) {
-      val timeHierarchyString = 
dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
-      createPreAggregateTableCommands.flatMap(_.undoMetadata(sparkSession, 
exception))
+    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
+      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+      createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
     } else {
-      Seq.empty
+      throw new MalformedDataMapCommandException("Unknown data map type " + 
dmClassName)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index a75a06f..dbbf90c 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -104,9 +104,9 @@ case class CreatePreAggregateTableCommand(
       TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable)
       TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
         fieldRelationMap,
-        dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get)
+        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get)
       TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap,
-        dmProperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME).get,
+        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get,
       timeSeriesFunction.get)
     }
     tableModel.parentTable = Some(parentTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 4fe9df0..987d4fe 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -18,32 +18,33 @@ package org.apache.spark.sql.execution.command.timeseries
 
 import org.apache.spark.sql.execution.command.{DataMapField, Field}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
+import org.apache.carbondata.core.metadata.schema.datamap.Granularity
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.preagg.TimeSeriesUDF
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 
 /**
  * Utility class for time series to keep
  */
 object TimeSeriesUtil {
 
+  val TIMESERIES_EVENTTIME = "event_time"
+
   /**
    * Below method will be used to validate whether column mentioned in time 
series
    * is timestamp column or not
    *
-   * @param dmproperties
-   * data map properties
-   * @param parentTable
-   * parent table
+   * @param dmproperties data map properties
+   * @param parentTable  parent table
    * @return whether time stamp column
    */
   def validateTimeSeriesEventTime(dmproperties: Map[String, String],
       parentTable: CarbonTable) {
-    val eventTime = 
dmproperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME)
+    val eventTime = dmproperties.get(TIMESERIES_EVENTTIME)
     if (!eventTime.isDefined) {
-      throw new MalformedCarbonCommandException("Eventtime not defined in time 
series")
+      throw new MalformedCarbonCommandException("event_time not defined in 
time series")
     } else {
       val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, 
eventTime.get)
       if (carbonColumn.getDataType != DataTypes.TIMESTAMP) {
@@ -55,13 +56,79 @@ object TimeSeriesUtil {
   }
 
   /**
+   * validate TimeSeries Granularity
+   *
+   * @param dmProperties datamap properties
+   * @param dmClassName  datamap class name
+   * @return whether exactly one granularity level is defined
+   */
+  def validateTimeSeriesGranularity(
+      dmProperties: Map[String, String],
+      dmClassName: String): Boolean = {
+    var isFound = false
+
+    // 1. only one granularity level is supported
+    for (granularity <- Granularity.values()) {
+      if (dmProperties.get(granularity.getName).isDefined) {
+        if (isFound) {
+          throw new MalformedDataMapCommandException(
+            s"Only one granularity level can be defined")
+        } else {
+          isFound = true
+        }
+      }
+    }
+
+    // 2. check whether timeseries and granularity match
+    if (isFound && !dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+      throw new MalformedDataMapCommandException(
+        s"${TIMESERIES.toString} keyword missing")
+    } else if (!isFound && dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
+      throw new MalformedDataMapCommandException(
+        s"${TIMESERIES.toString} should define time granularity")
+    } else if (isFound) {
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * get the TimeSeries granularity key and value,
+   * and validate the value
+   * TODO: we will support values other than 1 in the future
+   *
+   * @param dmProperties datamap properties
+   * @param dmClassName  datamap class name
+   * @return key and value tuple
+   */
+  def getTimeSeriesGranularityDetails(
+      dmProperties: Map[String, String],
+      dmClassName: String): (String, String) = {
+
+    val defaultValue = "1"
+    for (granularity <- Granularity.values()) {
+      if (dmProperties.get(granularity.getName).isDefined &&
+        
dmProperties.get(granularity.getName).get.equalsIgnoreCase(defaultValue)) {
+        return (granularity.toString.toLowerCase, 
dmProperties.get(granularity.getName).get)
+      }
+    }
+
+    throw new MalformedDataMapCommandException(
+      s"Granularity only support $defaultValue")
+  }
+
+  /**
    * Below method will be used to validate the hierarchy of time series and 
its value
    * validation will be done whether hierarchy order is proper or not and 
hierarchy level
    * value
+   * TODO: we should remove this method
    *
    * @param timeSeriesHierarchyDetails
    * time series hierarchy string
    */
+  @deprecated
   def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: 
String): Array[
     (String, String)] = {
     val updatedtimeSeriesHierarchyDetails = 
timeSeriesHierarchyDetails.toLowerCase

http://git-wip-us.apache.org/repos/asf/carbondata/blob/181c280b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index 0f934cb..eb52910 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -260,4 +260,12 @@ class CarbonDataSourceSuite extends Spark2QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists carbon_test")
     assert(exception.contains("Table creation failed. Table name cannot 
contain blank space"))
   }
+
+  test("test create table: using") {
+    sql("DROP TABLE IF EXISTS usingTable")
+    val e: Exception = intercept[ClassNotFoundException] {
+      sql("CREATE TABLE usingTable(name STRING) USING abc")
+    }
+    assert(e.getMessage.contains("Failed to find data source: abc"))
+  }
 }

Reply via email to