This is an automated email from the ASF dual-hosted git repository. akashrn5 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 6f35e4f [CARBONDATA-3762] Block creating Materialized view's with duplicate column 6f35e4f is described below commit 6f35e4f294874adce9e14aef9d9726b2a196a157 Author: Indhumathi27 <indhumathi...@gmail.com> AuthorDate: Wed Apr 1 19:04:08 2020 +0530 [CARBONDATA-3762] Block creating Materialized view's with duplicate column Why is this PR needed? Currently, for materialized views with duplicate columns: 1. On creation, we are taking the distinct of the project columns. 2. Because of this, during load, data is inserted wrongly and queries give wrong results. Materialized views can be mapped against queries having duplicate columns, without having duplicate columns in the mv table. What changes were proposed in this PR? 1. Block creating materialized views with duplicate columns. 2. Fix bug in matching mv against queries having duplicate columns. Does this PR introduce any user interface change? Yes. Is any new testcase added? Yes. This closes #3690 --- .../org/apache/carbondata/view/MVHelper.scala | 45 +++++++++++++++++++- .../command/view/CarbonCreateMVCommand.scala | 2 - .../org/apache/spark/sql/optimizer/MVMatcher.scala | 49 +++++++++++++++++++--- .../carbondata/mv/rewrite/MVCreateTestCase.scala | 11 ++++- .../mv/rewrite/TestAllOperationsOnMV.scala | 45 +++++++++++++++++++- .../timeseries/TestMVTimeSeriesLoadAndQuery.scala | 8 +++- 6 files changed, 147 insertions(+), 13 deletions(-) diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala index 76c4ec4..af6d276 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala @@ -17,8 +17,10 @@ package org.apache.carbondata.view +import java.util import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConverters._ import 
scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -31,6 +33,7 @@ import org.apache.spark.sql.execution.command.Field import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types.DataType +import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, ModularRelation, Select} import org.apache.carbondata.spark.util.CommonUtil @@ -175,8 +178,13 @@ object MVHelper { projectList: Seq[NamedExpression], fieldCounter: AtomicInteger): mutable.LinkedHashMap[Field, MVField] = { val fieldsMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField] + // map of qualified name with list of column names + val fieldColumnsMap = new util.HashMap[String, java.util.ArrayList[String]]() projectList.map { case reference: AttributeReference => + val columns = new util.ArrayList[String]() + columns.add(reference.qualifiedName) + findDuplicateColumns(fieldColumnsMap, reference.sql, columns, false) val relation = getRelation(relationList, reference) if (null != relation) { val relatedFields: ArrayBuffer[RelatedFieldWrapper] = @@ -205,7 +213,10 @@ object MVHelper { ) } - case Alias(reference: AttributeReference, name) => + case a@Alias(reference: AttributeReference, name) => + val columns = new util.ArrayList[String]() + columns.add(reference.qualifiedName) + findDuplicateColumns(fieldColumnsMap, a.sql, columns, true) val relation = getRelation(relationList, reference) if (null != relation) { val relatedFields: ArrayBuffer[RelatedFieldWrapper] = @@ -236,8 +247,10 @@ object MVHelper { } val relatedFields: ArrayBuffer[RelatedFieldWrapper] = new ArrayBuffer[RelatedFieldWrapper]() + val columns = new util.ArrayList[String]() alias.collect { case reference: AttributeReference => + columns.add(reference.qualifiedName) val relation = getRelation(relationList, reference) if (null != 
relation) { relatedFields += RelatedFieldWrapper( @@ -246,6 +259,7 @@ object MVHelper { reference.name) } } + findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true) fieldsMap.put( newField( "", @@ -267,8 +281,10 @@ object MVHelper { } val relatedFields: ArrayBuffer[RelatedFieldWrapper] = new ArrayBuffer[RelatedFieldWrapper]() + val columns = new util.ArrayList[String]() alias.collect { case reference: AttributeReference => + columns.add(reference.qualifiedName) val relation = getRelation(relationList, reference) if (null != relation) { relatedFields += RelatedFieldWrapper( @@ -277,6 +293,7 @@ object MVHelper { relation.identifier.table) } } + findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true) fieldsMap.put( newField( "", @@ -291,6 +308,32 @@ object MVHelper { fieldsMap } + private def findDuplicateColumns( + fieldColumnsMap: util.HashMap[String, util.ArrayList[String]], + columnName: String, + columns: util.ArrayList[String], + isAlias: Boolean): Unit = { + // get qualified name without alias name + val qualifiedName = if (isAlias) { + columnName.substring(0, columnName.indexOf(" AS")) + } else { + columnName + } + if (null == fieldColumnsMap.get(qualifiedName)) { + // case to check create mv with same column and different alias names + if (fieldColumnsMap.containsKey(qualifiedName)) { + throw new MalformedMVCommandException( + "Cannot create mv having duplicate column with different alias name: " + columnName) + } + fieldColumnsMap.put(qualifiedName, columns) + } else { + if (fieldColumnsMap.get(qualifiedName).containsAll(columns)) { + throw new MalformedMVCommandException( + "Cannot create mv with duplicate column: " + columnName) + } + } + } + /** * Return the catalog table after matching the attr in logicalRelation */ diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala 
index b5430ce..12f1b27 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala @@ -154,8 +154,6 @@ case class CarbonCreateMVCommand( val logicalPlan = MVHelper.dropDummyFunction( MVQueryParser.getQueryPlan(queryString, session)) val modularPlan = checkQuery(session, logicalPlan) - // the ctas query can have duplicate columns, so we should take distinct and create fields, - // so that it won't fail during create mv table val viewSchema = getOutputSchema(logicalPlan) val relatedTables = getRelatedTables(logicalPlan) val relatedTableList = toCarbonTables(session, relatedTables) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala index 29471e3..9003627 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala @@ -1258,11 +1258,12 @@ private object SelectSelectGroupbyChildDelta Some(gb_2c@modular.GroupBy( _, _, _, _, sel_2c@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)) ) => + val distinctGrpByOList = getDistinctOutputList(gb_2q.outputList) if (sel_3q.predicateList.contains(exprE)) { val expr1E = exprE.transform { case attr: Attribute => gb_2c.outputList.lift( - gb_2q.outputList.indexWhere { + distinctGrpByOList.indexWhere { case alias: Alias if alias.toAttribute.semanticEquals(attr) => true; case _ => false }).getOrElse { attr } @@ -1277,14 +1278,14 @@ private object SelectSelectGroupbyChildDelta exprE match { case attr: Attribute => // this subexpression must in subsumee select output list gb_2c.outputList.lift( - gb_2q.outputList.indexWhere { + distinctGrpByOList.indexWhere { case a if a.toAttribute.semanticEquals(attr) => true; case _ => false }) 
case alias: Alias => gb_2c.outputList.lift( - gb_2q.outputList.indexWhere { + distinctGrpByOList.indexWhere { case a if a.toAttribute.semanticEquals(alias.toAttribute) => true; case _ => false }) @@ -1342,6 +1343,40 @@ private object SelectSelectGroupbyChildDelta } } + /** + * Removes duplicate projection in the output list for query matching + */ + def getDistinctOutputList(outputList: Seq[NamedExpression]): Seq[NamedExpression] = { + var distinctOList: Seq[NamedExpression] = Seq.empty + outputList.foreach { output => + if (distinctOList.isEmpty) { + distinctOList = distinctOList :+ output + } else { + // get output name + var outputName = output.name + if (output.isInstanceOf[Alias]) { + // In case of queries with join on more than one table and projection list having + // aggregation of same column name on join tables like sum(t1.column), sum(t2.column), + // in that case, compare alias name with column id, as alias name will be same for + // both output(sum(t1)) + val projectName = output.simpleString + outputName = projectName.substring(0, projectName.indexOf(" AS")) + } + if (!distinctOList.exists(distinctOutput => + if (distinctOutput.isInstanceOf[Alias]) { + val projectName = distinctOutput.simpleString + val aliasName = projectName.substring(0, projectName.indexOf(" AS")) + aliasName.equalsIgnoreCase(outputName) + } else { + distinctOutput.qualifiedName.equalsIgnoreCase(output.qualifiedName) + })) { + distinctOList = distinctOList :+ output + } + } + } + distinctOList + } + def apply( subsumer: ModularPlan, subsumee: ModularPlan, @@ -1352,14 +1387,16 @@ private object SelectSelectGroupbyChildDelta sel_3a@modular.Select( _, _, Nil, _, _, Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _), - sel_3q@modular.Select( + sel_3q_dup@modular.Select( _, _, _, _, _, Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _), Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)), _ :: Nil, _ :: Nil) => val tbls_sel_3a = sel_3a.collect { case tbl: 
modular.LeafNode => tbl } - val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl } + val tbls_sel_3q = sel_3q_dup.collect { case tbl: modular.LeafNode => tbl } + val distinctSelOList = getDistinctOutputList(sel_3q_dup.outputList) + val sel_3q = sel_3q_dup.copy(outputList = distinctSelOList) val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains) val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains) @@ -1423,7 +1460,7 @@ private object SelectSelectGroupbyChildDelta val aliasMap_exp = AttributeMap( gb_2c.outputList.collect { case a: Alias => (a.toAttribute, AliasWrapper(a)) }) - val sel_3q_exp = sel_3q.transformExpressions({ + val sel_3q_exp = sel_3q_dup.transformExpressions({ case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr) }).transformExpressions { case AliasWrapper(alias: Alias) => alias diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala index 02c3cfd..8f9489a 100644 --- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala +++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties @@ -1175,8 +1176,14 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true") sql("drop table if exists maintable") sql("create table maintable(name string, age int, add string) STORED AS carbondata") - sql("create materialized view 
dupli_mv as select name, sum(age),sum(age) from maintable group by name") - sql("create materialized view dupli_projection as select age, age,add from maintable") + intercept[MalformedMVCommandException] { + sql("create materialized view dupli_mv as select name, sum(age),sum(age) from maintable group by name") + }.getMessage.contains("Cannot create mv with duplicate column: sum(maintable.age)") + sql("create materialized view dupli_mv as select name, sum(age) from maintable group by name") + intercept[MalformedMVCommandException] { + sql("create materialized view dupli_projection as select age, age,add from maintable") + }.getMessage.contains("Cannot create mv with duplicate column: maintable.age") + sql("create materialized view dupli_projection as select age,add from maintable") sql("create materialized view constant_mv as select name, sum(1) ex1 from maintable group by name") sql("insert into maintable select 'pheobe',31,'NY'") val df1 = sql("select sum(age),name from maintable group by name") diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala index 8b7a082..3ed7d43 100644 --- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala +++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterEach -import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedMVCommandException} import org.apache.carbondata.core.cache.CacheProvider import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -627,6 +627,49 @@ class TestAllOperationsOnMV 
extends QueryTest with BeforeAndAfterEach { sql("drop table IF EXISTS maintable") } + test("test duplicate column name in mv") { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) STORED AS carbondata") + sql("insert into table maintable values('abc',21,2000),('mno',24,3000)") + sql("drop materialized view if exists mv1") + val res1 = sql("select name,sum(c_code) from maintable group by name") + val res2 = sql("select name, name,sum(c_code),sum(c_code) from maintable group by name") + val res3 = sql("select c_code,price from maintable") + sql("create materialized view mv1 as select name,sum(c_code) from maintable group by name") + val df1 = sql("select name,sum(c_code) from maintable group by name") + TestUtil.verifyMVDataMap(df1.queryExecution.optimizedPlan, "mv1") + checkAnswer(res1, df1) + val df2 = sql("select name, name,sum(c_code),sum(c_code) from maintable group by name") + TestUtil.verifyMVDataMap(df2.queryExecution.optimizedPlan, "mv1") + checkAnswer(df2, res2) + sql("drop materialized view if exists mv2") + sql("create materialized view mv2 as select c_code,price from maintable") + val df3 = sql("select c_code,price from maintable") + TestUtil.verifyMVDataMap(df3.queryExecution.optimizedPlan, "mv2") + checkAnswer(res3, df3) + val df4 = sql("select c_code,price,price,c_code from maintable") + TestUtil.verifyMVDataMap(df4.queryExecution.optimizedPlan, "mv2") + checkAnswer(df4, Seq(Row(21,2000,2000,21), Row(24,3000,3000,24))) + sql("drop table IF EXISTS maintable") + } + + test("test duplicate column with different alias name") { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) STORED AS carbondata") + sql("insert into table maintable values('abc',21,2000),('mno',24,3000)") + sql("drop materialized view if exists mv1") + intercept[MalformedMVCommandException] { + sql("create materialized view mv1 as select name,sum(c_code),sum(c_code) as a from 
maintable group by name") + }.getMessage.contains("Cannot create mv having duplicate column with different alias name: sum(CAST(maintable.`c_code` AS BIGINT)) AS `a`") + intercept[MalformedMVCommandException] { + sql("create materialized view mv1 as select name,name as a from maintable") + }.getMessage.contains("Cannot create mv having duplicate column with different alias name: maintable.`name` AS `a`") + intercept[MalformedMVCommandException] { + sql("create materialized view mv1 as select name as a,name as b from maintable") + }.getMessage.contains("Cannot create mv having duplicate column with different alias name: maintable.`name` AS `b`") + sql("drop table IF EXISTS maintable") + } + test("drop meta cache on mv materialized view table") { defaultConfig() sql("drop table IF EXISTS maintable") diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala index c0d1b6e..fa05268 100644 --- a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala +++ b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.mv.rewrite.TestUtil @@ -290,9 +291,14 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true") dropDataMap("datamap1") + intercept[MalformedMVCommandException] { + sql( + "create materialized view datamap1 as " + + "select 
timeseries(projectjoindate,'month') ,sum(projectcode),sum(projectcode) from maintable group by timeseries(projectjoindate,'month')") + } sql( "create materialized view datamap1 as " + - "select timeseries(projectjoindate,'month') ,sum(projectcode),sum(projectcode) from maintable group by timeseries(projectjoindate,'month')") + "select timeseries(projectjoindate,'month') ,sum(projectcode) from maintable group by timeseries(projectjoindate,'month')") loadData("maintable") val df1 = sql("select timeseries(projectjoindate,'month') ,sum(projectcode) from maintable group by timeseries(projectjoindate,'month')") checkPlan("datamap1", df1)