This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 2a28dba [CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv 2a28dba is described below commit 2a28dba04236ce976984d9cbc398eb8fa517d6f5 Author: Indhumathi27 <indhumathi...@gmail.com> AuthorDate: Wed Apr 24 01:04:21 2019 +0530 [CARBONDATA-3357] Support TableProperties from single parent table and restrict alter/delete/partition on mv Inherit table properties from the main table to the MV datamap table when the datamap has a single parent table; otherwise, use the default table properties. Restrict Alter/Delete/Partition operations on MV tables. This closes #3184 --- .../core/datamap/DataMapStoreManager.java | 27 +- .../carbondata/core/datamap/DataMapUtil.java | 1 + .../core/metadata/schema/table/CarbonTable.java | 17 -- .../core/metadata/schema/table/DataMapSchema.java | 14 + .../carbondata/mv/datamap/MVDataMapProvider.scala | 19 +- .../apache/carbondata/mv/datamap/MVHelper.scala | 110 ++++++-- .../org/apache/carbondata/mv/datamap/MVUtil.scala | 287 +++++++++++++++++++++ .../mv/rewrite/MVCountAndCaseTestCase.scala | 2 - .../carbondata/mv/rewrite/MVCreateTestCase.scala | 29 +-- .../mv/rewrite/MVIncrementalLoadingTestcase.scala | 1 - .../mv/rewrite/MVMultiJoinTestCase.scala | 8 +- .../carbondata/mv/rewrite/MVTpchTestCase.scala | 10 +- .../mv/rewrite/TestAllOperationsOnMV.scala | 255 ++++++++++++++++++ .../mv/rewrite/matching/TestSQLBatch.scala | 4 +- .../preaggregate/TestPreAggregateLoad.scala | 2 +- .../TestTimeSeriesUnsupportedSuite.scala | 8 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 9 +- .../command/datamap/CarbonDropDataMapCommand.scala | 9 + .../management/CarbonCleanFilesCommand.scala | 3 +- .../execution/command/mv/DataMapListeners.scala | 146 ++++++++++- .../CarbonAlterTableDropHivePartitionCommand.scala | 7 +- .../preaaggregate/PreAggregateListeners.scala | 6 +- .../preaaggregate/PreAggregateTableHelper.scala | 102 +------- 
.../schema/CarbonAlterTableRenameCommand.scala | 7 +- .../spark/sql/execution/strategy/DDLStrategy.scala | 4 +- .../spark/sql/hive/CarbonAnalysisRules.scala | 10 +- .../scala/org/apache/spark/util/DataMapUtil.scala | 160 ++++++++++++ 27 files changed, 1054 insertions(+), 203 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 81b1fb2..89402c2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -281,19 +281,22 @@ public final class DataMapStoreManager { dataMapCatalogs = new ConcurrentHashMap<>(); List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas(); for (DataMapSchema schema : dataMapSchemas) { - DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName()); - if (dataMapCatalog == null) { - dataMapCatalog = dataMapProvider.createDataMapCatalog(); - if (null == dataMapCatalog) { - throw new RuntimeException("Internal Error."); + if (schema.getProviderName() + .equalsIgnoreCase(dataMapProvider.getDataMapSchema().getProviderName())) { + DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName()); + if (dataMapCatalog == null) { + dataMapCatalog = dataMapProvider.createDataMapCatalog(); + if (null == dataMapCatalog) { + throw new RuntimeException("Internal Error."); + } + dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog); + } + try { + dataMapCatalog.registerSchema(schema); + } catch (Exception e) { + // Ignore the schema + LOGGER.error("Error while registering schema", e); } - dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog); - } - try { - dataMapCatalog.registerSchema(schema); - } catch (Exception e) { - // Ignore the schema - LOGGER.error("Error while registering schema", e); } } } diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java index 0a604fb..e20f19a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java @@ -270,4 +270,5 @@ public class DataMapUtil { } return segmentList; } + } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index afb5fd3..4f4475d 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -63,7 +63,6 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; -import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV; import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema; import com.google.common.collect.Lists; @@ -1321,22 +1320,6 @@ public class CarbonTable implements Serializable, Writable { } /** - * Return true if MV datamap present in the specified table - * @param carbonTable - * @return timeseries data map present - */ - public static boolean hasMVDataMap(CarbonTable carbonTable) throws IOException { - List<DataMapSchema> dataMapSchemaList = DataMapStoreManager.getInstance() - .getDataMapSchemasOfTable(carbonTable); - for (DataMapSchema dataMapSchema : dataMapSchemaList) { - if (dataMapSchema.getProviderName().equalsIgnoreCase(MV.toString())) { - return true; - } - } - return false; - } - - /** * Return all inverted index columns in this table */ public List<ColumnSchema> getInvertedIndexColumns() { diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java index a48b03c..b927ce0 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider; @@ -80,6 +81,11 @@ public class DataMapSchema implements Serializable, Writable { */ protected TableSchema childSchema; + /** + * main table column list mapped to datamap table + */ + private Map<String, Set<String>> mainTableColumnList; + public DataMapSchema(String dataMapName, String providerName) { this.dataMapName = dataMapName; this.providerName = providerName; @@ -250,4 +256,12 @@ public class DataMapSchema implements Serializable, Writable { @Override public int hashCode() { return Objects.hash(dataMapName); } + + public Map<String, Set<String>> getMainTableColumnList() { + return mainTableColumnList; + } + + public void setMainTableColumnList(Map<String, Set<String>> mainTableColumnList) { + this.mainTableColumnList = mainTableColumnList; + } } diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala index 26c4cb6..90b7dbc 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala @@ -58,7 +58,21 @@ class MVDataMapProvider( "select statement is mandatory") } MVHelper.createMVDataMap(sparkSession, 
dataMapSchema, ctasSqlStatement, true) - DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema) + try { + DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema) + if (dataMapSchema.isLazy) { + DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName) + } + } catch { + case exception: Exception => + dropTableCommand = new CarbonDropTableCommand(true, + new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName), + dataMapSchema.getRelationIdentifier.getTableName, + true) + dropTableCommand.run(sparkSession) + DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName) + throw exception + } } override def initData(): Unit = { @@ -141,7 +155,8 @@ class MVDataMapProvider( dataFrame = Some(queryPlan), updateModel = None, tableInfoOp = None, - internalOptions = Map("mergedSegmentName" -> newLoadName), + internalOptions = Map("mergedSegmentName" -> newLoadName, + CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"), partition = Map.empty) try { diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala index 810449c..4bcaa1d 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala @@ -29,17 +29,17 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project} import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor} -import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand} import org.apache.spark.sql.execution.datasources.LogicalRelation 
import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.util.DataMapUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider -import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier} +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier} import org.apache.carbondata.datamap.DataMapManager import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select} import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog} @@ -58,7 +58,12 @@ object MVHelper { val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) val query = sparkSession.sql(updatedQuery) val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed) - validateMVQuery(sparkSession, logicalPlan) + val selectTables = getTables(logicalPlan) + if (selectTables.isEmpty) { + throw new MalformedCarbonCommandException( + s"Non-Carbon table does not support creating MV datamap") + } + val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan) val fullRebuild = isFullReload(logicalPlan) val fields = logicalPlan.output.map { attr => val name = updateColumnName(attr) @@ -81,29 +86,41 @@ object MVHelper { } } val tableProperties = mutable.Map[String, String]() - dmProperties.foreach(t => tableProperties.put(t._1, t._2)) - - val selectTables = getTables(logicalPlan) val parentTables = new util.ArrayList[String]() + val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size) selectTables.foreach { selectTable => val mainCarbonTable = try { 
Some(CarbonEnv.getCarbonTable(selectTable.identifier.database, selectTable.identifier.table)(sparkSession)) } catch { // Exception handling if it's not a CarbonTable - case ex : Exception => None + case ex: Exception => + throw new MalformedCarbonCommandException( + s"Non-Carbon table does not support creating MV datamap") } parentTables.add(mainCarbonTable.get.getTableName) if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) { throw new MalformedCarbonCommandException( s"Streaming table does not support creating MV datamap") } + parentTablesList.add(mainCarbonTable.get) } tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName) tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(",")) - // TODO inherit the table properties like sort order, sort scope and block size from parent - // tables to mv datamap table + val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan( + logicalPlan, queryString, sparkSession) + // If dataMap is mapped to single main table, then inherit table properties from main table, + // else, will use default table properties. 
If DMProperties contains table properties, then + // table properties of datamap table will be updated + if (parentTablesList.size() == 1) { + DataMapUtil + .inheritTablePropertiesFromMainTable(parentTablesList.get(0), + fields, + fieldRelationMap, + tableProperties) + } + dmProperties.foreach(t => tableProperties.put(t._1, t._2)) // TODO Use a proper DB val tableIdentifier = TableIdentifier(dataMapSchema.getDataMapName + "_table", @@ -129,7 +146,28 @@ object MVHelper { CarbonCreateTableCommand(TableNewProcessor(tableModel), tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession) - dataMapSchema.setCtasQuery(queryString) + // Map list of main table columns mapped to datamap table and add to dataMapSchema + val mainTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]() + val mainTableFieldIterator = fieldRelationMap.values.asJava.iterator() + while (mainTableFieldIterator.hasNext) { + val value = mainTableFieldIterator.next() + value.columnTableRelationList.foreach { + columnTableRelation => + columnTableRelation.foreach { + mainTable => + if (null == mainTableToColumnsMap.get(mainTable.parentTableName)) { + val columns = new util.HashSet[String]() + columns.add(mainTable.parentColumnName) + mainTableToColumnsMap.put(mainTable.parentTableName, columns) + } else { + mainTableToColumnsMap.get(mainTable.parentTableName) + .add(mainTable.parentColumnName) + } + } + } + } + dataMapSchema.setMainTableColumnList(mainTableToColumnsMap) + dataMapSchema.setCtasQuery(updatedQueryWithDb) dataMapSchema .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get, tableIdentifier.table, @@ -143,14 +181,44 @@ object MVHelper { dataMapSchema.getRelationIdentifier.setTablePath(tablePath) dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava)) dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString) - DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema) - if 
(dataMapSchema.isLazy) { - DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName) + try { + DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema) + } catch { + case ex: Exception => + val dropTableCommand = CarbonDropTableCommand(true, + new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName), + dataMapSchema.getRelationIdentifier.getTableName, + true) + dropTableCommand.run(sparkSession) + throw ex + } + } + + private def isValidSelect(isValidExp: Boolean, + s: Select): Boolean = { + // Make sure all predicates are present in projections. + var predicateList: Seq[AttributeReference] = Seq.empty + s.predicateList.map { f => + f.children.collect { + case p: AttributeReference => + predicateList = predicateList.+:(p) + } + } + if (predicateList.nonEmpty) { + predicateList.forall { p => + s.outputList.exists { + case a: Alias => + a.semanticEquals(p) || a.child.semanticEquals(p) + case other => other.semanticEquals(p) + } + } + } else { + isValidExp } } private def validateMVQuery(sparkSession: SparkSession, - logicalPlan: LogicalPlan): Unit = { + logicalPlan: LogicalPlan): String = { val dataMapProvider = DataMapManager.get().getDataMapProvider(null, new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession) var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider, @@ -169,17 +237,24 @@ object MVHelper { val isValid = modularPlan match { case g: GroupBy => // Make sure all predicates are present in projections. 
- g.predicateList.forall{p => + val isValidExp = g.predicateList.forall{p => g.outputList.exists{ case a: Alias => a.semanticEquals(p) || a.child.semanticEquals(p) case other => other.semanticEquals(p) } } + g.child match { + case s: Select => + isValidSelect(isValidExp, s) + } + case s: Select => + isValidSelect(true, s) case _ => true } if (!isValid) { - throw new UnsupportedOperationException("Group by columns must be present in project columns") + throw new UnsupportedOperationException( + "Group by/Filter columns must be present in project columns") } if (catalog.isMVWithSameQueryPresent(logicalPlan)) { throw new UnsupportedOperationException("MV with same query present") @@ -196,6 +271,7 @@ object MVHelper { if (!expressionValid) { throw new UnsupportedOperationException("MV doesn't support Coalesce") } + modularPlan.asCompactSQL } def updateColumnName(attr: Attribute): String = { diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala new file mode 100644 index 0000000..6852695 --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.datamap + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field} +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.DataType + +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.spark.util.CommonUtil + +/** + * Utility class for keeping all the utility method for mv datamap + */ +object MVUtil { + + /** + * Below method will be used to validate and get the required fields from select plan + */ + def getFieldsAndDataMapFieldsFromPlan(plan: LogicalPlan, + selectStmt: String, + sparkSession: SparkSession): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = { + plan match { + case Project(projectList, child: Sort) => + getFieldsFromProject(projectList, plan, child) + case Project(projectList, _) => + getFieldsFromProject(projectList, plan) + case Aggregate(groupByExp, aggExp, _) => + getFieldsFromAggregate(groupByExp, aggExp, plan) + } + } + + def getFieldsFromProject(projectList: Seq[NamedExpression], + plan: LogicalPlan, sort: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = { + var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField] + sort.transformDown { + case agg@Aggregate(groupByExp, aggExp, _) => + fieldToDataMapFieldMap ++== getFieldsFromAggregate(groupByExp, aggExp, plan) + agg + } + fieldToDataMapFieldMap ++== getFieldsFromProject(projectList, plan) + fieldToDataMapFieldMap + } 
+ + def getFieldsFromProject(projectList: Seq[NamedExpression], + plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = { + var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField] + val logicalRelation = + plan.collect { + case lr: LogicalRelation => + lr + } + projectList.map { + case attr: AttributeReference => + val carbonTable = getCarbonTable(logicalRelation, attr) + if (null != carbonTable) { + val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]() + val relation = getColumnRelation(attr.name, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName, + carbonTable) + if (null != relation) { + arrayBuffer += relation + } + fieldToDataMapFieldMap += + getFieldToDataMapFields(attr.name, + attr.dataType, + attr.qualifier, + "", + arrayBuffer, + carbonTable.getTableName) + } + case Alias(attr: AttributeReference, name) => + val carbonTable = getCarbonTable(logicalRelation, attr) + if (null != carbonTable) { + val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]() + val relation = getColumnRelation(attr.name, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName, + carbonTable) + if (null != relation) { + arrayBuffer += relation + } + fieldToDataMapFieldMap += + getFieldToDataMapFields(name, attr.dataType, attr.qualifier, "", arrayBuffer, "") + } + } + fieldToDataMapFieldMap + } + + def getFieldsFromAggregate(groupByExp: Seq[Expression], + aggExp: Seq[NamedExpression], + plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = { + var fieldToDataMapFieldMap = 
scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField] + val logicalRelation = + plan.collect { + case lr: LogicalRelation => + lr + } + aggExp.map { agg => + var aggregateType: String = "" + val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]() + agg.collect { + case Alias(attr: AggregateExpression, name) => + if (attr.aggregateFunction.isInstanceOf[Count]) { + fieldToDataMapFieldMap += + getFieldToDataMapFields(name, + attr.aggregateFunction.dataType, + None, + attr.aggregateFunction.nodeName, + arrayBuffer, + "") + } else { + aggregateType = attr.aggregateFunction.nodeName + } + case Alias(_, name) => + // In case of arithmetic expressions like sum(a)+sum(b) + aggregateType = "arithmetic" + } + agg.collect { + case attr: AttributeReference => + val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr) + if (null != carbonTable) { + val relation = getColumnRelation(attr.name, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName, + carbonTable) + if (null != relation) { + arrayBuffer += relation + } + if (aggregateType.isEmpty && arrayBuffer.nonEmpty) { + val tableName = carbonTable.getTableName + fieldToDataMapFieldMap += + getFieldToDataMapFields(agg.name, + agg.dataType, + attr.qualifier, + aggregateType, + arrayBuffer, + tableName) + } + } + } + if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) { + fieldToDataMapFieldMap += + getFieldToDataMapFields(agg.name, + agg.dataType, + agg.qualifier, + aggregateType, + arrayBuffer, + "") + } + } + groupByExp map { grp => + grp.collect { + case attr: AttributeReference => + val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr) + if (null != carbonTable) { + val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new + ArrayBuffer[ColumnTableRelation]() + arrayBuffer 
+= getColumnRelation(attr.name, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName, + carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName, + carbonTable) + fieldToDataMapFieldMap += + getFieldToDataMapFields(attr.name, + attr.dataType, + attr.qualifier, + "", + arrayBuffer, + carbonTable.getTableName) + } + } + } + fieldToDataMapFieldMap + } + + /** + * Below method will be used to get the column relation with the parent column + */ + def getColumnRelation(parentColumnName: String, + parentTableId: String, + parentTableName: String, + parentDatabaseName: String, + carbonTable: CarbonTable): ColumnTableRelation = { + val parentColumn = carbonTable.getColumnByName(parentTableName, parentColumnName) + var columnTableRelation: ColumnTableRelation = null + if (null != parentColumn) { + val parentColumnId = parentColumn.getColumnId + columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName, + parentColumnId = parentColumnId, + parentTableName = parentTableName, + parentDatabaseName = parentDatabaseName, parentTableId = parentTableId) + columnTableRelation + } else { + columnTableRelation + } + } + + /** + * This method is used to get carbon table for corresponding attribute reference + * from logical relation + */ + private def getCarbonTable(logicalRelation: Seq[LogicalRelation], + attr: AttributeReference) = { + val relations = logicalRelation + .filter(lr => lr.output + .exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) && + attrRef.exprId.equals(attr.exprId))) + if (relations.nonEmpty) { + relations + .head.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation + .metaData.carbonTable + } else { + null + } + } + + /** + * Below method will be used to get the fields object for mv table + */ + private def getFieldToDataMapFields(name: String, + dataType: DataType, + qualifier: Option[String], + 
aggregateType: String, + columnTableRelationList: ArrayBuffer[ColumnTableRelation], + parenTableName: String) = { + var actualColumnName = + name.replace("(", "_") + .replace(")", "") + .replace(" ", "_") + .replace("=", "") + .replace(",", "") + if (qualifier.isDefined) { + actualColumnName = qualifier.map(qualifier => qualifier + "_" + name) + .getOrElse(actualColumnName) + } + if (qualifier.isEmpty) { + if (aggregateType.isEmpty && !parenTableName.isEmpty) { + actualColumnName = parenTableName + "_" + actualColumnName + } + } + val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName + val dataMapField = DataMapField(aggregateType, Some(columnTableRelationList)) + if (dataType.typeName.startsWith("decimal")) { + val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString) + (Field(column = actualColumnName, + dataType = Some(dataType.typeName), + name = Some(actualColumnName), + children = None, + precision = precision, + scale = scale, + rawSchema = rawSchema), dataMapField) + } else { + (Field(column = actualColumnName, + dataType = Some(dataType.typeName), + name = Some(actualColumnName), + children = None, + rawSchema = rawSchema), dataMapField) + } + } +} diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala index 567d6a9..af4afb6 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCountAndCaseTestCase.scala @@ -47,8 +47,6 @@ class MVCountAndCaseTestCase extends QueryTest with BeforeAndAfterAll{ | FROM data_table | GROUP BY STARTTIME,LAYER4ID""".stripMargin) - sql("rebuild datamap data_table_mv") - var frame = sql(s"""SELECT MT.`3600` AS `3600`, | MT.`2250410101` AS `2250410101`, | count(1) over() as countNum, diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala index 5016bbe..4f5423e 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -180,7 +180,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { } test("test create datamap with simple and same projection with datamap filter on non projection column and extra column filter") { - sql("create datamap datamap9 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'") + sql("create datamap datamap9 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'") val frame = sql("select empname,designation from fact_table1 where deptname='cloud'") val analyzed = frame.queryExecution.analyzed assert(verifyMVDataMap(analyzed, "datamap9")) @@ -189,7 +189,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { } test("test create datamap with simple and same projection with datamap filter on non projection column and no column filter") { - sql("create datamap datamap10 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'") + sql("create datamap datamap10 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'") val frame = sql("select empname,designation from fact_table1") val analyzed = frame.queryExecution.analyzed assert(!verifyMVDataMap(analyzed, "datamap10")) @@ -198,7 +198,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { } test("test create datamap with simple and same projection with datamap filter on non projection column and different column filter") { - sql("create datamap datamap11 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'") + sql("create 
datamap datamap11 using 'mv' as select empname, designation,deptname from fact_table1 where deptname='cloud'") val frame = sql("select empname,designation from fact_table1 where designation='SA'") val analyzed = frame.queryExecution.analyzed assert(!verifyMVDataMap(analyzed, "datamap11")) @@ -327,7 +327,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with simple join and filter on query") { sql("drop datamap if exists datamap22") - sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)") + sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation,t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)") val frame = sql( "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " + "t2.empname and t1.empname='shivani'") @@ -341,7 +341,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with simple join and filter on query and datamap") { sql("drop datamap if exists datamap23") - sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'") + sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'") val frame = sql( "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " + "t2.empname and t1.empname='shivani'") @@ -354,7 +354,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with simple join and filter on datamap and no filter on query") { sql("drop datamap if exists datamap24") - sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation from 
fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'") + sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) where t1.empname='shivani'") val frame = sql( "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") val analyzed = frame.queryExecution.analyzed @@ -365,7 +365,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with multiple join") { sql("drop datamap if exists datamap25") - sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)") + sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation, t2.empname, t3.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)") val frame = sql( "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") val analyzed = frame.queryExecution.analyzed @@ -379,7 +379,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { } ignore("test create datamap with simple join on datamap and multi join on query") { - sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)") + sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname)") val frame = sql( "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2,fact_table3 " + "t3 where t1.empname = t2.empname and t1.empname=t3.empname") @@ -391,7 +391,7 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll { } test("test create datamap with join with group by") { - sql("create datamap datamap27 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") + sql("create datamap datamap27 using 'mv' as select t1.empname , t2.designation, sum(t1.utilization), sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") val frame = sql( "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " + "where t1.empname = t2.empname group by t1.empname, t2.designation") @@ -404,7 +404,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with join with group by and sub projection") { sql("drop datamap if exists datamap28") - sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") + sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") val frame = sql( "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " + "t1.empname = t2.empname group by t2.designation") @@ -417,7 +417,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with join with group by and sub projection with filter") { sql("drop datamap if exists datamap29") - sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") + sql("create datamap datamap29 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") val frame = sql( "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " + "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation") @@ -430,7 +430,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { ignore("test create datamap with join with group by with filter") { sql("drop datamap if exists datamap30") - sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") + sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation") val frame = sql( "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " + "where t1.empname = t2.empname and t2.designation='SA' group by t1.empname, t2.designation") @@ -612,7 +612,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with left join on query and equi join on mv with group by with filter") { sql("drop datamap if exists datamap45") - sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation") + sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation") // During spark optimizer it converts the left outer join queries with equi join if any filter present on right side table val frame = sql( "select 
t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " + @@ -649,7 +649,6 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { } test("jira carbondata-2528-2") { - sql("drop datamap if exists MV_order") sql("drop datamap if exists MV_desc_order") sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname") @@ -910,7 +909,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { val exception_tb_mv2: Exception = intercept[Exception] { sql("create datamap dm_stream_test2 using 'mv' as select t1.empname as c1, t2.designation, " + - "t2.empname as c2 from (fact_table1 t1 inner join fact_streaming_table2 t2 " + + "t2.empname as c2,t3.empname from (fact_table1 t1 inner join fact_streaming_table2 t2 " + "on (t1.empname = t2.empname)) inner join fact_table_parquet t3 " + "on (t1.empname = t3.empname)") } diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala index bbd7b4c..2e64055 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala @@ -182,7 +182,6 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll { sql(s"rebuild datamap datamap1") loadDataToFactTable("test_table") sql(s"rebuild datamap datamap1") - checkExistence(sql("show segments for table datamap1_table"), false, "0.1") sql("alter datamap datamap1 compact 'major'") val dataMapTable = CarbonMetadata.getInstance().getCarbonTable( CarbonCommonConstants.DATABASE_DEFAULT_NAME, diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala index bfd621d..4e3eb10 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala @@ -48,8 +48,9 @@ class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll { |inner join areas as c on c.pid=p.aid |where p.title = 'hebei' """.stripMargin - sql("create datamap table_mv using 'mv' as " + mvSQL) - sql("rebuild datamap table_mv") + sql("create datamap table_mv using 'mv' as " + + "select p.title,c.title,c.pid,p.aid from areas as p inner join areas as c on " + + "c.pid=p.aid where p.title = 'hebei'") val frame = sql(mvSQL) assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv")) checkAnswer(frame, Seq(Row("hebei","shijiazhuang"), Row("hebei","handan"))) @@ -70,8 +71,7 @@ class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll { | left join dim_table dim_other on sdr.name = dim_other.name | group by sdr.name,dim.age,dim_other.height """.stripMargin - sql("create datamap table_mv using 'mv' as " + mvSQL) - sql("rebuild datamap table_mv") + sql("create datamap table_mv using 'mv' as " + "select sdr.name,sum(sdr.score),dim.age,dim_other.height,count(dim.name) as c1, count(dim_other.name) as c2 from sdr_table sdr left join dim_table dim on sdr.name = dim.name left join dim_table dim_other on sdr.name = dim_other.name group by sdr.name,dim.age,dim_other.height") val frame = sql(mvSQL) assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv")) checkAnswer(frame, Seq(Row("lily",80,30,160),Row("tom",120,20,170))) diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala index 5788a23..b5d874a 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala 
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTpchTestCase.scala @@ -110,7 +110,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with tpch3 with no filters on mv") { sql(s"drop datamap if exists datamap5") - sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate") + sql("create datamap datamap5 using 'mv' as select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate, c_custkey as c1, o_custkey as c2,o_orderkey as o1 from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate,c_custkey,o_custkey, o_orderkey ") val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10") val analyzed = df.queryExecution.analyzed assert(verifyMVDataMap(analyzed, "datamap5")) @@ -150,7 +150,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with tpch5 with no filters on mv") { sql(s"drop datamap if exists datamap8") - sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey 
and s_nationkey = n_nationkey and n_regionkey = r_regionkey group by n_name,o_orderdate,r_name") + sql("create datamap datamap8 using 'mv' as select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue, sum(c_custkey), sum(o_custkey), sum(l_orderkey),sum(o_orderkey), sum(l_suppkey), sum(s_suppkey), sum(c_nationkey), sum(s_nationkey), sum(n_nationkey), sum(n_regionkey), sum(r_regionkey) from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey an [...] val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc") val analyzed = df.queryExecution.analyzed assert(verifyMVDataMap(analyzed, "datamap8")) @@ -160,7 +160,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with tpch6") { sql(s"drop datamap if exists datamap9") - sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24") + sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * l_discount) as revenue, count(l_shipdate), sum(l_discount),sum(l_quantity) from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24") val df = sql("select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and 
l_discount between 0.05 and 0.07 and l_quantity < 24") val analyzed = df.queryExecution.analyzed assert(verifyMVDataMap(analyzed, "datamap9")) @@ -182,7 +182,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with tpch7 part of query1") { sql(s"drop datamap if exists datamap11") - sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey") + sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , l_extendedprice , l_discount, s_suppkey,l_suppkey, o_orderkey,l_orderkey, c_custkey, o_custkey, s_nationkey, n1.n_nationkey, c_nationkey from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey") val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('1996-12-31')") val analyzed = df.queryExecution.analyzed assert(verifyMVDataMap(analyzed, "datamap11")) @@ -192,7 +192,7 @@ class MVTpchTestCase extends QueryTest with BeforeAndAfterAll { test("test create datamap with tpch7 part of query2 (core issue)") { sql(s"drop datamap if exists datamap12") - sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = 
n1.n_nationkey and c_nationkey = n1.n_nationkey") + sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, l_extendedprice ,l_discount,s_suppkey, l_suppkey,o_orderkey,l_orderkey, c_custkey,o_custkey,s_nationkey, n1.n_nationkey,c_nationkey from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey") val df = sql("select supp_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('19 [...] val analyzed = df.queryExecution.analyzed assert(verifyMVDataMap(analyzed, "datamap12")) diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala new file mode 100644 index 0000000..3978bd1 --- /dev/null +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala @@ -0,0 +1,255 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.carbondata.mv.rewrite + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterEach + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.spark.exception.ProcessMetaDataException + +/** + * Test Class for MV Datamap to verify all scenerios + */ +class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach { + + override def beforeEach(): Unit = { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table maintable select 'abc',21,2000") + sql("drop table IF EXISTS testtable") + sql("create table testtable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table testtable select 'abc',21,2000") + sql("drop datamap if exists dm1") + sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " + + "from maintable group by name") + sql("rebuild datamap dm1") + checkResult() + } + + private def checkResult(): Unit = { + checkAnswer(sql("select name,sum(price) from maintable group by name"), + sql("select name,sum(price) from maintable group by name")) + } + + override def afterEach(): Unit = { + sql("drop table IF EXISTS maintable") + sql("drop table IF EXISTS testtable") + sql("drop table if exists par_table") + } + + test("test alter add column on maintable") { + sql("alter table maintable add columns(d int)") + sql("insert into table maintable select 
'abc',21,2000,30") + sql("rebuild datamap dm1") + checkResult() + } + + test("test alter add column on datamaptable") { + intercept[ProcessMetaDataException] { + sql("alter table dm1_table add columns(d int)") + }.getMessage.contains("Cannot add columns in a DataMap table default.dm1_table") + } + + test("test drop column on maintable") { + // check drop column not present in datamap table + sql("alter table maintable drop columns(c_code)") + checkResult() + // check drop column present in datamap table + intercept[ProcessMetaDataException] { + sql("alter table maintable drop columns(name)") + }.getMessage.contains("Column name cannot be dropped because it exists in mv datamap: dm1") + } + + test("test alter drop column on datamaptable") { + intercept[ProcessMetaDataException] { + sql("alter table dm1_table drop columns(maintable_name)") + }.getMessage.contains("Cannot drop columns present in a datamap table default.dm1_table") + } + + test("test rename column on maintable") { + // check rename column not present in datamap table + sql("alter table maintable change c_code d_code int") + checkResult() + // check rename column present in mv datamap table + intercept[ProcessMetaDataException] { + sql("alter table maintable change name name1 string") + }.getMessage.contains("Column name exists in a MV datamap. 
Drop MV datamap to continue") + } + + test("test alter rename column on datamaptable") { + intercept[ProcessMetaDataException] { + sql("alter table dm1_table change sum_price sum_cost int") + }.getMessage.contains("Cannot change data type or rename column for columns " + + "present in mv datamap table default.dm1_table") + } + + test("test alter rename table") { + //check rename maintable + intercept[MalformedCarbonCommandException] { + sql("alter table maintable rename to maintable_rename") + }.getMessage.contains("alter rename is not supported for datamap table or for tables which have child datamap") + //check rename datamaptable + intercept[MalformedCarbonCommandException] { + sql("alter table dm1_table rename to dm11_table") + }.getMessage.contains("alter rename is not supported for datamap table or for tables which have child datamap") + } + + test("test alter change datatype") { + //change datatype for column + intercept[ProcessMetaDataException] { + sql("alter table maintable change price price bigint") + }.getMessage.contains("Column price exists in a MV datamap. 
Drop MV datamap to continue") + //change datatype for column not present in datamap table + sql("alter table maintable change c_code c_code bigint") + checkResult() + //change datatype for column present in datamap table + intercept[ProcessMetaDataException] { + sql("alter table dm1_table change sum_price sum_price bigint") + }.getMessage.contains("Cannot change data type or rename column for columns " + + "present in mv datamap table default.dm1_table") + } + + test("test dmproperties") { + sql("drop datamap if exists dm1") + sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED REBUILD dmproperties" + + "('LOCAL_DICTIONARY_ENABLE'='false') as select name,price from maintable") + checkExistence(sql("describe formatted dm1_table"), true, "Local Dictionary Enabled false") + sql("drop datamap if exists dm1") + sql("create datamap dm1 on table maintable using 'mv' WITH DEFERRED REBUILD dmproperties('TABLE_BLOCKSIZE'='256 MB') " + + "as select name,price from maintable") + checkExistence(sql("describe formatted dm1_table"), true, "Table Block Size 256 MB") + } + + test("test table properties") { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata' tblproperties('LOCAL_DICTIONARY_ENABLE'='false')") + sql("drop datamap if exists dm1") + sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,price from maintable") + checkExistence(sql("describe formatted dm1_table"), true, "Local Dictionary Enabled false") + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata' tblproperties('TABLE_BLOCKSIZE'='256 MB')") + sql("drop datamap if exists dm1") + sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,price from maintable") + checkExistence(sql("describe formatted dm1_table"), true, "Table Block Size 256 MB") + } + + test("test delete segment by id on main table") { + sql("drop table 
IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table maintable select 'abc',21,2000") + sql("insert into table maintable select 'abc',21,2000") + sql("Delete from table maintable where segment.id in (0)") + sql("drop datamap if exists dm1") + sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " + + "from maintable group by name") + sql("rebuild datamap dm1") + intercept[UnsupportedOperationException] { + sql("Delete from table maintable where segment.id in (1)") + }.getMessage.contains("Delete segment operation is not supported on tables which have mv datamap") + intercept[UnsupportedOperationException] { + sql("Delete from table dm1_table where segment.id in (0)") + }.getMessage.contains("Delete segment operation is not supported on mv table") + } + + test("test delete segment by date on main table") { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table maintable select 'abc',21,2000") + sql("insert into table maintable select 'abc',21,2000") + sql("Delete from table maintable where segment.id in (0)") + sql("drop datamap if exists dm1") + sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name,sum(price) " + + "from maintable group by name") + sql("rebuild datamap dm1") + intercept[UnsupportedOperationException] { + sql("DELETE FROM TABLE maintable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'") + }.getMessage.contains("Delete segment operation is not supported on tables which have mv datamap") + intercept[UnsupportedOperationException] { + sql("DELETE FROM TABLE dm1_table WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'") + }.getMessage.contains("Delete segment operation is not supported on mv table") + } + + test("test partition table with mv") { + sql("drop table if exists par_table") + sql("CREATE TABLE par_table(id 
INT, name STRING, age INT) PARTITIONED BY(city string) STORED BY 'carbondata'") + sql("insert into par_table values(1,'abc',3,'def')") + sql("drop datamap if exists p1") + sql("create datamap p1 using 'mv' WITH DEFERRED REBUILD as select city, id from par_table") + sql("rebuild datamap p1") + intercept[MalformedCarbonCommandException] { + sql("alter table par_table drop partition (city='def')") + }.getMessage.contains("Drop Partition is not supported for datamap table or for tables which have child datamap") + sql("drop datamap if exists p1") + } + + test("test direct load to mv datamap table") { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table maintable select 'abc',21,2000") + sql("drop datamap if exists dm1") + sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select name " + + "from maintable") + sql("rebuild datamap dm1") + intercept[UnsupportedOperationException] { + sql("insert into dm1_table select 2") + }.getMessage.contains("Cannot insert/load data directly into pre-aggregate/child table") + sql("drop table IF EXISTS maintable") + } + + + test("test drop datamap with tablename") { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table maintable select 'abc',21,2000") + sql("drop datamap if exists dm1 on table maintable") + sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select price " + + "from maintable") + sql("rebuild datamap dm1") + checkAnswer(sql("select price from maintable"), Seq(Row(2000))) + checkExistence(sql("show datamap on table maintable"), true, "dm1") + sql("drop datamap dm1 on table maintable") + checkExistence(sql("show datamap on table maintable"), false, "dm1") + sql("drop table IF EXISTS maintable") + } + + test("test mv with attribute having qualifier") { + sql("drop table if exists maintable") + 
sql("create table maintable (product string) partitioned by (amount int) stored by 'carbondata' ") + sql("insert into maintable values('Mobile',2000)") + sql("drop datamap if exists p") + sql("Create datamap p using 'mv' as Select p.product, p.amount from maintable p where p.product = 'Mobile'") + sql("rebuild datamap p") + checkAnswer(sql("Select p.product, p.amount from maintable p where p.product = 'Mobile'"), Seq(Row("Mobile", 2000))) + sql("drop table IF EXISTS maintable") + } + + test("test mv with non-carbon table") { + sql("drop table if exists noncarbon") + sql("create table noncarbon (product string,amount int)") + sql("insert into noncarbon values('Mobile',2000)") + sql("drop datamap if exists p") + intercept[MalformedCarbonCommandException] { + sql("Create datamap p using 'mv' as Select product from noncarbon") + }.getMessage.contains("Non-Carbon table does not support creating MV datamap") + sql("drop table if exists noncarbon") + } + +} + diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala index 96f1816..07cd232 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala @@ -23,9 +23,9 @@ object TestSQLBatch { val sampleTestCases = Seq( ("case_1", s""" - |SELECT i_item_id + |SELECT i_item_id, i_item_sk |FROM Item - |WHERE i_item_sk = 1 + |WHERE i_item_sk = 2 """.stripMargin.trim, s""" |SELECT i_item_id, i_item_sk diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index 49a62a2..7ba8300 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -211,7 +211,7 @@ class TestPreAggregateLoad extends SparkQueryTest with BeforeAndAfterAll with Be .stripMargin) assert(intercept[RuntimeException] { sql(s"insert into maintable_preagg_sum values(1, 30)") - }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table")) + }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate/child table")) } test("test whether all segments are loaded into pre-aggregate table if segments are set on main table") { diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala index a7e425c..7fa2672 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala @@ -71,7 +71,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 60)") } assert(e.getMessage.equalsIgnoreCase( - "Cannot insert/load data directly into pre-aggregate table")) + "Cannot insert/load data directly into pre-aggregate/child table")) // check value after inserting checkAnswer(sql("SELECT * FROM maintable_agg1_minute"), @@ -94,7 +94,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi 
sql(s"INSERT INTO maintable_agg1_minute 
VALUES('2016-02-23 09:01:00.0', 60)") } assert(e.getMessage.equalsIgnoreCase( - "Cannot insert/load data directly into pre-aggregate table")) + "Cannot insert/load data directly into pre-aggregate/child table")) } test("test timeseries unsupported 3: don't support insert") { @@ -118,7 +118,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi sql(s"INSERT INTO maintable_agg1_minute VALUES('2016-02-23 09:01:00.0', 'hello', 60)") } assert(e.getMessage.equalsIgnoreCase( - "Cannot insert/load data directly into pre-aggregate table")) + "Cannot insert/load data directly into pre-aggregate/child table")) } test("test timeseries unsupported 4: don't support load") { @@ -147,7 +147,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi """.stripMargin) } assert(e.getMessage.equalsIgnoreCase( - "Cannot insert/load data directly into pre-aggregate table")) + "Cannot insert/load data directly into pre-aggregate/child table")) } test("test timeseries unsupported 5: don't support update") { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index d9dec68..3c6b265 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener} import org.apache.spark.sql.execution.command.cache._ -import org.apache.spark.sql.execution.command.mv.{AlterDataMaptableCompactionPostListener, LoadPostDataMapListener} +import org.apache.spark.sql.execution.command.mv._ import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction 
import org.apache.spark.sql.hive._ @@ -195,6 +195,13 @@ object CarbonEnv { .addListener(classOf[DropTableCacheEvent], DropCacheBloomEventListener) .addListener(classOf[ShowTableCacheEvent], ShowCachePreAggEventListener) .addListener(classOf[ShowTableCacheEvent], ShowCacheBloomEventListener) + .addListener(classOf[DeleteSegmentByIdPreEvent], DataMapDeleteSegmentPreListener) + .addListener(classOf[DeleteSegmentByDatePreEvent], DataMapDeleteSegmentPreListener) + .addListener(classOf[AlterTableDropColumnPreEvent], DataMapDropColumnPreListener) + .addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent], + DataMapChangeDataTypeorRenameColumnPreListener) + .addListener(classOf[AlterTableAddColumnPreEvent], DataMapAddColumnsPreListener) + } /** diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index 0bafe04..b4e60fb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -122,6 +122,15 @@ case class CarbonDropDataMapCommand( dropDataMapFromSystemFolder(sparkSession) return Seq.empty } + } else if (mainTable != null) { + // If table is defined and datamap is MV datamap, then drop the datamap + val dmSchema = DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala + .filter(dataMapSchema => dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) + if (dmSchema.nonEmpty && (!dmSchema.head.isIndexDataMap && + null != dmSchema.head.getRelationIdentifier)) { + dropDataMapFromSystemFolder(sparkSession) + return Seq.empty + } } // drop preaggregate datamap. 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index c142398..a2a9d3c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, Checker, DataCommand} import org.apache.spark.sql.optimizer.CarbonFilters +import org.apache.spark.util.DataMapUtil import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException @@ -71,7 +72,7 @@ case class CarbonCleanFilesCommand( isInternalCleanCall = true) }.toList cleanFileCommands.foreach(_.processMetadata(sparkSession)) - } else if (CarbonTable.hasMVDataMap(carbonTable)) { + } else if (DataMapUtil.hasMVDataMap(carbonTable)) { val allDataMapSchemas = DataMapStoreManager.getInstance .getDataMapSchemasOfTable(carbonTable).asScala .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier && diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala index f94b73a..5b1d7e5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala @@ -18,22 +18,32 @@ package org.apache.spark.sql.execution.command.mv import scala.collection.JavaConverters._ +import scala.collection.mutable import 
org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.command.AlterTableModel import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand +import org.apache.spark.util.DataMapUtil import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datamap.status.DataMapStatusManager -import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.datamap.DataMapManager -import org.apache.carbondata.events.{ - AlterTableCompactionPreStatusUpdateEvent, - DeleteFromTablePostEvent, Event, OperationContext, OperationEventListener, UpdateTablePostEvent -} +import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent import org.apache.carbondata.processing.merger.CompactionType + +object DataMapListeners { + def getDataMapTableColumns(dataMapSchema: DataMapSchema, + carbonTable: CarbonTable): mutable.Buffer[String] = { + val listOfColumns: mutable.Buffer[String] = new mutable.ArrayBuffer[String]() + listOfColumns.asJava + .addAll(dataMapSchema.getMainTableColumnList.get(carbonTable.getTableName)) /* NOTE(review): if getMainTableColumnList has no entry for this table name, get(...) 
presumably returns null and addAll then throws NullPointerException - confirm the map is always populated for MV schemas */ + listOfColumns + } +} + /** * Listener to trigger compaction on mv datamap after main table compaction */ @@ -139,3 +149,129 @@ object LoadPostDataMapListener extends OperationEventListener { } } } + +/** + * Listeners to block operations like delete segment on id or by date on tables + * having an mv datamap or on mv datamap tables + */ +object DataMapDeleteSegmentPreListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val carbonTable = event match { /* NOTE(review): non-exhaustive match - any other event type raises scala.MatchError; CarbonEnv registers this listener only for DeleteSegmentByIdPreEvent and DeleteSegmentByDatePreEvent */ + case e: DeleteSegmentByIdPreEvent => 
e.asInstanceOf[DeleteSegmentByIdPreEvent].carbonTable + case e: DeleteSegmentByDatePreEvent => + e.asInstanceOf[DeleteSegmentByDatePreEvent].carbonTable + } + if (null != carbonTable) { + if (DataMapUtil.hasMVDataMap(carbonTable)) { + throw new UnsupportedOperationException( + "Delete segment operation is not supported on tables having child datamap") + } + if (carbonTable.isChildTable) { + throw new UnsupportedOperationException( + "Delete segment operation is not supported on datamap table") + } + } + } +} + +object DataMapAddColumnsPreListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val dataTypeChangePreListener = event.asInstanceOf[AlterTableAddColumnPreEvent] + val carbonTable = dataTypeChangePreListener.carbonTable + if (carbonTable.isChildTable) { + throw new UnsupportedOperationException( + s"Cannot add columns in a DataMap table " + + s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") + } + } +} + + +object DataMapDropColumnPreListener extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val dropColumnChangePreListener = event.asInstanceOf[AlterTableDropColumnPreEvent] + val carbonTable = dropColumnChangePreListener.carbonTable + val alterTableDropColumnModel = dropColumnChangePreListener.alterTableDropColumnModel + val columnsToBeDropped = alterTableDropColumnModel.columns + if (DataMapUtil.hasMVDataMap(carbonTable)) { + val dataMapSchemaList = DataMapStoreManager.getInstance + .getDataMapSchemasOfTable(carbonTable).asScala + for (dataMapSchema <- dataMapSchemaList) { + if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) { + val listOfColumns = 
DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable) + val columnExistsInChild = listOfColumns.collectFirst { + case parentColumnName if columnsToBeDropped.contains(parentColumnName) => + parentColumnName + } + if (columnExistsInChild.isDefined) { + throw new UnsupportedOperationException( + s"Column ${ columnExistsInChild.head } cannot be dropped because it exists " + + s"in " + dataMapSchema.getProviderName + " datamap:" + + s"${ dataMapSchema.getDataMapName }") + } + } + } + } + if (carbonTable.isChildTable) { + throw new UnsupportedOperationException( + s"Cannot drop columns present in a datamap table ${ carbonTable.getDatabaseName }." + + s"${ carbonTable.getTableName }") + } + } +} + +object DataMapChangeDataTypeorRenameColumnPreListener + extends OperationEventListener { + /** + * Called on a specified event occurrence + * + * @param event + * @param operationContext + */ + override def onEvent(event: Event, operationContext: OperationContext): Unit = { + val colRenameDataTypeChangePreListener = event + .asInstanceOf[AlterTableColRenameAndDataTypeChangePreEvent] + val carbonTable = colRenameDataTypeChangePreListener.carbonTable + val alterTableDataTypeChangeModel = colRenameDataTypeChangePreListener + .alterTableDataTypeChangeModel + val columnToBeAltered: String = alterTableDataTypeChangeModel.columnName + if (DataMapUtil.hasMVDataMap(carbonTable)) { + val dataMapSchemaList = DataMapStoreManager.getInstance + .getDataMapSchemasOfTable(carbonTable).asScala + for (dataMapSchema <- dataMapSchemaList) { + if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) { + val listOfColumns = DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable) + if (listOfColumns.contains(columnToBeAltered)) { + throw new UnsupportedOperationException( + s"Column $columnToBeAltered exists in a " + dataMapSchema.getProviderName + + " datamap. 
Drop " + dataMapSchema.getProviderName + " datamap to continue") + } + } + } + } + if (carbonTable.isChildTable) { + throw new UnsupportedOperationException( + s"Cannot change data type or rename column for columns present in mv datamap table " + + s"${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") + } + } +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala index a9b581c..9119375 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -26,8 +26,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand} -import org.apache.spark.util.AlterTableUtil +import org.apache.spark.util.{AlterTableUtil, DataMapUtil} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.indexstore.PartitionSpec @@ -69,6 +70,10 @@ case class CarbonAlterTableDropHivePartitionCommand( table = CarbonEnv.getCarbonTable(tableName)(sparkSession) setAuditTable(table) setAuditInfo(Map("partition" -> specs.mkString(","))) + if (DataMapUtil.hasMVDataMap(table) || table.isChildTable) { + throw new MalformedCarbonCommandException( + "Drop Partition is not supported for datamap table or for tables 
which have child datamap") + } if (table.isHivePartitionTable) { var locks = List.empty[ICarbonLock] try { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 4a0b492..c4c3539 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} +import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil, Segment} import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock} @@ -717,9 +717,9 @@ object LoadPreAggregateTablePreListener extends OperationEventListener { val carbonLoadModel = loadEvent.getCarbonLoadModel val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val isInternalLoadCall = carbonLoadModel.isAggLoadRequest - if (table.isChildDataMap && !isInternalLoadCall) { + if ((table.isChildDataMap || table.isChildTable) && !isInternalLoadCall) { throw new UnsupportedOperationException( - "Cannot insert/load data directly into pre-aggregate table") + "Cannot insert/load data directly into pre-aggregate/child table") } } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala index b729347..86b2d00 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.parser.CarbonSpark2SqlParser -import org.apache.spark.util.PartitionUtils +import org.apache.spark.util.{DataMapUtil, PartitionUtils} import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException @@ -90,104 +90,8 @@ case class PreAggregateTableHelper( throw new MalformedDataMapCommandException( "Parent table name is different in select and create") } - var neworder = Seq[String]() - val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala - parentOrder.foreach(parentcol => - fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty && - parentcol.equals(fieldRelationMap(col). - columnTableRelationList.get(0).parentColumnName)) - .map(cols => neworder :+= cols.column)) - tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(",")) - tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable. 
- getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants - .LOAD_SORT_SCOPE_DEFAULT)) - tableProperties - .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString) - tableProperties.put(CarbonCommonConstants.FLAT_FOLDER, - parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse( - CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER)) - - // Datamap table name and columns are automatically added prefix with parent table name - // in carbon. For convenient, users can type column names same as the ones in select statement - // when config dmproperties, and here we update column names with prefix. - // If longStringColumn is not present in dm properties then we take long_string_columns from - // the parent table. - var longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS) - if (longStringColumn.isEmpty) { - val longStringColumnInParents = parentTable.getTableInfo.getFactTable.getTableProperties - .asScala - .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim) - val varcharDatamapFields = scala.collection.mutable.ArrayBuffer.empty[String] - fieldRelationMap foreach (fields => { - val aggFunc = fields._2.aggregateFunction - val relationList = fields._2.columnTableRelationList - // check if columns present in datamap are long_string_col in parent table. 
If they are - // long_string_columns in parent, make them long_string_columns in datamap - if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents - .contains(relationList.head.head.parentColumnName)) { - varcharDatamapFields += relationList.head.head.parentColumnName - } - }) - if (!varcharDatamapFields.isEmpty) { - longStringColumn = Option(varcharDatamapFields.mkString(",")) - } - } - - if (longStringColumn != None) { - val fieldNames = fields.map(_.column) - val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map{ colName => - val newColName = parentTable.getTableName.toLowerCase() + "_" + colName - if (!fieldNames.contains(newColName)) { - throw new MalformedDataMapCommandException( - CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + colName - + " does not in datamap") - } - newColName - } - tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS, - newLongStringColumn.mkString(",")) - } - - // inherit the local dictionary properties of main parent table - tableProperties - .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, - parentTable.getTableInfo.getFactTable.getTableProperties.asScala - .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false")) - tableProperties - .put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, - parentTable.getTableInfo.getFactTable.getTableProperties.asScala - .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, - CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)) - val parentDictInclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala - .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",") - - val parentDictExclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala - .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",") - - val newDictInclude = - parentDictInclude.flatMap(parentcol => - fields.collect { - case col if fieldRelationMap(col).aggregateFunction.isEmpty && - 
parentcol.equals(fieldRelationMap(col). columnTableRelationList.get.head.parentColumnName) => - col.column - }) - - val newDictExclude = parentDictExclude.flatMap(parentcol => - fields.collect { - case col if fieldRelationMap(col).aggregateFunction.isEmpty && - parentcol.equals(fieldRelationMap(col). - columnTableRelationList.get.head.parentColumnName) => - col.column - }) - if (newDictInclude.nonEmpty) { - tableProperties - .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, newDictInclude.mkString(",")) - } - if (newDictExclude.nonEmpty) { - tableProperties - .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, newDictExclude.mkString(",")) - } + DataMapUtil + .inheritTablePropertiesFromMainTable(parentTable, fields, fieldRelationMap, tableProperties) val tableIdentifier = TableIdentifier(parentTable.getTableName + "_" + dataMapName, Some(parentTable.getDatabaseName)) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index f41cfc1..6e3e398 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand} import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} -import org.apache.spark.util.AlterTableUtil +import org.apache.spark.util.{AlterTableUtil, DataMapUtil} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory @@ -81,8 +81,9 @@ private[sql] case class 
CarbonAlterTableRenameCommand( throw new MalformedCarbonCommandException("alter rename is not supported for index datamap") } // if table have create mv datamap, not support table rename - if (CarbonTable.hasMVDataMap(oldCarbonTable)) { - throw new MalformedCarbonCommandException("alter rename is not supported for mv datamap") + if (DataMapUtil.hasMVDataMap(oldCarbonTable) || oldCarbonTable.isChildTable) { + throw new MalformedCarbonCommandException( + "alter rename is not supported for datamap table or for tables which have child datamap") } var timeStamp = 0L diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 7d449b5..91d7675 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable import org.apache.spark.sql.hive.{CarbonRelation, CreateCarbonSourceTableAsSelectCommand} import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.types.StructField -import org.apache.spark.util.{CarbonReflectionUtils, FileUtils, SparkUtil} +import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory @@ -322,7 +322,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { throw new MalformedCarbonCommandException( "Streaming property value is incorrect") } - if (CarbonTable.hasMVDataMap(carbonTable)) { + if (DataMapUtil.hasMVDataMap(carbonTable)) { throw new MalformedCarbonCommandException( "The table which has MV datamap does not support set streaming property") } diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index d875fdf..96b6000 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ -import scala.collection.mutable import org.apache.spark.SPARK_VERSION import org.apache.spark.sql._ @@ -30,12 +29,11 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FileFormat, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat} -import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CarbonException -import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} +import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, SparkUtil} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil} +import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonUtil @@ -71,7 +69,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica "Update operation is not supported for pre-aggregate table") } val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable) - if (CarbonTable.hasMVDataMap(carbonTable)) { + if (DataMapUtil.hasMVDataMap(carbonTable)) { val allDataMapSchemas = DataMapStoreManager.getInstance 
.getDataMapSchemasOfTable(carbonTable).asScala .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier && @@ -214,7 +212,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica "Delete operation is not supported for pre-aggregate table") } val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable) - if (CarbonTable.hasMVDataMap(carbonTable)) { + if (DataMapUtil.hasMVDataMap(carbonTable)) { val allDataMapSchemas = DataMapStoreManager.getInstance .getDataMapSchemasOfTable(carbonTable).asScala .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier && diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala new file mode 100644 index 0000000..24e28ad --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/util/DataMapUtil.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.io.IOException +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.execution.command.{DataMapField, Field} + +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} + +/** + * Utility class for keeping all the utility methods common for pre-aggregate and mv datamap + */ +object DataMapUtil { + /** Copies sort columns, sort_scope, table block size, flat folder, long string columns and local dictionary settings from `parentTable` into the datamap table's `tableProperties`, mapping parent column names to datamap `fields` through `fieldRelationMap`. Throws MalformedDataMapCommandException if a long string column is not one of the datamap fields. */ + def inheritTablePropertiesFromMainTable(parentTable: CarbonTable, + fields: Seq[Field], + fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField], + tableProperties: mutable.Map[String, String]): Unit = { + var neworder = Seq[String]() + val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala + parentOrder.foreach(parentcol => + fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty && + fieldRelationMap(col).columnTableRelationList.size == 1 && + parentcol.equals(fieldRelationMap(col). + columnTableRelationList.get(0).parentColumnName)) + .map(cols => neworder :+= cols.column)) + tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(",")) + tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable. 
+ getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants + .LOAD_SORT_SCOPE_DEFAULT)) + tableProperties + .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString) + tableProperties.put(CarbonCommonConstants.FLAT_FOLDER, + parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse( + CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER)) + + // Datamap table name and columns are automatically added prefix with parent table name + // in carbon. For convenient, users can type column names same as the ones in select statement + // when config dmproperties, and here we update column names with prefix. + // If longStringColumn is not present in dm properties then we take long_string_columns from + // the parent table. + var longStringColumn = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS) + if (longStringColumn.isEmpty) { + val longStringColumnInParents = parentTable.getTableInfo.getFactTable.getTableProperties + .asScala + .getOrElse(CarbonCommonConstants.LONG_STRING_COLUMNS, "").split(",").map(_.trim) + val varcharDatamapFields = scala.collection.mutable.ArrayBuffer.empty[String] + fieldRelationMap foreach (fields => { + val aggFunc = fields._2.aggregateFunction + val relationList = fields._2.columnTableRelationList + // check if columns present in datamap are long_string_col in parent table. 
If they are + // long_string_columns in parent, make them long_string_columns in datamap + if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents + .contains(relationList.head.head.parentColumnName)) { + varcharDatamapFields += relationList.head.head.parentColumnName + } + }) + if (!varcharDatamapFields.isEmpty) { + longStringColumn = Option(varcharDatamapFields.mkString(",")) + } + } + + if (longStringColumn != None) { + val fieldNames = fields.map(_.column) + val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map { colName => + val newColName = parentTable.getTableName.toLowerCase() + "_" + colName + if (!fieldNames.contains(newColName)) { + throw new MalformedDataMapCommandException( + CarbonCommonConstants.LONG_STRING_COLUMNS.toUpperCase() + ":" + colName + + " does not in datamap") + } + newColName + } + tableProperties.put(CarbonCommonConstants.LONG_STRING_COLUMNS, + newLongStringColumn.mkString(",")) + } + + // inherit the local dictionary properties of main parent table + tableProperties + .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, + parentTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE, "false")) + tableProperties + .put(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, + parentTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD, + CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT)) + val parentDictInclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, "").split(",") + + val parentDictExclude = parentTable.getTableInfo.getFactTable.getTableProperties.asScala + .getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, "").split(",") + + val newDictInclude = + parentDictInclude.flatMap(parentcol => + fields.collect { + case col if fieldRelationMap(col).aggregateFunction.isEmpty && 
+ fieldRelationMap(col).columnTableRelationList.size == 1 && + parentcol.equals(fieldRelationMap(col). + columnTableRelationList.get.head.parentColumnName) => + col.column + }) + + val newDictExclude = parentDictExclude.flatMap(parentcol => + fields.collect { + case col if fieldRelationMap(col).aggregateFunction.isEmpty && + fieldRelationMap(col).columnTableRelationList.size == 1 && /* same guard as newDictInclude above; columnTableRelationList is unwrapped with .get below, so its size is 0 or 1 */ + parentcol.equals(fieldRelationMap(col). + columnTableRelationList.get.head.parentColumnName) => + col.column + }) + if (newDictInclude.nonEmpty) { + tableProperties + .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, newDictInclude.mkString(",")) + } + if (newDictExclude.nonEmpty) { + tableProperties + .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, newDictExclude.mkString(",")) + } + } + + /** + * Return true if MV datamap present in the specified table + */ + @throws[IOException] + def hasMVDataMap(carbonTable: CarbonTable): Boolean = { + val dataMapSchemaList = DataMapStoreManager.getInstance. + getDataMapSchemasOfTable(carbonTable).asScala + dataMapSchemaList.foreach { dataMapSchema => + if (dataMapSchema.getProviderName.equalsIgnoreCase(MV.toString)) { + return true + } + } + false + } +}