[CARBONDATA-1517]- Pre Aggregate Create Table Support

Support CTAS in carbon and support creating aggregation tables using CTAS and 
update aggregation table information to main table schema.

This closes #1433


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aa12127d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aa12127d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aa12127d

Branch: refs/heads/pre-aggregate
Commit: aa12127dda4c4481b6e49b399086e01d2ed6bf08
Parents: 0a66f91
Author: kumarvishal <kumarvishal.1...@gmail.com>
Authored: Sun Oct 15 18:05:55 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Nov 10 10:45:30 2017 +0530

----------------------------------------------------------------------
 .../table/column/ParentColumnTableRelation.java |  71 +++
 .../ThriftWrapperSchemaConverterImplTest.java   |  28 +-
 .../preaggregate/TestPreAggCreateCommand.scala  | 148 +++++++
 .../carbondata/spark/util/CommonUtil.scala      |   9 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  12 +-
 .../command/carbonTableSchemaCommon.scala       | 170 +++++---
 .../command/management/LoadTableCommand.scala   |   2 +-
 .../CreatePreAggregateTableCommand.scala        | 136 ++++++
 .../preaaggregate/PreAggregateUtil.scala        | 431 +++++++++++++++++++
 .../schema/CarbonAlterTableRenameCommand.scala  |   2 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  41 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  72 +++-
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  21 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  33 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  10 +-
 16 files changed, 1100 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
