[CARBONDATA-1517] Pre Aggregate Create Table Support: support CTAS in Carbon, support creating aggregation tables using CTAS, and update the main table schema with the aggregation table information.
This closes #1433 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/aa12127d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/aa12127d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/aa12127d Branch: refs/heads/pre-aggregate Commit: aa12127dda4c4481b6e49b399086e01d2ed6bf08 Parents: 0a66f91 Author: kumarvishal <kumarvishal.1...@gmail.com> Authored: Sun Oct 15 18:05:55 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Fri Nov 10 10:45:30 2017 +0530 ---------------------------------------------------------------------- .../table/column/ParentColumnTableRelation.java | 71 +++ .../ThriftWrapperSchemaConverterImplTest.java | 28 +- .../preaggregate/TestPreAggCreateCommand.scala | 148 +++++++ .../carbondata/spark/util/CommonUtil.scala | 9 + .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 12 +- .../command/carbonTableSchemaCommon.scala | 170 +++++--- .../command/management/LoadTableCommand.scala | 2 +- .../CreatePreAggregateTableCommand.scala | 136 ++++++ .../preaaggregate/PreAggregateUtil.scala | 431 +++++++++++++++++++ .../schema/CarbonAlterTableRenameCommand.scala | 2 +- .../spark/sql/hive/CarbonFileMetastore.scala | 41 +- .../spark/sql/hive/CarbonHiveMetaStore.scala | 72 +++- .../apache/spark/sql/hive/CarbonMetaStore.scala | 21 +- .../sql/parser/CarbonSpark2SqlParser.scala | 2 +- .../spark/sql/parser/CarbonSparkSqlParser.scala | 33 +- .../org/apache/spark/util/AlterTableUtil.scala | 10 +- 16 files changed, 1100 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java new file mode 100644 index 0000000..425d0f2 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.metadata.schema.table.column; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; +import org.apache.carbondata.core.metadata.schema.table.Writable; + +/** + * To maintain the relation of child column to parent table column + */ +public class ParentColumnTableRelation implements Serializable, Writable { + + private RelationIdentifier relationIdentifier; + /** + * parent column id + */ + private String columnId; + + private String columnName; + + public ParentColumnTableRelation(RelationIdentifier relationIdentifier, String columId, + String columnName) { + this.relationIdentifier = relationIdentifier; + this.columnId = columId; + this.columnName = columnName; + } + + public RelationIdentifier getRelationIdentifier() { + return relationIdentifier; + } + + public String getColumnId() { + return columnId; + } + + public String getColumnName() { + return columnName; + } + + @Override public void write(DataOutput out) throws IOException { + relationIdentifier.write(out); + out.writeUTF(columnId); + out.writeUTF(columnName); + } + + @Override public void readFields(DataInput in) throws IOException { + this.relationIdentifier = new RelationIdentifier(null, null, null); + relationIdentifier.readFields(in); + this.columnId = in.readUTF(); + this.columnName = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java index 42c0ad6..0fddc25 100644 --- 
a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java +++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.TableSchema; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.format.DataMapSchema; import mockit.Mock; import mockit.MockUp; @@ -84,6 +85,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); thriftColumnSchemas = new ArrayList<org.apache.carbondata.format.ColumnSchema>(); thriftColumnSchemas.add(thriftColumnSchema); thriftSchemaEvolutionEntries = new ArrayList<>(); @@ -421,6 +423,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); ColumnSchema wrapperColumnSchema = new ColumnSchema(); new MockUp<ColumnSchema>() { @@ -483,6 +486,8 @@ public class ThriftWrapperSchemaConverterImplTest { @Mock public String getColumnReferenceId() { return "1"; } + + @Mock public String getAggFunction() {return "" ;} }; org.apache.carbondata.format.ColumnSchema actualResult = @@ -496,7 +501,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRING, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); - + 
thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { return encodings; @@ -571,7 +576,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.INT, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); - + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { return encodings; @@ -645,6 +650,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.SHORT, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { return encodings; @@ -718,7 +724,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.LONG, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); - + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { return encodings; @@ -792,6 +798,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DOUBLE, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { @@ -866,6 +873,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.DECIMAL, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() 
{ @Mock public List<Encoding> getEncodingList() { return encodings; @@ -926,6 +934,10 @@ public class ThriftWrapperSchemaConverterImplTest { @Mock public String getColumnReferenceId() { return "1"; } + + @Mock public String getAggFunction() { + return ""; + } }; ColumnSchema wrapperColumnSchema = new ColumnSchema(); @@ -940,6 +952,7 @@ public class ThriftWrapperSchemaConverterImplTest { org.apache.carbondata.format.DataType.TIMESTAMP, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { @@ -1014,7 +1027,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.ARRAY, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); - + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { return encodings; @@ -1088,6 +1101,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.STRUCT, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { @@ -1168,6 +1182,7 @@ public class ThriftWrapperSchemaConverterImplTest { encoders, true); thriftColumnSchema.setSchemaOrdinal(1); + thriftColumnSchema.setAggregate_function(""); new MockUp<ColumnSchema>() { @Mock public List<Encoding> getEncodingList() { @@ -1318,7 +1333,7 @@ public class ThriftWrapperSchemaConverterImplTest { new org.apache.carbondata.format.ColumnSchema(org.apache.carbondata.format.DataType.BOOLEAN, "columnName", "1", true, encoders, true); thriftColumnSchema.setSchemaOrdinal(1); - + thriftColumnSchema.setAggregate_function(""); ColumnSchema wrapperColumnSchema 
= new ColumnSchema(); org.apache.carbondata.format.ColumnSchema actualResult = thriftWrapperSchemaConverter.fromWrapperToExternalColumnSchema(wrapperColumnSchema); @@ -1506,6 +1521,8 @@ public class ThriftWrapperSchemaConverterImplTest { @Mock public String getColumnReferenceId() { return "1"; } + + @Mock public String getAggFunction() { return "";} }; new MockUp<TableInfo>() { @@ -1542,6 +1559,7 @@ public class ThriftWrapperSchemaConverterImplTest { org.apache.carbondata.format.TableInfo expectedResult = new org.apache.carbondata.format.TableInfo(thriftFactTable, new ArrayList<org.apache .carbondata.format.TableSchema>()); + expectedResult.setDataMapSchemas(new ArrayList<DataMapSchema>()); assertEquals(expectedResult, actualResult); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala new file mode 100644 index 0000000..6120e88 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -0,0 +1,148 @@ +package org.apache.carbondata.integration.spark.testsuite.preaggregate + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("drop table if exists PreAggMain") + sql("drop table if exists PreAggMain1") + sql("drop table if exists PreAggMain2") + sql("create table preaggMain (a string, b string, c string) stored by 
'carbondata'") + sql("create table preaggMain1 (a string, b string, c string) stored by 'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')") + sql("create table preaggMain2 (a string, b string, c string) stored by 'carbondata'") + } + + + test("test pre agg create table One") { + sql("create table preagg1 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a,sum(b) from PreAggMain group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_a") + checkExistence(sql("DESCRIBE FORMATTED preagg1"), true, "preaggmain_b_sum") + sql("drop table preagg1") + } + + test("test pre agg create table Two") { + sql("create table preagg2 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a as a1,sum(b) from PreAggMain group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_a") + checkExistence(sql("DESCRIBE FORMATTED preagg2"), true, "preaggmain_b_sum") + sql("drop table preagg2") + } + + test("test pre agg create table Three") { + sql("create table preagg3 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a,sum(b) as sum from PreAggMain group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_a") + checkExistence(sql("DESCRIBE FORMATTED preagg3"), true, "preaggmain_b_sum") + sql("drop table preagg3") + } + + test("test pre agg create table four") { + sql("create table preagg4 stored BY 'carbondata' tblproperties('parent'='PreAggMain') as select a as a1,sum(b) as sum from PreAggMain group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_a") + checkExistence(sql("DESCRIBE FORMATTED preagg4"), true, "preaggmain_b_sum") + sql("drop table preagg4") + } + + + test("test pre agg create table five") { + sql("create table preagg11 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a,sum(b) from PreAggMain1 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_a") + 
checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "preaggmain1_b_sum") + checkExistence(sql("DESCRIBE FORMATTED preagg11"), true, "DICTIONARY") + sql("drop table preagg11") + } + + test("test pre agg create table six") { + sql("create table preagg12 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) from PreAggMain1 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_a") + checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "preaggmain1_b_sum") + checkExistence(sql("DESCRIBE FORMATTED preagg12"), true, "DICTIONARY") + sql("drop table preagg12") + } + + test("test pre agg create table seven") { + sql("create table preagg13 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a,sum(b) as sum from PreAggMain1 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_a") + checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "preaggmain1_b_sum") + checkExistence(sql("DESCRIBE FORMATTED preagg13"), true, "DICTIONARY") + sql("drop table preagg13") + } + + test("test pre agg create table eight") { + sql("create table preagg14 stored BY 'carbondata' tblproperties('parent'='PreAggMain1') as select a as a1,sum(b) as sum from PreAggMain1 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_a") + checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "preaggmain1_b_sum") + checkExistence(sql("DESCRIBE FORMATTED preagg14"), true, "DICTIONARY") + sql("drop table preagg14") + } + + + test("test pre agg create table nine") { + sql("create table preagg15 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a,avg(b) from PreAggMain2 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_a") + checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_sum") + checkExistence(sql("DESCRIBE FORMATTED preagg15"), true, "preaggmain2_b_count") + sql("drop table 
preagg15") + } + + test("test pre agg create table ten") { + sql("create table preagg16 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,max(b) from PreAggMain2 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_a") + checkExistence(sql("DESCRIBE FORMATTED preagg16"), true, "preaggmain2_b_max") + sql("drop table preagg16") + } + + test("test pre agg create table eleven") { + sql("create table preagg17 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a,min(b) from PreAggMain2 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_a") + checkExistence(sql("DESCRIBE FORMATTED preagg17"), true, "preaggmain2_b_min") + sql("drop table preagg17") + } + + test("test pre agg create table twelve") { + sql("create table preagg18 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,count(b) from PreAggMain2 group by a") + checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_a") + checkExistence(sql("DESCRIBE FORMATTED preagg18"), true, "preaggmain2_b_count") + sql("drop table preagg18") + } + + test("test pre agg create table thirteen") { + try { + sql( + "create table preagg19 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,count(distinct b) from PreAggMain2 group by a") + assert(false) + } catch { + case _: Exception => + assert(true) + } + } + + test("test pre agg create table fourteen") { + try { + sql( + "create table preagg20 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,sum(distinct b) from PreAggMain2 group by a") + assert(false) + } catch { + case _: Exception => + assert(true) + } + } + + test("test pre agg create table fifteen") { + try { + sql( + "create table preagg21 stored BY 'carbondata' tblproperties('parent'='PreAggMain2') as select a as a1,sum(b) from PreAggMain2 where a='vishal' group by a") + assert(false) + } catch { + case _: Exception 
=> + assert(true) + } + } + + + override def afterAll { + sql("drop table if exists PreAggMain") + sql("drop table if exists PreAggMain1") + sql("drop table if exists PreAggMain2") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 27ebf42..84294ff 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -20,6 +20,7 @@ package org.apache.carbondata.spark.util import java.text.SimpleDateFormat import java.util +import java.util.regex.{Matcher, Pattern} import scala.collection.JavaConverters._ import scala.collection.mutable.Map @@ -834,6 +835,14 @@ object CommonUtil { } } + def getScaleAndPrecision(dataType: String): (Int, Int) = { + val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType) + m.find() + val matchedString: String = m.group(1) + val scaleAndPrecision = matchedString.split(",") + (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim)) + } + /** * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 
ee51954..bb80bce 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -221,16 +221,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { dimensions ++ complexDimensions } - - - def getScaleAndPrecision(dataType: String): (Int, Int) = { - val m: Matcher = Pattern.compile("^decimal\\(([^)]+)\\)").matcher(dataType) - m.find() - val matchedString: String = m.group(1) - val scaleAndPrecision = matchedString.split(",") - (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim)) - } - /** * This will prepate the Model from the Tree details. * @@ -1074,7 +1064,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { // if it is present then extracting the precision and scale. resetting the data type // with Decimal. case _ if dataType.startsWith("decimal") => - val (precision, scale) = getScaleAndPrecision(dataType) + val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType) Field(field.column, Some("Decimal"), field.name, http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 1188b59..37ba8a5 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -33,8 +33,8 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier 
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema._ -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema} +import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation} import org.apache.carbondata.core.service.CarbonCommonFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.DataTypeUtil @@ -61,13 +61,30 @@ case class TableModel( util.List[ColumnProperty]]] = None, bucketFields: Option[BucketFields], partitionInfo: Option[PartitionInfo], - tableComment: Option[String] = None) + tableComment: Option[String] = None, + var parentTable: Option[CarbonTable] = None, + var dataMapRelation: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]] = None) case class Field(column: String, var dataType: Option[String], name: Option[String], children: Option[List[Field]], parent: String = null, storeType: Option[String] = Some("columnar"), var schemaOrdinal: Int = -1, - var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "") + var precision: Int = 0, var scale: Int = 0, var rawSchema: String = "") { + override def equals(o: Any) : Boolean = o match { + case that: Field => + that.column.equalsIgnoreCase(this.column) + case _ => false + } + override def hashCode : Int = column.hashCode +} + +case class DataMapField(aggregateFunction: String = "", + columnTableRelation: Option[ColumnTableRelation] = None) { +} + +case class ColumnTableRelation(parentColumnName: String, parentColumnId: String, + parentTableName: String, parentDatabaseName: String, 
parentTableId: String) { +} case class ColumnProperty(key: String, value: String) @@ -358,15 +375,13 @@ class TableNewProcessor(cm: TableModel) { fields.foreach(field => { val encoders = new java.util.ArrayList[Encoding]() encoders.add(Encoding.DICTIONARY) - val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema( - field, + val columnSchema: ColumnSchema = getColumnSchema( + DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")), + field.name.getOrElse(field.column), encoders, - isDimensionCol = true, - field.precision, - field.scale, - field.schemaOrdinal, - cm.highcardinalitydims.getOrElse(Seq()), - cm.databaseName) + true, + field, + cm.dataMapRelation) allColumns ++= Seq(columnSchema) if (field.children.get != null) { columnSchema.setNumberOfChild(field.children.get.size) @@ -377,6 +392,56 @@ class TableNewProcessor(cm: TableModel) { allColumns } + def getColumnSchema( + dataType: DataType, + colName: String, + encoders: java.util.List[Encoding], + isDimensionCol: Boolean, + field: Field, + map: Option[scala.collection.mutable.LinkedHashMap[Field, DataMapField]]) : ColumnSchema = { + val columnSchema = new ColumnSchema() + columnSchema.setDataType(dataType) + columnSchema.setColumnName(colName) + val isParentColumnRelation = map.isDefined && map.get.get(field).isDefined + if(!isParentColumnRelation) { + val highCardinalityDims = cm.highcardinalitydims.getOrElse(Seq()) + if (highCardinalityDims.contains(colName)) { + encoders.remove(Encoding.DICTIONARY) + } + if (dataType == DataTypes.DATE) { + encoders.add(Encoding.DIRECT_DICTIONARY) + } + if (dataType == DataTypes.TIMESTAMP && !highCardinalityDims.contains(colName)) { + encoders.add(Encoding.DIRECT_DICTIONARY) + } + } + columnSchema.setEncodingList(encoders) + val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator + val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName, + columnSchema) + columnSchema.setColumnUniqueId(columnUniqueId) + 
columnSchema.setColumnReferenceId(columnUniqueId) + columnSchema.setDimensionColumn(isDimensionCol) + columnSchema.setPrecision(field.precision) + columnSchema.setScale(field.scale) + columnSchema.setSchemaOrdinal(field.schemaOrdinal) + columnSchema.setSortColumn(false) + if(isParentColumnRelation) { + val dataMapField = map.get.get(field).get + columnSchema.setAggFunction(dataMapField.aggregateFunction); + val relation = dataMapField.columnTableRelation.get + val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation] + val relationIdentifier = new RelationIdentifier( + relation.parentDatabaseName, relation.parentTableName, relation.parentTableId) + val parentColumnTableRelation = new ParentColumnTableRelation( + relationIdentifier, relation.parentColumnId, relation.parentColumnName) + parentColumnTableRelationList.add(parentColumnTableRelation) + columnSchema.setParentColumnTableRelations(parentColumnTableRelationList) + } + // TODO: Need to fill RowGroupID, converted type + // & Number of Children after DDL finalization + columnSchema + } // process create dml fields and create wrapper TableInfo object def process: TableInfo = { @@ -388,17 +453,22 @@ class TableNewProcessor(cm: TableModel) { // Sort columns should be at the begin of all columns cm.sortKeyDims.get.foreach { keyDim => val field = cm.dimCols.find(keyDim equals _.column).get - val encoders = new java.util.ArrayList[Encoding]() - encoders.add(Encoding.DICTIONARY) - val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema( - field, + val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) { + cm.parentTable.get.getColumnByName( + cm.parentTable.get.getFactTableName, + cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder + } else { + val encoders = new java.util.ArrayList[Encoding]() + encoders.add(Encoding.DICTIONARY) + encoders + } + val columnSchema = getColumnSchema( + 
DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")), + field.name.getOrElse(field.column), encoders, - isDimensionCol = true, - field.precision, - field.scale, - field.schemaOrdinal, - cm.highcardinalitydims.getOrElse(Seq()), - cm.databaseName) + true, + field, + cm.dataMapRelation) columnSchema.setSortColumn(true) allColumns :+= columnSchema index = index + 1 @@ -407,17 +477,24 @@ class TableNewProcessor(cm: TableModel) { cm.dimCols.foreach { field => val sortField = cm.sortKeyDims.get.find(field.column equals _) if (sortField.isEmpty) { - val encoders = new java.util.ArrayList[Encoding]() - encoders.add(Encoding.DICTIONARY) - val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema( - field, + val encoders = if (cm.parentTable.isDefined && + cm.dataMapRelation.get.get(field).isDefined) { + cm.parentTable.get.getColumnByName( + cm.parentTable.get.getFactTableName, + cm.dataMapRelation.get.get(field).get. + columnTableRelation.get.parentColumnName).getEncoder + } else { + val encoders = new java.util.ArrayList[Encoding]() + encoders.add(Encoding.DICTIONARY) + encoders + } + val columnSchema = getColumnSchema( + DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")), + field.name.getOrElse(field.column), encoders, - isDimensionCol = true, - field.precision, - field.scale, - field.schemaOrdinal, - cm.highcardinalitydims.getOrElse(Seq()), - cm.databaseName) + true, + field, + cm.dataMapRelation) allColumns :+= columnSchema index = index + 1 if (field.children.isDefined && field.children.get != null) { @@ -429,15 +506,13 @@ class TableNewProcessor(cm: TableModel) { cm.msrCols.foreach { field => val encoders = new java.util.ArrayList[Encoding]() - val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema( - field, + val columnSchema = getColumnSchema( + DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")), + field.name.getOrElse(field.column), encoders, - isDimensionCol = false, - 
field.precision, - field.scale, - field.schemaOrdinal, - cm.highcardinalitydims.getOrElse(Seq()), - cm.databaseName) + false, + field, + cm.dataMapRelation) allColumns :+= columnSchema index = index + 1 measureCount += 1 @@ -486,15 +561,13 @@ class TableNewProcessor(cm: TableModel) { Some(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE), None ) - val columnSchema: ColumnSchema = TableNewProcessor.createColumnSchema( - field, + val columnSchema: ColumnSchema = getColumnSchema( + DataTypes.DOUBLE, + CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE, encoders, - isDimensionCol = false, - field.precision, - field.scale, - -1, - cm.highcardinalitydims.getOrElse(Seq()), - cm.databaseName) + false, + field, + cm.dataMapRelation) columnSchema.setInvisible(true) allColumns :+= columnSchema } @@ -503,6 +576,7 @@ class TableNewProcessor(cm: TableModel) { val tableInfo = new TableInfo() val tableSchema = new TableSchema() + val schemaEvol = new SchemaEvolution() schemaEvol.setSchemaEvolutionEntryList(new util.ArrayList[SchemaEvolutionEntry]()) tableSchema.setTableId(UUID.randomUUID().toString) http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala index bda6829..222c30d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala @@ -364,7 +364,7 @@ case class LoadTableCommand( entry.setTime_stamp(System.currentTimeMillis()) // write TableInfo - 
metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier, + metastore.updateTableSchemaForAlter(carbonTablePath.getCarbonTableIdentifier, carbonTablePath.getCarbonTableIdentifier, tableInfo, entry, carbonTablePath.getPath)(sparkSession) http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala new file mode 100644 index 0000000..ca384f9 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.command.preaaggregate + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command._ + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.InvalidConfigurationException +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.table.{RelationIdentifier, TableInfo} +import org.apache.carbondata.core.util.CarbonUtil + +/** + * Below command class will be used to create pre-aggregate table + * and updating the parent table about the child table information + * Failure case: + * 1. failed to create pre aggregate table. + * 2. failed to update main table + * + * @param cm + * @param dataFrame + * @param createDSTable + * @param queryString + */ +case class CreatePreAggregateTableCommand( + cm: TableModel, + dataFrame: DataFrame, + createDSTable: Boolean = true, + queryString: String, + fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField]) + extends RunnableCommand with SchemaProcessCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + processSchema(sparkSession) + } + + override def processSchema(sparkSession: SparkSession): Seq[Row] = { + val storePath = CarbonEnv.getInstance(sparkSession).storePath + CarbonEnv.getInstance(sparkSession).carbonMetastore. 
+ checkSchemasModifiedTimeAndReloadTables(storePath) + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession) + val tbName = cm.tableName + val dbName = cm.databaseName + LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]") + // getting the parent table + val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan) + // getting the table name + val parentTableName = parentTable.getFactTableName + // getting the db name of parent table + val parentDbName = parentTable.getDatabaseName + // updating the relation identifier, this will be stored in child table + // which can be used during dropping of pre-aggreate table as parent table will + // also get updated + cm.parentTable = Some(parentTable) + cm.dataMapRelation = Some(fieldRelationMap) + val tableInfo: TableInfo = TableNewProcessor(cm) + // Add validation for sort scope when create table + val sortScope = tableInfo.getFactTable.getTableProperties + .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) + if (!CarbonUtil.isValidSortOption(sortScope)) { + throw new InvalidConfigurationException( + s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," + + s" 'LOCAL_SORT' and 'GLOBAL_SORT' ") + } + + if (tableInfo.getFactTable.getListOfColumns.size <= 0) { + sys.error("No Dimensions found. Table should have at least one dimesnion !") + } + + if (sparkSession.sessionState.catalog.listTables(dbName) + .exists(_.table.equalsIgnoreCase(tbName))) { + if (!cm.ifNotExistsSet) { + LOGGER.audit( + s"Table creation with Database name [$dbName] and Table name [$tbName] failed. 
" + + s"Table [$tbName] already exists under database [$dbName]") + sys.error(s"Table [$tbName] already exists under database [$dbName]") + } + } else { + val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName) + // Add Database to catalog and persist + val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore + val tablePath = tableIdentifier.getTablePath + val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath) + if (createDSTable) { + try { + val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size) + cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f) + cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f) + sparkSession.sql( + s"""CREATE TABLE $dbName.$tbName + |(${ fields.map(f => f.rawSchema).mkString(",") }) + |USING org.apache.spark.sql.CarbonSource""".stripMargin + + s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin + + s""""$tablePath"$carbonSchemaString) """) + // child schema object which will be updated on parent table about the + val childSchema = tableInfo.getFactTable + .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION") + // upadting the parent table about child table + PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession) + } catch { + case e: Exception => + val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName)) + // call the drop table to delete the created table. 
+ CarbonEnv.getInstance(sparkSession).carbonMetastore + .dropTable(tablePath, identifier)(sparkSession) + LOGGER.audit(s"Table creation with Database name [$dbName] " + + s"and Table name [$tbName] failed") + throw e + } + } + + LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]") + } + Seq.empty + } +} + + http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala new file mode 100644 index 0000000..c4b6783 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.command.preaaggregate + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} +import org.apache.spark.sql.types.DataType + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.format.TableInfo +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.util.CommonUtil + +/** + * Utility class for keeping all the utility method for pre-aggregate + */ +object PreAggregateUtil { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def getParentCarbonTable(plan: LogicalPlan): CarbonTable = { + plan match { + case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _)) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + 
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable + case _ => throw new MalformedCarbonCommandException("table does not exist") + } + } + + /** + * Below method will be used to validate the select plan + * and get the required fields from select plan + * Currently only aggregate query is support any other type of query will + * fail + * @param plan + * @param selectStmt + * @return list of fields + */ + def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan, + selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = { + val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField] + plan match { + case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _)) + if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation + .metaData.carbonTable + val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier + .getTableName + val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier + .getDatabaseName + val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier + .getTableId + if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) { + throw new MalformedCarbonCommandException( + "Pre Aggregation is not supported on Pre-Aggregated Table") + } + aExp.map { + case Alias(attr: AggregateExpression, _) => + if (attr.isDistinct) { + throw new MalformedCarbonCommandException( + "Distinct is not supported On Pre Aggregation") + } + fieldToDataMapFieldMap ++= ((validateAggregateFunctionAndGetFields(carbonTable, + attr.aggregateFunction, + parentTableName, + parentDatabaseName, + parentTableId))) + case attr: AttributeReference => + fieldToDataMapFieldMap += getField(attr.name, + attr.dataType, + parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + 
parentTableName = parentTableName, + parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + case Alias(attr: AttributeReference, _) => + fieldToDataMapFieldMap += getField(attr.name, + attr.dataType, + parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName = parentTableName, + parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + case _ => + throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ + selectStmt } ") + } + Some(carbonTable) + case _ => + throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ") + } + fieldToDataMapFieldMap + } + + /** + * Below method will be used to validate about the aggregate function + * which is applied on select query. + * Currently sum, max, min, count, avg is supported + * in case of any other aggregate function it will throw error + * In case of avg it will return two fields one for count + * and other of sum of that column to support rollup + * @param carbonTable + * @param aggFunctions + * @param parentTableName + * @param parentDatabaseName + * @param parentTableId + * @return list of fields + */ + def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable, + aggFunctions: AggregateFunction, + parentTableName: String, + parentDatabaseName: String, + parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = { + val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)] + aggFunctions match { + case sum@Sum(attr: AttributeReference) => + list += getField(attr.name, + attr.dataType, + sum.prettyName, + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) => + list += getField(attr.name, + changeDataType, + sum.prettyName, + carbonTable.getColumnByName(parentTableName, 
attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case count@Count(Seq(attr: AttributeReference)) => + list += getField(attr.name, + attr.dataType, + count.prettyName, + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case min@Min(attr: AttributeReference) => + list += getField(attr.name, + attr.dataType, + min.prettyName, + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) => + list += getField(attr.name, + changeDataType, + min.prettyName, + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case max@Max(attr: AttributeReference) => + list += getField(attr.name, + attr.dataType, + max.prettyName, + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) => + list += getField(attr.name, + changeDataType, + max.prettyName, + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case Average(attr: AttributeReference) => + list += getField(attr.name, + attr.dataType, + "sum", + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + list += getField(attr.name, + attr.dataType, + "count", + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case Average(Cast(attr: AttributeReference, changeDataType: DataType)) => + list += getField(attr.name, + changeDataType, + 
"sum", + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + list += getField(attr.name, + changeDataType, + "count", + carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, + parentTableName, + parentDatabaseName, parentTableId = parentTableId) + case _ => + throw new MalformedCarbonCommandException("Un-Supported Aggregation Type") + } + } + + /** + * Below method will be used to get the fields object for pre aggregate table + * @param columnName + * @param dataType + * @param aggregateType + * @param parentColumnId + * @param parentTableName + * @param parentDatabaseName + * @param parentTableId + * @return fields object + */ + def getField(columnName: String, + dataType: DataType, + aggregateType: String = "", + parentColumnId: String, + parentTableName: String, + parentDatabaseName: String, + parentTableId: String): (Field, DataMapField) = { + val actualColumnName = if (aggregateType.equals("")) { + parentTableName + '_' + columnName + } else { + parentTableName + '_' + columnName + '_' + aggregateType + } + val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName + val columnTableRelation = ColumnTableRelation(parentColumnName = columnName, + parentColumnId = parentColumnId, + parentTableName = parentTableName, + parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + val dataMapField = DataMapField(aggregateType, Some(columnTableRelation)) + if (dataType.typeName.startsWith("decimal")) { + val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString) + (Field(column = actualColumnName, + dataType = Some(dataType.typeName), + name = Some(actualColumnName), + children = None, + precision = precision, + scale = scale, + rawSchema = rawSchema), dataMapField) + } + else { + (Field(column = actualColumnName, + dataType = Some(dataType.typeName), + name = Some(actualColumnName), + children = None, + rawSchema = 
rawSchema), dataMapField) + } + } + + /** + * Below method will be used to update the main table about the pre aggregate table information + * in case of any exption it will throw error so pre aggregate table creation will fail + * @param dbName + * @param tableName + * @param childSchema + * @param sparkSession + */ + def updateMainTable(dbName: String, tableName: String, + childSchema: DataMapSchema, sparkSession: SparkSession): Unit = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, + LockUsage.DROP_TABLE_LOCK) + var locks = List.empty[ICarbonLock] + var carbonTable: CarbonTable = null + var numberOfCurrentChild: Int = 0 + try { + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore + .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + .tableMeta.carbonTable + locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable) + // get the latest carbon table and check for column existence + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val schemaConverter = new ThriftWrapperSchemaConverterImpl() + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, + dbName, + tableName, + carbonTable.getStorePath) + numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size + wrapperTableInfo.getDataMapSchemaList.add(childSchema) + val thriftTable = schemaConverter + .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) + updateSchemaInfo(carbonTable, + thriftTable)(sparkSession, + sparkSession.sessionState.asInstanceOf[CarbonSessionState]) + LOGGER.info(s"Pre Aggeragte Parent table updated is successful for table $dbName.$tableName") + } 
catch { + case e: Exception => + LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes") + revertMainTableChanges(dbName, tableName, numberOfCurrentChild)(sparkSession) + throw e + } finally { + // release lock after command execution completion + releaseLocks(locks) + } + Seq.empty + } + + /** + * Below method will be used to update the main table schema + * @param carbonTable + * @param thriftTable + * @param sparkSession + * @param sessionState + */ + def updateSchemaInfo(carbonTable: CarbonTable, + thriftTable: TableInfo)(sparkSession: SparkSession, + sessionState: CarbonSessionState): Unit = { + val dbName = carbonTable.getDatabaseName + val tableName = carbonTable.getFactTableName + CarbonEnv.getInstance(sparkSession).carbonMetastore + .updateTableSchemaForDataMap(carbonTable.getCarbonTableIdentifier, + carbonTable.getCarbonTableIdentifier, + thriftTable, + carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) + val tableIdentifier = TableIdentifier(tableName, Some(dbName)) + sparkSession.catalog.refreshTable(tableIdentifier.quotedString) + } + + /** + * This method will split schema string into multiple parts of configured size and + * registers the parts as keys in tableProperties which will be read by spark to prepare + * Carbon Table fields + * + * @param sparkConf + * @param schemaJsonString + * @return + */ + private def prepareSchemaJson(sparkConf: SparkConf, + schemaJsonString: String): String = { + val threshold = sparkConf + .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD, + CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT) + // Split the JSON string. 
+ val parts = schemaJsonString.grouped(threshold).toSeq + var schemaParts: Seq[String] = Seq.empty + schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'" + parts.zipWithIndex.foreach { case (part, index) => + schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'" + } + schemaParts.mkString(",") + } + + /** + * Acquires every lock in locksToBeAcquired on the given table; releases those already taken if one fails. + * + * @param dbName + * @param tableName + * @return + */ + def acquireLock(dbName: String, + tableName: String, + locksToBeAcquired: List[String], + table: CarbonTable): List[ICarbonLock] = { + // acquire the lock first + val acquiredLocks = ListBuffer[ICarbonLock]() + try { + locksToBeAcquired.foreach { lock => + acquiredLocks += CarbonLockUtil.getLockObject(table.getCarbonTableIdentifier, lock) + } + acquiredLocks.toList + } catch { + case e: Exception => + releaseLocks(acquiredLocks.toList) + throw e + } + } + + /** + * This method will release the locks acquired for an operation + * + * @param locks + */ + def releaseLocks(locks: List[ICarbonLock]): Unit = { + locks.foreach { carbonLock => + if (carbonLock.unlock()) { + LOGGER.info("Pre agg table lock released successfully") + } else { + LOGGER.error("Unable to release lock during Pre agg table cretion") + } + } + } + + /** + * This method reverts the main table schema if updating it with the pre-aggregate child schema fails. 
+ * + * @param dbName + * @param tableName + * @param numberOfChildSchema + * @param sparkSession + */ + def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int) + (sparkSession: SparkSession): Unit = { + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val carbonTable = metastore + .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta + .carbonTable + carbonTable.getTableLastUpdatedTime + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + if (thriftTable.dataMapSchemas.size > numberOfChildSchema) { + metastore + .revertTableSchemaForPreAggCreationFailure(carbonTable.getCarbonTableIdentifier, + thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index c000488..47c570b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -106,7 +106,7 @@ private[sql] case class CarbonAlterTableRenameCommand( } val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName, newTableName, carbonTable.getCarbonTableIdentifier.getTableId) - val newTablePath = metastore.updateTableSchema(newTableIdentifier, 
+ val newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier, carbonTable.getCarbonTableIdentifier, tableInfo, schemaEvolutionEntry, http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 9822d8f..51c7f3b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -43,6 +43,7 @@ import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.writer.ThriftWriter import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.format import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} import org.apache.carbondata.processing.merger.TableMeta import org.apache.carbondata.spark.util.CarbonSparkUtil @@ -111,6 +112,22 @@ class CarbonFileMetastore extends CarbonMetaStore { } } + /** + * This method will overwrite the existing schema and update it with the given details + * + * @param newTableIdentifier + * @param thriftTableInfo + * @param carbonStorePath + * @param sparkSession + */ + def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier, + oldTableIdentifier: CarbonTableIdentifier, + thriftTableInfo: org.apache.carbondata.format.TableInfo, + carbonStorePath: String)(sparkSession: SparkSession): String = { + updateTableSchemaForAlter(newTableIdentifier, + oldTableIdentifier, thriftTableInfo, null, carbonStorePath) (sparkSession) + } + def lookupRelation(dbName: 
Option[String], tableName: String) (sparkSession: SparkSession): LogicalPlan = { lookupRelation(TableIdentifier(tableName, dbName))(sparkSession) @@ -214,7 +231,7 @@ class CarbonFileMetastore extends CarbonMetaStore { * @param tablePath * @param sparkSession */ - def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, + def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, @@ -251,7 +268,7 @@ class CarbonFileMetastore extends CarbonMetaStore { * @param tablePath * @param sparkSession */ - def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, + def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, tablePath: String)(sparkSession: SparkSession): String = { val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) @@ -271,7 +288,27 @@ class CarbonFileMetastore extends CarbonMetaStore { path } + override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: + CarbonTableIdentifier, + thriftTableInfo: org.apache.carbondata.format.TableInfo, + tablePath: String)(sparkSession: SparkSession): String = { + val tableIdentifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, + carbonTableIdentifier.getDatabaseName, + carbonTableIdentifier.getTableName, + tableIdentifier.getStorePath) + val childSchemaList = wrapperTableInfo.getDataMapSchemaList + childSchemaList.remove(childSchemaList.size() - 1) + wrapperTableInfo.setStorePath(tableIdentifier.getStorePath) + val path = createSchemaThriftFile(wrapperTableInfo, + thriftTableInfo, + tableIdentifier.getCarbonTableIdentifier) + addTableCache(wrapperTableInfo, tableIdentifier) + path 
+ } /** * http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 76241a6..c64b7bb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -115,7 +115,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { * @param schemaEvolutionEntry * @param sparkSession */ - override def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, + override def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, @@ -126,7 +126,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { if (schemaEvolutionEntry != null) { thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry) } - updateHiveMetaStore(newTableIdentifier, + updateHiveMetaStoreForAlter(newTableIdentifier, oldTableIdentifier, thriftTableInfo, identifier.getStorePath, @@ -134,7 +134,29 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { schemaConverter) } - private def updateHiveMetaStore(newTableIdentifier: CarbonTableIdentifier, + /** + * This method will overwrite the existing schema and update it with the given details + * + * @param newTableIdentifier + * @param thriftTableInfo + * @param carbonStorePath + * @param sparkSession + */ + override def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier, + oldTableIdentifier: CarbonTableIdentifier, + thriftTableInfo: org.apache.carbondata.format.TableInfo, + 
carbonStorePath: String)(sparkSession: SparkSession): String = { + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val identifier = AbsoluteTableIdentifier.fromTablePath(carbonStorePath) + updateHiveMetaStoreForDataMap(newTableIdentifier, + oldTableIdentifier, + thriftTableInfo, + identifier.getStorePath, + sparkSession, + schemaConverter) + } + + private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, carbonStorePath: String, @@ -161,6 +183,30 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath } + private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier, + oldTableIdentifier: CarbonTableIdentifier, + thriftTableInfo: format.TableInfo, + carbonStorePath: String, + sparkSession: SparkSession, + schemaConverter: ThriftWrapperSchemaConverterImpl) = { + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, + newTableIdentifier.getDatabaseName, + newTableIdentifier.getTableName, + carbonStorePath) + wrapperTableInfo.setStorePath(carbonStorePath) + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier) + val schemaMetadataPath = + CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath) + wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath) + val dbName = oldTableIdentifier.getDatabaseName + val tableName = oldTableIdentifier.getTableName + sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString) + removeTableFromMetadata(dbName, tableName) + CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) + CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath + } + /** * This method will is used to remove the evolution entry in case of failure. 
* @@ -168,7 +214,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { * @param thriftTableInfo * @param sparkSession */ - override def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, + override def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: format.TableInfo, tablePath: String) (sparkSession: SparkSession): String = { @@ -176,7 +222,23 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history evolutionEntries.remove(evolutionEntries.size() - 1) - updateHiveMetaStore(carbonTableIdentifier, + updateHiveMetaStoreForAlter(carbonTableIdentifier, + carbonTableIdentifier, + thriftTableInfo, + identifier.getStorePath, + sparkSession, + schemaConverter) + } + + override def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: + CarbonTableIdentifier, + thriftTableInfo: org.apache.carbondata.format.TableInfo, + tablePath: String)(sparkSession: SparkSession): String = { + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val identifier = AbsoluteTableIdentifier.fromTablePath(tablePath) + val childSchemas = thriftTableInfo.dataMapSchemas + childSchemas.remove(childSchemas.size()) + updateHiveMetaStoreForAlter(carbonTableIdentifier, carbonTableIdentifier, thriftTableInfo, identifier.getStorePath, http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala index dcb43d1..24996ed 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala @@ -66,13 +66,26 @@ trait CarbonMetaStore { * @param carbonStorePath * @param sparkSession */ - def updateTableSchema(newTableIdentifier: CarbonTableIdentifier, + def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier, oldTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, schemaEvolutionEntry: SchemaEvolutionEntry, carbonStorePath: String)(sparkSession: SparkSession): String /** + * This method will overwrite the existing schema and update it with the given details + * + * @param newTableIdentifier + * @param thriftTableInfo + * @param carbonStorePath + * @param sparkSession + */ + def updateTableSchemaForDataMap(newTableIdentifier: CarbonTableIdentifier, + oldTableIdentifier: CarbonTableIdentifier, + thriftTableInfo: org.apache.carbondata.format.TableInfo, + carbonStorePath: String)(sparkSession: SparkSession): String + + /** * This method will is used to remove the evolution entry in case of failure. 
* * @param carbonTableIdentifier @@ -80,11 +93,15 @@ trait CarbonMetaStore { * @param tablePath * @param sparkSession */ - def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier, + def revertTableSchemaInAlterFailure(carbonTableIdentifier: CarbonTableIdentifier, thriftTableInfo: org.apache.carbondata.format.TableInfo, tablePath: String) (sparkSession: SparkSession): String + + def revertTableSchemaForPreAggCreationFailure(carbonTableIdentifier: CarbonTableIdentifier, + thriftTableInfo: org.apache.carbondata.format.TableInfo, + tablePath: String)(sparkSession: SparkSession): String /** * Prepare Thrift Schema from wrapper TableInfo and write to disk */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index fc2ed41..bf21bc8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -440,7 +440,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { // so checking the start of the string and taking the precision and scale. 
// resetting the data type with decimal if (f.dataType.getOrElse("").startsWith("decimal")) { - val (precision, scale) = getScaleAndPrecision(col.dataType.catalogString) + val (precision, scale) = CommonUtil.getScaleAndPrecision(col.dataType.catalogString) f.precision = precision f.scale = scale f.dataType = Some("decimal") http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 0a918df..24a6418 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -18,14 +18,15 @@ package org.apache.spark.sql.parser import scala.collection.mutable -import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.{CarbonEnv, DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser} import org.apache.spark.sql.catalyst.parser.ParserUtils._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, PartitionerField, TableModel} +import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil} import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} import 
org.apache.spark.sql.types.StructField @@ -40,7 +41,7 @@ import org.apache.carbondata.spark.util.CommonUtil */ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser { - val astBuilder = new CarbonSqlAstBuilder(conf) + val astBuilder = new CarbonSqlAstBuilder(conf, sparkSession: SparkSession) private val substitutor = new VariableSubstitution(conf) @@ -71,7 +72,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab } } -class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { +class CarbonSqlAstBuilder(conf: SQLConf, sparkSession: SparkSession) + extends SparkSqlAstBuilder(conf) { val parser = new CarbonSpark2SqlParser @@ -119,8 +121,18 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { val (partitionByStructFields, partitionFields) = validateParitionFields(ctx, colNames, tableProperties) - val fields = parser.getFields(colsStructFields ++ partitionByStructFields) - + val isAggTable = tableProperties.get("parent").isDefined + var fields = parser.getFields(colsStructFields ++ partitionByStructFields) + val dfAndFieldRelationTuple = if (isAggTable) { + val selectQuery = Option(ctx.query).map(plan).get + val df = Dataset.ofRows(sparkSession, selectQuery) + val fieldRelationMap = PreAggregateUtil + .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, source(ctx.query())) + fields = fieldRelationMap.keySet.toSeq + Some(df, fieldRelationMap) + } else { + None + } // validate bucket fields val bucketFields: Option[BucketFields] = parser.getBucketFields(tableProperties, fields, options) @@ -139,7 +151,14 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { isAlterFlow = false, tableComment) - CarbonCreateTableCommand(tableModel) + if(!isAggTable) { + CarbonCreateTableCommand(tableModel) + } else { + CreatePreAggregateTableCommand(tableModel, + dfAndFieldRelationTuple.get._1, + queryString = source(ctx.query).toString, + 
fieldRelationMap = dfAndFieldRelationTuple.get._2) + } } else { super.visitCreateTable(ctx) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa12127d/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 44f5a36..bda4eeb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -136,7 +136,7 @@ object AlterTableUtil { val dbName = carbonTable.getDatabaseName val tableName = carbonTable.getFactTableName CarbonEnv.getInstance(sparkSession).carbonMetastore - .updateTableSchema(carbonTable.getCarbonTableIdentifier, + .updateTableSchemaForAlter(carbonTable.getCarbonTableIdentifier, carbonTable.getCarbonTableIdentifier, thriftTable, schemaEvolutionEntry, @@ -211,7 +211,7 @@ object AlterTableUtil { .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR + oldTableIdentifier.table) val tableIdentifier = new CarbonTableIdentifier(database, oldTableIdentifier.table, tableId) - metastore.revertTableSchema(tableIdentifier, + metastore.revertTableSchemaInAlterFailure(tableIdentifier, tableInfo, carbonTablePath.getPath)(sparkSession) metastore.removeTableFromMetadata(database, newTableName) } @@ -243,7 +243,7 @@ object AlterTableUtil { val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added thriftTable.fact_table.table_columns.removeAll(addedSchemas) metastore - .revertTableSchema(carbonTable.getCarbonTableIdentifier, + .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) } } @@ -278,7 +278,7 @@ object AlterTableUtil { } } metastore - 
.revertTableSchema(carbonTable.getCarbonTableIdentifier, + .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) } } @@ -316,7 +316,7 @@ object AlterTableUtil { } } metastore - .revertTableSchema(carbonTable.getCarbonTableIdentifier, + .revertTableSchemaInAlterFailure(carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getAbsoluteTableIdentifier.getTablePath)(sparkSession) } }