new file mode 100644
index 0000000..425d0f2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table.column;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.Writable;
+
+/**
+ * To maintain the relation of child column to parent table column
+ */
+public class ParentColumnTableRelation implements Serializable, Writable {
+
+  private RelationIdentifier relationIdentifier;
+  /**
+   * parent column id
+   */
+  private String columnId;
+
+  private String columnName;
+
+  public ParentColumnTableRelation(RelationIdentifier relationIdentifier, 
String columId,
+      String columnName) {
+    this.relationIdentifier = relationIdentifier;
+    this.columnId = columId;
+    this.columnName = columnName;
+  }
+
+  public RelationIdentifier getRelationIdentifier() {
+    return relationIdentifier;
+  }
+
+  public String getColumnId() {
+    return columnId;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+    relationIdentifier.write(out);
+    out.writeUTF(columnId);
+    out.writeUTF(columnName);
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+    this.relationIdentifier = new RelationIdentifier(null, null, null);
+    relationIdentifier.readFields(in);
+    this.columnId = in.readUTF();
+    this.columnName = in.readUTF();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
 
b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 42c0ad6..0fddc25 100644
--- 
a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ 
b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -31,6 +31,7 @@ import 
org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.format.DataMapSchema;
 
 import mockit.Mock;
 import mockit.MockUp;
@@ -84,6 +85,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     thriftColumnSchemas = new 
ArrayList<org.apache.carbondata.format.ColumnSchema>();
     thriftColumnSchemas.add(thriftColumnSchema);
     thriftSchemaEvolutionEntries = new ArrayList<>();
@@ -421,6 +423,7 @@ public class ThriftWrapperSchemaConverterImplTest {
             new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
                     "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
 
     new MockUp<ColumnSchema>() {
@@ -483,6 +486,8 @@ public class ThriftWrapperSchemaConverterImplTest {
       @Mock public String getColumnReferenceId() {
         return "1";
       }
+
+      @Mock public String getAggFunction() {return "" ;}
     };
 
     org.apache.carbondata.format.ColumnSchema actualResult =
@@ -496,7 +501,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -571,7 +576,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.INT,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -645,6 +650,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.SHORT,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -718,7 +724,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.LONG,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -792,6 +798,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DOUBLE,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -866,6 +873,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DECIMAL,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -926,6 +934,10 @@ public class ThriftWrapperSchemaConverterImplTest {
       @Mock public String getColumnReferenceId() {
         return "1";
       }
+
+      @Mock public String getAggFunction() {
+        return "";
+      }
     };
 
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
@@ -940,6 +952,7 @@ public class ThriftWrapperSchemaConverterImplTest {
             org.apache.carbondata.format.DataType.TIMESTAMP, "columnName", 
"1", true, encoders,
             true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -1014,7 +1027,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.ARRAY,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
         return encodings;
@@ -1088,6 +1101,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRUCT,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -1168,6 +1182,7 @@ public class ThriftWrapperSchemaConverterImplTest {
             encoders,
             true);
     thriftColumnSchema.setSchemaOrdinal(1);
+    thriftColumnSchema.setAggregate_function("");
 
     new MockUp<ColumnSchema>() {
       @Mock public List<Encoding> getEncodingList() {
@@ -1318,7 +1333,7 @@ public class ThriftWrapperSchemaConverterImplTest {
         new 
org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN,
             "columnName", "1", true, encoders, true);
     thriftColumnSchema.setSchemaOrdinal(1);
-
+    thriftColumnSchema.setAggregate_function("");
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
     org.apache.carbondata.format.ColumnSchema actualResult =
         
thriftWrapperSchemaConverter.fromWrapperToExternalColumnSchema(wrapperColumnSchema);
@@ -1506,6 +1521,8 @@ public class ThriftWrapperSchemaConverterImplTest {
       @Mock public String getColumnReferenceId() {
         return "1";
       }
+
+      @Mock public String getAggFunction() { return "";}
     };
 
     new MockUp<TableInfo>() {
@@ -1542,6 +1559,7 @@ public class ThriftWrapperSchemaConverterImplTest {
     org.apache.carbondata.format.TableInfo expectedResult =
         new org.apache.carbondata.format.TableInfo(thriftFactTable, new 
ArrayList<org.apache
             .carbondata.format.TableSchema>());
+    expectedResult.setDataMapSchemas(new ArrayList<DataMapSchema>());
     assertEquals(expectedResult, actualResult);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
new file mode 100644
index 0000000..6120e88
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -0,0 +1,148 @@
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists PreAggMain")
+    sql("drop table if exists PreAggMain1")
+    sql("drop table if exists PreAggMain2")
+    sql("create table preaggMain (a string, b string, c string) stored by 
'carbondata'")
+    sql("create table preaggMain1 (a string, b string, c string) stored by 
'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')")
+    sql("create table preaggMain2 (a string, b string, c string) stored by 
'carbondata'")
+  }
+
+
+  test("test pre agg create table One") {
+    sql("create table preagg1 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain') as select a,sum(b) from PreAggMain group 
by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_b_sum")
+    sql("drop table preagg1")
+  }
+
+  test("test pre agg create table Two") {
+    sql("create table preagg2 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain') as select a as a1,sum(b) from PreAggMain 
group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_b_sum")
+    sql("drop table preagg2")
+  }
+
+  test("test pre agg create table Three") {
+    sql("create table preagg3 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain') as select a,sum(b) as sum from PreAggMain 
group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_b_sum")
+    sql("drop table preagg3")
+  }
+
+  test("test pre agg create table four") {
+    sql("create table preagg4 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain') as select a as a1,sum(b) as sum from 
PreAggMain group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_b_sum")
+    sql("drop table preagg4")
+  }
+
+
+  test("test pre agg create table five") {
+    sql("create table preagg11 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain1') as select a,sum(b) from PreAggMain1 group 
by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, 
"preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "DICTIONARY")
+    sql("drop table preagg11")
+  }
+
+  test("test pre agg create table six") {
+    sql("create table preagg12 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) from PreAggMain1 
group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, 
"preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "DICTIONARY")
+    sql("drop table preagg12")
+  }
+
+  test("test pre agg create table seven") {
+    sql("create table preagg13 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain1') as select a,sum(b) as sum from 
PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, 
"preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "DICTIONARY")
+    sql("drop table preagg13")
+  }
+
+  test("test pre agg create table eight") {
+    sql("create table preagg14 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) as sum from 
PreAggMain1 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, 
"preaggmain1_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "DICTIONARY")
+    sql("drop table preagg14")
+  }
+
+
+  test("test pre agg create table nine") {
+    sql("create table preagg15 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain2') as select a,avg(b) from PreAggMain2 group 
by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, 
"preaggmain2_b_sum")
+    checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, 
"preaggmain2_b_count")
+    sql("drop table preagg15")
+  }
+
+  test("test pre agg create table ten") {
+    sql("create table preagg16 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain2') as select a as a1,max(b) from PreAggMain2 
group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, 
"preaggmain2_b_max")
+    sql("drop table preagg16")
+  }
+
+  test("test pre agg create table eleven") {
+    sql("create table preagg17 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain2') as select a,min(b) from PreAggMain2 group 
by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, 
"preaggmain2_b_min")
+    sql("drop table preagg17")
+  }
+
+  test("test pre agg create table twelve") {
+    sql("create table preagg18 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain2') as select a as a1,count(b) from 
PreAggMain2 group by a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_a")
+    checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, 
"preaggmain2_b_count")
+    sql("drop table preagg18")
+  }
+
+  test("test pre agg create table thirteen") {
+    try {
+      sql(
+        "create table preagg19 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain2') as select a as a1,count(distinct b) from 
PreAggMain2 group by a")
+      assert(false)
+    } catch {
+      case _: Exception =>
+        assert(true)
+    }
+  }
+
+  test("test pre agg create table fourteen") {
+    try {
+      sql(
+        "create table preagg20 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain2') as select a as a1,sum(distinct b) from 
PreAggMain2 group by a")
+      assert(false)
+    } catch {
+      case _: Exception =>
+        assert(true)
+    }
+  }
+
+  test("test pre agg create table fifteen") {
+    try {
+      sql(
+        "create table preagg21 stored BY 'carbondata' 
tblproperties('parent'='PreAggMain2') as select a as a1,sum(b) from PreAggMain2 
where a='vishal' group by a")
+      assert(false)
+    } catch {
+      case _: Exception =>
+        assert(true)
+    }
+  }
+
+
+  override def afterAll {
+    sql("drop table if exists PreAggMain")
+    sql("drop table if exists PreAggMain1")
+    sql("drop table if exists PreAggMain2")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 27ebf42..84294ff 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.util
 
 import java.text.SimpleDateFormat
 import java.util
+import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
@@ -834,6 +835,14 @@ object CommonUtil {
     }
   }
 
+  def getScaleAndPrecision(dataType: String): (Int, Int) = {
+    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
+    m.find()
+    val matchedString: String = m.group(1)
+    val scaleAndPrecision = matchedString.split(",")
+    (Integer.parseInt(scaleAndPrecision(0).trim), 
Integer.parseInt(scaleAndPrecision(1).trim))
+  }
+
   /**
    * Merge the carbonindex files with in the segment to carbonindexmerge file 
inside same segment
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index ee51954..bb80bce 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -221,16 +221,6 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
     dimensions ++ complexDimensions
   }
 
-
-
-  def getScaleAndPrecision(dataType: String): (Int, Int) = {
-    val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType)
-    m.find()
-    val matchedString: String = m.group(1)
-    val scaleAndPrecision = matchedString.split(",")
-    (Integer.parseInt(scaleAndPrecision(0).trim), 
Integer.parseInt(scaleAndPrecision(1).trim))
-  }
-
   /**
    * This will prepate the Model from the Tree details.
    *
@@ -1074,7 +1064,7 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
       // if it is present then extracting the precision and scale. resetting 
the data type
       // with Decimal.
       case _ if dataType.startsWith("decimal") =>
-        val (precision, scale) = getScaleAndPrecision(dataType)
+        val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType)
         Field(field.column,
           Some("Decimal"),
           field.name,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 1188b59..37ba8a5 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -33,8 +33,8 @@ import 
org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, 
DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
RelationIdentifier, TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, 
ParentColumnTableRelation}
 import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
@@ -61,13 +61,30 @@ case class TableModel(
     util.List[ColumnProperty]]] = None,
     bucketFields: Option[BucketFields],
     partitionInfo: Option[PartitionInfo],
-    tableComment: Option[String] = None)
+    tableComment: Option[String] = None,
+    var parentTable: Option[CarbonTable] = None,
+    var dataMapRelation: Option[scala.collection.mutable.LinkedHashMap[Field, 
DataMapField]] = None)
 
 case class Field(column: String, var dataType: Option[String], name: 
Option[String],
     children: Option[List[Field]], parent: String = null,
     storeType: Option[String] = Some("columnar"),
     var schemaOrdinal: Int = -1,
-    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "")
+    var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "") {
+  override def equals(o: Any) : Boolean = o match {
+    case that: Field =>
+      that.column.equalsIgnoreCase(this.column)
+    case _ => false
+  }
+  override def hashCode : Int = column.hashCode
+}
+
+case class DataMapField(aggregateFunction: String = "",
+    columnTableRelation: Option[ColumnTableRelation] = None) {
+}
+
+case class ColumnTableRelation(parentColumnName: String, parentColumnId: 
String,
+    parentTableName: String, parentDatabaseName: String, parentTableId: 
String) {
+}
 
 case class ColumnProperty(key: String, value: String)
 
@@ -358,15 +375,13 @@ class TableNewProcessor(cm: TableModel) {
       fields.foreach(field => {
         val encoders = new java.util.ArrayList[Encoding]()
         encoders.add(Encoding.DICTIONARY)
-        val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
-          field,
+        val columnSchema: ColumnSchema = getColumnSchema(
+          
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+          field.name.getOrElse(field.column),
           encoders,
-          isDimensionCol = true,
-          field.precision,
-          field.scale,
-          field.schemaOrdinal,
-          cm.highcardinalitydims.getOrElse(Seq()),
-          cm.databaseName)
+          true,
+          field,
+          cm.dataMapRelation)
         allColumns ++= Seq(columnSchema)
         if (field.children.get != null) {
           columnSchema.setNumberOfChild(field.children.get.size)
@@ -377,6 +392,56 @@ class TableNewProcessor(cm: TableModel) {
     allColumns
   }
 
+  def getColumnSchema(
+      dataType: DataType,
+      colName: String,
+      encoders: java.util.List[Encoding],
+      isDimensionCol: Boolean,
+      field: Field,
+      map: Option[scala.collection.mutable.LinkedHashMap[Field, 
DataMapField]]) : ColumnSchema = {
+    val columnSchema = new ColumnSchema()
+    columnSchema.setDataType(dataType)
+    columnSchema.setColumnName(colName)
+    val isParentColumnRelation = map.isDefined && map.get.get(field).isDefined
+    if(!isParentColumnRelation) {
+      val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq())
+      if (highCardinalityDims.contains(colName)) {
+        encoders.remove(Encoding.DICTIONARY)
+      }
+    if (dataType == DataTypes.DATE) {
+        encoders.add(Encoding.DIRECT_DICTIONARY)
+      }
+    if (dataType == DataTypes.TIMESTAMP && 
!highCardinalityDims.contains(colName)) {
+        encoders.add(Encoding.DIRECT_DICTIONARY)
+      }
+    }
+    columnSchema.setEncodingList(encoders)
+    val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
+      columnSchema)
+    columnSchema.setColumnUniqueId(columnUniqueId)
+    columnSchema.setColumnReferenceId(columnUniqueId)
+    columnSchema.setDimensionColumn(isDimensionCol)
+    columnSchema.setPrecision(field.precision)
+    columnSchema.setScale(field.scale)
+    columnSchema.setSchemaOrdinal(field.schemaOrdinal)
+    columnSchema.setSortColumn(false)
+    if(isParentColumnRelation) {
+      val dataMapField = map.get.get(field).get
+      columnSchema.setAggFunction(dataMapField.aggregateFunction);
+        val relation = dataMapField.columnTableRelation.get
+        val parentColumnTableRelationList = new 
util.ArrayList[ParentColumnTableRelation]
+        val relationIdentifier = new RelationIdentifier(
+          relation.parentDatabaseName, relation.parentTableName, 
relation.parentTableId)
+        val parentColumnTableRelation = new ParentColumnTableRelation(
+          relationIdentifier, relation.parentColumnId, 
relation.parentColumnName)
+        parentColumnTableRelationList.add(parentColumnTableRelation)
+        
columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
+    }
+    // TODO: Need to fill RowGroupID, converted type
+    // & Number of Children after DDL finalization
+    columnSchema
+  }
 
   // process create dml fields and create wrapper TableInfo object
   def process: TableInfo = {
@@ -388,17 +453,22 @@ class TableNewProcessor(cm: TableModel) {
     // Sort columns should be at the begin of all columns
     cm.sortKeyDims.get.foreach { keyDim =>
       val field = cm.dimCols.find(keyDim equals _.column).get
-      val encoders = new java.util.ArrayList[Encoding]()
-      encoders.add(Encoding.DICTIONARY)
-      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
-        field,
+      val encoders = if (cm.parentTable.isDefined && 
cm.dataMapRelation.get.get(field).isDefined) {
+        cm.parentTable.get.getColumnByName(
+          cm.parentTable.get.getFactTableName,
+          
cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
+      } else {
+        val encoders = new java.util.ArrayList[Encoding]()
+        encoders.add(Encoding.DICTIONARY)
+        encoders
+      }
+      val columnSchema = getColumnSchema(
+        
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+        field.name.getOrElse(field.column),
         encoders,
-        isDimensionCol = true,
-        field.precision,
-        field.scale,
-        field.schemaOrdinal,
-        cm.highcardinalitydims.getOrElse(Seq()),
-        cm.databaseName)
+        true,
+        field,
+        cm.dataMapRelation)
       columnSchema.setSortColumn(true)
       allColumns :+= columnSchema
       index = index + 1
@@ -407,17 +477,24 @@ class TableNewProcessor(cm: TableModel) {
     cm.dimCols.foreach { field =>
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
-        val encoders = new java.util.ArrayList[Encoding]()
-        encoders.add(Encoding.DICTIONARY)
-        val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
-          field,
+        val encoders = if (cm.parentTable.isDefined &&
+                           cm.dataMapRelation.get.get(field).isDefined) {
+          cm.parentTable.get.getColumnByName(
+            cm.parentTable.get.getFactTableName,
+            cm.dataMapRelation.get.get(field).get.
+              columnTableRelation.get.parentColumnName).getEncoder
+        } else {
+          val encoders = new java.util.ArrayList[Encoding]()
+          encoders.add(Encoding.DICTIONARY)
+          encoders
+        }
+        val columnSchema = getColumnSchema(
+          
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+          field.name.getOrElse(field.column),
           encoders,
-          isDimensionCol = true,
-          field.precision,
-          field.scale,
-          field.schemaOrdinal,
-          cm.highcardinalitydims.getOrElse(Seq()),
-          cm.databaseName)
+          true,
+          field,
+          cm.dataMapRelation)
         allColumns :+= columnSchema
         index = index + 1
         if (field.children.isDefined && field.children.get != null) {
@@ -429,15 +506,13 @@ class TableNewProcessor(cm: TableModel) {
 
     cm.msrCols.foreach { field =>
       val encoders = new java.util.ArrayList[Encoding]()
-      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
-        field,
+      val columnSchema = getColumnSchema(
+        
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
+        field.name.getOrElse(field.column),
         encoders,
-        isDimensionCol = false,
-        field.precision,
-        field.scale,
-        field.schemaOrdinal,
-        cm.highcardinalitydims.getOrElse(Seq()),
-        cm.databaseName)
+        false,
+        field,
+        cm.dataMapRelation)
       allColumns :+= columnSchema
       index = index + 1
       measureCount += 1
@@ -486,15 +561,13 @@ class TableNewProcessor(cm: TableModel) {
         Some(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE),
         None
       )
-      val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema(
-        field,
+      val columnSchema: ColumnSchema = getColumnSchema(
+        DataTypes.DOUBLE,
+        CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE,
         encoders,
-        isDimensionCol = false,
-        field.precision,
-        field.scale,
-        -1,
-        cm.highcardinalitydims.getOrElse(Seq()),
-        cm.databaseName)
+        false,
+        field,
+        cm.dataMapRelation)
       columnSchema.setInvisible(true)
       allColumns :+= columnSchema
     }
@@ -503,6 +576,7 @@ class TableNewProcessor(cm: TableModel) {
 
     val tableInfo = new TableInfo()
     val tableSchema = new TableSchema()
+
     val schemaEvol = new SchemaEvolution()
     schemaEvol.setSchemaEvolutionEntryList(new 
util.ArrayList[SchemaEvolutionEntry]())
     tableSchema.setTableId(UUID.randomUUID().toString)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index bda6829..222c30d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -364,7 +364,7 @@ case class LoadTableCommand(
     entry.setTime_stamp(System.currentTimeMillis())
 
     // write TableInfo
-    metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
+    
metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier,
       carbonTablePath.getCarbonTableIdentifier,
       tableInfo, entry, carbonTablePath.getPath)(sparkSession)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
new file mode 100644
index 0000000..ca384f9
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.{RelationIdentifier, 
TableInfo}
+import org.apache.carbondata.core.util.CarbonUtil
+
+/**
+ * Below command class will be used to create pre-aggregate table
+ * and updating the parent table about the child table information
+ * Failure case:
+ * 1. failed to create pre aggregate table.
+ * 2. failed to update main table
+ *
+ * @param cm
+ * @param dataFrame
+ * @param createDSTable
+ * @param queryString
+ */
+case class CreatePreAggregateTableCommand(
+    cm: TableModel,
+    dataFrame: DataFrame,
+    createDSTable: Boolean = true,
+    queryString: String,
+    fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, 
DataMapField])
+  extends RunnableCommand with SchemaProcessCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.
+      checkSchemasModifiedTimeAndReloadTables(storePath)
+    val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
+    val tbName = cm.tableName
+    val dbName = cm.databaseName
+    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name 
[$tbName]")
+    // getting the parent table
+    val parentTable = 
PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+    // getting the table name
+    val parentTableName = parentTable.getFactTableName
+    // getting the db name of parent table
+    val parentDbName = parentTable.getDatabaseName
+    // updating the relation identifier, this will be stored in child table
+    // which can be used during dropping of pre-aggreate table as parent table 
will
+    // also get updated
+    cm.parentTable = Some(parentTable)
+    cm.dataMapRelation = Some(fieldRelationMap)
+    val tableInfo: TableInfo = TableNewProcessor(cm)
+    // Add validation for sort scope when create table
+    val sortScope = tableInfo.getFactTable.getTableProperties
+      .getOrDefault("sort_scope", 
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    if (!CarbonUtil.isValidSortOption(sortScope)) {
+      throw new InvalidConfigurationException(
+        s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 
'NO_SORT', 'BATCH_SORT'," +
+        s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
+    }
+
+    if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
+      sys.error("No Dimensions found. Table should have at least one dimesnion 
!")
+    }
+
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tbName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name 
[$tbName] failed. " +
+          s"Table [$tbName] already exists under database [$dbName]")
+        sys.error(s"Table [$tbName] already exists under database [$dbName]")
+      }
+    } else {
+      val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, 
tbName)
+      // Add Database to catalog and persist
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      val tablePath = tableIdentifier.getTablePath
+      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, 
tablePath)
+      if (createDSTable) {
+        try {
+          val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
+          cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
+          cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+          sparkSession.sql(
+            s"""CREATE TABLE $dbName.$tbName
+               |(${ fields.map(f => f.rawSchema).mkString(",") })
+               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath 
""".stripMargin +
+            s""""$tablePath"$carbonSchemaString) """)
+          // child schema object which will be updated on parent table about 
the
+          val childSchema = tableInfo.getFactTable
+            .buildChildSchema("", tableInfo.getDatabaseName, queryString, 
"AGGREGATION")
+          // upadting the parent table about child table
+          PreAggregateUtil.updateMainTable(parentDbName, parentTableName, 
childSchema, sparkSession)
+        } catch {
+          case e: Exception =>
+            val identifier: TableIdentifier = TableIdentifier(tbName, 
Some(dbName))
+            // call the drop table to delete the created table.
+            CarbonEnv.getInstance(sparkSession).carbonMetastore
+              .dropTable(tablePath, identifier)(sparkSession)
+            LOGGER.audit(s"Table creation with Database name [$dbName] " +
+                         s"and Table name [$tbName] failed")
+            throw e
+        }
+      }
+
+      LOGGER.audit(s"Table created with Database name [$dbName] and Table name 
[$tbName]")
+    }
+    Seq.empty
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
new file mode 100644
index 0000000..c4b6783
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, 
SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Cast}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.command.{ColumnTableRelation, 
DataMapField, Field}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import 
org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA_NUMPARTS, 
DATASOURCE_SCHEMA_PART_PREFIX}
+import org.apache.spark.sql.types.DataType
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
+import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility class for keeping all the utility method for pre-aggregate
+ */
+object PreAggregateUtil {
+
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
+    plan match {
+      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
+      case _ => throw new MalformedCarbonCommandException("table does not 
exist")
+    }
+  }
+
+  /**
+   * Below method will be used to validate the select plan
+   * and get the required fields from select plan
+   * Currently only aggregate query is support any other type of query will
+   * fail
+   * @param plan
+   * @param selectStmt
+   * @return list of fields
+   */
+  def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
+      selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, 
DataMapField] = {
+    val fieldToDataMapFieldMap = 
scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+    plan match {
+      case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
+        if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val carbonTable = 
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+          .metaData.carbonTable
+        val parentTableName = 
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+          .getTableName
+        val parentDatabaseName = 
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+          .getDatabaseName
+        val parentTableId = 
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+          .getTableId
+        if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+          throw new MalformedCarbonCommandException(
+            "Pre Aggregation is not supported on Pre-Aggregated Table")
+        }
+        aExp.map {
+          case Alias(attr: AggregateExpression, _) =>
+            if (attr.isDistinct) {
+              throw new MalformedCarbonCommandException(
+                "Distinct is not supported On Pre Aggregation")
+            }
+            fieldToDataMapFieldMap ++= 
((validateAggregateFunctionAndGetFields(carbonTable,
+              attr.aggregateFunction,
+              parentTableName,
+              parentDatabaseName,
+              parentTableId)))
+          case attr: AttributeReference =>
+            fieldToDataMapFieldMap += getField(attr.name,
+              attr.dataType,
+              parentColumnId = carbonTable.getColumnByName(parentTableName, 
attr.name).getColumnId,
+              parentTableName = parentTableName,
+              parentDatabaseName = parentDatabaseName, parentTableId = 
parentTableId)
+          case Alias(attr: AttributeReference, _) =>
+            fieldToDataMapFieldMap += getField(attr.name,
+              attr.dataType,
+              parentColumnId = carbonTable.getColumnByName(parentTableName, 
attr.name).getColumnId,
+              parentTableName = parentTableName,
+              parentDatabaseName = parentDatabaseName, parentTableId = 
parentTableId)
+          case _ =>
+            throw new MalformedCarbonCommandException(s"Unsupported Select 
Statement:${
+              selectStmt } ")
+        }
+        Some(carbonTable)
+      case _ =>
+        throw new MalformedCarbonCommandException(s"Unsupported Select 
Statement:${ selectStmt } ")
+    }
+    fieldToDataMapFieldMap
+  }
+
+  /**
+   * Below method will be used to validate about the aggregate function
+   * which is applied on select query.
+   * Currently sum, max, min, count, avg is supported
+   * in case of any other aggregate function it will throw error
+   * In case of avg it will return two fields one for count
+   * and other of sum of that column to support rollup
+   * @param carbonTable
+   * @param aggFunctions
+   * @param parentTableName
+   * @param parentDatabaseName
+   * @param parentTableId
+   * @return list of fields
+   */
+  def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
+      aggFunctions: AggregateFunction,
+      parentTableName: String,
+      parentDatabaseName: String,
+      parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, 
DataMapField)] = {
+    val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
+    aggFunctions match {
+      case sum@Sum(attr: AttributeReference) =>
+        list += getField(attr.name,
+          attr.dataType,
+          sum.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          sum.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case count@Count(Seq(attr: AttributeReference)) =>
+        list += getField(attr.name,
+          attr.dataType,
+          count.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case min@Min(attr: AttributeReference) =>
+        list += getField(attr.name,
+          attr.dataType,
+          min.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          min.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case max@Max(attr: AttributeReference) =>
+        list += getField(attr.name,
+          attr.dataType,
+          max.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          max.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case Average(attr: AttributeReference) =>
+        getField(attr.name,
+          attr.dataType,
+          "sum",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+        list += getField(attr.name,
+          attr.dataType,
+          "count",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+        list += getField(attr.name,
+          changeDataType,
+          "sum",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+        list += getField(attr.name,
+          changeDataType,
+          "count",
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
+      case _ =>
+        throw new MalformedCarbonCommandException("Un-Supported Aggregation 
Type")
+    }
+  }
+
+  /**
+   * Below method will be used to get the fields object for pre aggregate table
+   * @param columnName
+   * @param dataType
+   * @param aggregateType
+   * @param parentColumnId
+   * @param parentTableName
+   * @param parentDatabaseName
+   * @param parentTableId
+   * @return fields object
+   */
+  def getField(columnName: String,
+      dataType: DataType,
+      aggregateType: String = "",
+      parentColumnId: String,
+      parentTableName: String,
+      parentDatabaseName: String,
+      parentTableId: String): (Field, DataMapField) = {
+    val actualColumnName = if (aggregateType.equals("")) {
+      parentTableName + '_' + columnName
+    } else {
+      parentTableName + '_' + columnName + '_' + aggregateType
+    }
+    val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
+    val columnTableRelation = ColumnTableRelation(parentColumnName = 
columnName,
+      parentColumnId = parentColumnId,
+      parentTableName = parentTableName,
+      parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+    val dataMapField = DataMapField(aggregateType, Some(columnTableRelation))
+    if (dataType.typeName.startsWith("decimal")) {
+      val (precision, scale) = 
CommonUtil.getScaleAndPrecision(dataType.catalogString)
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        precision = precision,
+        scale = scale,
+        rawSchema = rawSchema), dataMapField)
+    }
+    else {
+      (Field(column = actualColumnName,
+        dataType = Some(dataType.typeName),
+        name = Some(actualColumnName),
+        children = None,
+        rawSchema = rawSchema), dataMapField)
+    }
+  }
+
+  /**
+   * Below method will be used to update the main table about the pre 
aggregate table information
+   * in case of any exption it will throw error so pre aggregate table 
creation will fail
+   * @param dbName
+   * @param tableName
+   * @param childSchema
+   * @param sparkSession
+   */
+  def updateMainTable(dbName: String, tableName: String,
+      childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
+    val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+      LockUsage.DROP_TABLE_LOCK)
+    var locks = List.empty[ICarbonLock]
+    var carbonTable: CarbonTable = null
+    var numberOfCurrentChild: Int = 0
+    try {
+      val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      carbonTable = metastore
+        .lookupRelation(Some(dbName), 
tableName)(sparkSession).asInstanceOf[CarbonRelation]
+        .tableMeta.carbonTable
+      locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
+      // get the latest carbon table and check for column existence
+      // read the latest schema file
+      val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val thriftTableInfo: TableInfo = 
metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+      val wrapperTableInfo = schemaConverter
+        .fromExternalToWrapperTableInfo(thriftTableInfo,
+          dbName,
+          tableName,
+          carbonTable.getStorePath)
+      numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
+      wrapperTableInfo.getDataMapSchemaList.add(childSchema)
+      val thriftTable = schemaConverter
+        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+      updateSchemaInfo(carbonTable,
+        thriftTable)(sparkSession,
+        sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+      LOGGER.info(s"Pre Aggeragte Parent table updated is successful for table 
$dbName.$tableName")
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, "Pre Aggregate Parent table update failed reverting 
changes")
+        revertMainTableChanges(dbName, tableName, 
numberOfCurrentChild)(sparkSession)
+        throw e
+    } finally {
+      // release lock after command execution completion
+      releaseLocks(locks)
+    }
+    Seq.empty
+  }
+
+  /**
+   * Below method will be used to update the main table schema
+   * @param carbonTable
+   * @param thriftTable
+   * @param sparkSession
+   * @param sessionState
+   */
+  def updateSchemaInfo(carbonTable: CarbonTable,
+      thriftTable: TableInfo)(sparkSession: SparkSession,
+      sessionState: CarbonSessionState): Unit = {
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getFactTableName
+    CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier,
+        carbonTable.getCarbonTableIdentifier,
+        thriftTable,
+        carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+  }
+
+  /**
+   * This method will split schema string into multiple parts of configured 
size and
+   * registers the parts as keys in tableProperties which will be read by 
spark to prepare
+   * Carbon Table fields
+   *
+   * @param sparkConf
+   * @param schemaJsonString
+   * @return
+   */
+  private def prepareSchemaJson(sparkConf: SparkConf,
+      schemaJsonString: String): String = {
+    val threshold = sparkConf
+      .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
+        CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    var schemaParts: Seq[String] = Seq.empty
+    schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ 
parts.size }'"
+    parts.zipWithIndex.foreach { case (part, index) =>
+      schemaParts = schemaParts :+ 
s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
+    }
+    schemaParts.mkString(",")
+  }
+
+  /**
+   * Validates that the table exists and acquires meta lock on it.
+   *
+   * @param dbName
+   * @param tableName
+   * @return
+   */
+  def acquireLock(dbName: String,
+      tableName: String,
+      locksToBeAcquired: List[String],
+      table: CarbonTable): List[ICarbonLock] = {
+    // acquire the lock first
+    val acquiredLocks = ListBuffer[ICarbonLock]()
+    try {
+      locksToBeAcquired.foreach { lock =>
+        acquiredLocks += 
CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock)
+      }
+      acquiredLocks.toList
+    } catch {
+      case e: Exception =>
+        releaseLocks(acquiredLocks.toList)
+        throw e
+    }
+  }
+
+  /**
+   * This method will release the locks acquired for an operation
+   *
+   * @param locks
+   */
+  def releaseLocks(locks: List[ICarbonLock]): Unit = {
+    locks.foreach { carbonLock =>
+      if (carbonLock.unlock()) {
+        LOGGER.info("Pre agg table lock released successfully")
+      } else {
+        LOGGER.error("Unable to release lock during Pre agg table cretion")
+      }
+    }
+  }
+
+  /**
+   * This method reverts the changes to the schema if add column command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param numberOfChildSchema
+   * @param sparkSession
+   */
+  def revertMainTableChanges(dbName: String, tableName: String, 
numberOfChildSchema: Int)
+    (sparkSession: SparkSession): Unit = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = metastore
+      .lookupRelation(Some(dbName), 
tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
+    carbonTable.getTableLastUpdatedTime
+    val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val thriftTable: TableInfo = 
metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
+      metastore
+        
.revertTableSchemaForPreAggCreationFailure(carbonTable.getCarbonTableIdentifier,
+          thriftTable, 
carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c000488..47c570b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -106,7 +106,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       }
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+      val newTablePath = 
metastore.updateTableSchemaForAlter(newTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
         schemaEvolutionEntry,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 9822d8f..51c7f3b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, 
OperationContext, OperationListenerBus}
+import org.apache.carbondata.format
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -111,6 +112,22 @@ class CarbonFileMetastore extends CarbonMetaStore {
     }
   }
 
+  /**
+   * This method will overwrite the existing schema and update it with the 
given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String = {
+    updateTableSchemaForAlter(newTableIdentifier,
+      oldTableIdentifier, thriftTableInfo, null, carbonStorePath) 
(sparkSession)
+  }
+
   def lookupRelation(dbName: Option[String], tableName: String)
     (sparkSession: SparkSession): LogicalPlan = {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
@@ -214,7 +231,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tablePath
    * @param sparkSession
    */
-  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
@@ -251,7 +268,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param tablePath
    * @param sparkSession
    */
-  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+  def revertTableSchemaInAlterFailure(carbonTableIdentifier: 
CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       tablePath: String)(sparkSession: SparkSession): String = {
     val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
@@ -271,7 +288,27 @@ class CarbonFileMetastore extends CarbonMetaStore {
     path
   }
 
+  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
+  CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName,
+        carbonTableIdentifier.getTableName,
+        tableIdentifier.getStorePath)
+    val childSchemaList = wrapperTableInfo.getDataMapSchemaList
+    childSchemaList.remove(childSchemaList.size() - 1)
+    wrapperTableInfo.setStorePath(tableIdentifier.getStorePath)
+    val path = createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      tableIdentifier.getCarbonTableIdentifier)
+    addTableCache(wrapperTableInfo, tableIdentifier)
+    path
 
+  }
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 76241a6..c64b7bb 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -115,7 +115,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
    * @param schemaEvolutionEntry
    * @param sparkSession
    */
-  override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+  override def updateTableSchemaForAlter(newTableIdentifier: 
CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
@@ -126,7 +126,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     if (schemaEvolutionEntry != null) {
       
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    updateHiveMetaStore(newTableIdentifier,
+    updateHiveMetaStoreForAlter(newTableIdentifier,
       oldTableIdentifier,
       thriftTableInfo,
       identifier.getStorePath,
@@ -134,7 +134,29 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       schemaConverter)
   }
 
-  private def updateHiveMetaStore(newTableIdentifier: CarbonTableIdentifier,
+  /**
+   * This method will overwrite the existing schema and update it with the 
given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  override def updateTableSchemaForDataMap(newTableIdentifier: 
CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(carbonStorePath)
+    updateHiveMetaStoreForDataMap(newTableIdentifier,
+      oldTableIdentifier,
+      thriftTableInfo,
+      identifier.getStorePath,
+      sparkSession,
+      schemaConverter)
+  }
+
+  private def updateHiveMetaStoreForAlter(newTableIdentifier: 
CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       carbonStorePath: String,
@@ -161,6 +183,30 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     CarbonStorePath.getCarbonTablePath(carbonStorePath, 
newTableIdentifier).getPath
   }
 
+  private def updateHiveMetaStoreForDataMap(newTableIdentifier: 
CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: format.TableInfo,
+      carbonStorePath: String,
+      sparkSession: SparkSession,
+      schemaConverter: ThriftWrapperSchemaConverterImpl) = {
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        newTableIdentifier.getDatabaseName,
+        newTableIdentifier.getTableName,
+        carbonStorePath)
+    wrapperTableInfo.setStorePath(carbonStorePath)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, 
newTableIdentifier)
+    val schemaMetadataPath =
+      
CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
+    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val dbName = oldTableIdentifier.getDatabaseName
+    val tableName = oldTableIdentifier.getTableName
+    sparkSession.catalog.refreshTable(TableIdentifier(tableName, 
Some(dbName)).quotedString)
+    removeTableFromMetadata(dbName, tableName)
+    CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+    CarbonStorePath.getCarbonTablePath(carbonStorePath, 
newTableIdentifier).getPath
+  }
+
   /**
    * This method will is used to remove the evolution entry in case of failure.
    *
@@ -168,7 +214,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
    * @param thriftTableInfo
    * @param sparkSession
    */
-  override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+  override def revertTableSchemaInAlterFailure(carbonTableIdentifier: 
CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
       tablePath: String)
     (sparkSession: SparkSession): String = {
@@ -176,7 +222,23 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
     val evolutionEntries = 
thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    updateHiveMetaStore(carbonTableIdentifier,
+    updateHiveMetaStoreForAlter(carbonTableIdentifier,
+      carbonTableIdentifier,
+      thriftTableInfo,
+      identifier.getStorePath,
+      sparkSession,
+      schemaConverter)
+  }
+
+  override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier:
+  CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath)
+    val childSchemas = thriftTableInfo.dataMapSchemas
+    childSchemas.remove(childSchemas.size())
+    updateHiveMetaStoreForAlter(carbonTableIdentifier,
       carbonTableIdentifier,
       thriftTableInfo,
       identifier.getStorePath,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index dcb43d1..24996ed 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -66,13 +66,26 @@ trait CarbonMetaStore {
    * @param carbonStorePath
    * @param sparkSession
    */
-  def updateTableSchema(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       carbonStorePath: String)(sparkSession: SparkSession): String
 
   /**
+   * This method will overwrite the existing schema and update it with the 
given details
+   *
+   * @param newTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier,
+      oldTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)(sparkSession: SparkSession): String
+
+  /**
    * This method will is used to remove the evolution entry in case of failure.
    *
    * @param carbonTableIdentifier
@@ -80,11 +93,15 @@ trait CarbonMetaStore {
    * @param tablePath
    * @param sparkSession
    */
-  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+  def revertTableSchemaInAlterFailure(carbonTableIdentifier: 
CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       tablePath: String)
     (sparkSession: SparkSession): String
 
+
+  def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: 
CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      tablePath: String)(sparkSession: SparkSession): String
   /**
    * Prepare Thrift Schema from wrapper TableInfo and write to disk
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index fc2ed41..bf21bc8 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -440,7 +440,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       // so checking the start of the string and taking the precision and 
scale.
       // resetting the data type with decimal
       if (f.dataType.getOrElse("").startsWith("decimal")) {
-        val (precision, scale) = 
getScaleAndPrecision(col.dataType.catalogString)
+        val (precision, scale) = 
CommonUtil.getScaleAndPrecision(col.dataType.catalogString)
         f.precision = precision
         f.scale = scale
         f.dataType = Some("decimal")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 0a918df..24a6418 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,14 +18,15 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, 
ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, 
TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, 
CarbonCreateTableCommand, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command._
+import 
org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand,
 PreAggregateUtil}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.StructField
 
@@ -40,7 +41,7 @@ import org.apache.carbondata.spark.util.CommonUtil
  */
 class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends 
AbstractSqlParser {
 
-  val astBuilder = new CarbonSqlAstBuilder(conf)
+  val astBuilder = new CarbonSqlAstBuilder(conf, sparkSession: SparkSession)
 
   private val substitutor = new VariableSubstitution(conf)
 
@@ -71,7 +72,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: 
SparkSession) extends Ab
   }
 }
 
-class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
 
   val parser = new CarbonSpark2SqlParser
 
@@ -119,8 +121,18 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends 
SparkSqlAstBuilder(conf) {
       val (partitionByStructFields, partitionFields) =
         validateParitionFields(ctx, colNames, tableProperties)
 
-      val fields = parser.getFields(colsStructFields ++ 
partitionByStructFields)
-
+      val isAggTable = tableProperties.get("parent").isDefined
+      var fields = parser.getFields(colsStructFields ++ 
partitionByStructFields)
+      val dfAndFieldRelationTuple = if (isAggTable) {
+        val selectQuery = Option(ctx.query).map(plan).get
+        val df = Dataset.ofRows(sparkSession, selectQuery)
+        val fieldRelationMap = PreAggregateUtil
+          .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, 
source(ctx.query()))
+        fields = fieldRelationMap.keySet.toSeq
+        Some(df, fieldRelationMap)
+      } else {
+        None
+      }
       // validate bucket fields
       val bucketFields: Option[BucketFields] =
         parser.getBucketFields(tableProperties, fields, options)
@@ -139,7 +151,14 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends 
SparkSqlAstBuilder(conf) {
         isAlterFlow = false,
         tableComment)
 
-      CarbonCreateTableCommand(tableModel)
+      if(!isAggTable) {
+        CarbonCreateTableCommand(tableModel)
+      } else {
+        CreatePreAggregateTableCommand(tableModel,
+          dfAndFieldRelationTuple.get._1,
+          queryString = source(ctx.query).toString,
+          fieldRelationMap = dfAndFieldRelationTuple.get._2)
+      }
     } else {
       super.visitCreateTable(ctx)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 44f5a36..bda4eeb 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -136,7 +136,7 @@ object AlterTableUtil {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getFactTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .updateTableSchema(carbonTable.getCarbonTableIdentifier,
+      .updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         thriftTable,
         schemaEvolutionEntry,
@@ -211,7 +211,7 @@ object AlterTableUtil {
           .renameForce(carbonTablePath.getParent.toString + 
CarbonCommonConstants.FILE_SEPARATOR +
                        oldTableIdentifier.table)
         val tableIdentifier = new CarbonTableIdentifier(database, 
oldTableIdentifier.table, tableId)
-        metastore.revertTableSchema(tableIdentifier,
+        metastore.revertTableSchemaInAlterFailure(tableIdentifier,
           tableInfo, carbonTablePath.getPath)(sparkSession)
         metastore.removeTableFromMetadata(database, newTableName)
       }
@@ -243,7 +243,7 @@ object AlterTableUtil {
       val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 
1).added
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
       metastore
-        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
           thriftTable, 
carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
@@ -278,7 +278,7 @@ object AlterTableUtil {
         }
       }
       metastore
-        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
           thriftTable, 
carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }
@@ -316,7 +316,7 @@ object AlterTableUtil {
         }
       }
       metastore
-        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier,
           thriftTable, 
carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession)
     }
   }

Reply via email to