This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f35e4f  [CARBONDATA-3762] Block creating Materialized view's with 
duplicate column
6f35e4f is described below

commit 6f35e4f294874adce9e14aef9d9726b2a196a157
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Wed Apr 1 19:04:08 2020 +0530

    [CARBONDATA-3762] Block creating Materialized view's with duplicate column
    
    Why is this PR needed?
    Currently, materialized views with duplicate column
    1. On creation, we are taking distinct of Project columns.
    2. Because of this, during load, data is inserted wrongly and query gives 
wrong results.
    
    Materilaized views can be mapped against queries having duplicate columns, 
without having duplicate columns in mv table.
    
    What changes were proposed in this PR?
    1. Block creating materialized views with duplicate columns
    2. Fix bug in matching mv against queries having duplicate columns.
    
    Does this PR introduce any user interface change?
    Yes.
    
    Is any new testcase added?
    Yes
    
    This closes #3690
---
 .../org/apache/carbondata/view/MVHelper.scala      | 45 +++++++++++++++++++-
 .../command/view/CarbonCreateMVCommand.scala       |  2 -
 .../org/apache/spark/sql/optimizer/MVMatcher.scala | 49 +++++++++++++++++++---
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 11 ++++-
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 45 +++++++++++++++++++-
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  |  8 +++-
 6 files changed, 147 insertions(+), 13 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
index 76c4ec4..af6d276 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVHelper.scala
@@ -17,8 +17,10 @@
 
 package org.apache.carbondata.view
 
+import java.util
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -31,6 +33,7 @@ import org.apache.spark.sql.execution.command.Field
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, 
ModularRelation, Select}
 import org.apache.carbondata.spark.util.CommonUtil
@@ -175,8 +178,13 @@ object MVHelper {
       projectList: Seq[NamedExpression],
       fieldCounter: AtomicInteger): mutable.LinkedHashMap[Field, MVField] = {
     val fieldsMap = scala.collection.mutable.LinkedHashMap.empty[Field, 
MVField]
+    // map of qualified name with list of column names
+    val fieldColumnsMap = new util.HashMap[String, 
java.util.ArrayList[String]]()
     projectList.map {
       case reference: AttributeReference =>
+        val columns = new util.ArrayList[String]()
+        columns.add(reference.qualifiedName)
+        findDuplicateColumns(fieldColumnsMap, reference.sql, columns, false)
         val relation = getRelation(relationList, reference)
         if (null != relation) {
           val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
@@ -205,7 +213,10 @@ object MVHelper {
           )
         }
 
-      case Alias(reference: AttributeReference, name) =>
+      case a@Alias(reference: AttributeReference, name) =>
+        val columns = new util.ArrayList[String]()
+        columns.add(reference.qualifiedName)
+        findDuplicateColumns(fieldColumnsMap, a.sql, columns, true)
         val relation = getRelation(relationList, reference)
         if (null != relation) {
           val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
@@ -236,8 +247,10 @@ object MVHelper {
         }
         val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
           new ArrayBuffer[RelatedFieldWrapper]()
+        val columns = new util.ArrayList[String]()
         alias.collect {
           case reference: AttributeReference =>
+            columns.add(reference.qualifiedName)
             val relation = getRelation(relationList, reference)
             if (null != relation) {
               relatedFields += RelatedFieldWrapper(
@@ -246,6 +259,7 @@ object MVHelper {
                 reference.name)
             }
         }
+       findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true)
         fieldsMap.put(
           newField(
             "",
@@ -267,8 +281,10 @@ object MVHelper {
         }
         val relatedFields: ArrayBuffer[RelatedFieldWrapper] =
           new ArrayBuffer[RelatedFieldWrapper]()
+        val columns = new util.ArrayList[String]()
         alias.collect {
           case reference: AttributeReference =>
+            columns.add(reference.qualifiedName)
             val relation = getRelation(relationList, reference)
             if (null != relation) {
               relatedFields += RelatedFieldWrapper(
@@ -277,6 +293,7 @@ object MVHelper {
                 relation.identifier.table)
             }
         }
+        findDuplicateColumns(fieldColumnsMap, alias.sql, columns, true)
         fieldsMap.put(
           newField(
             "",
@@ -291,6 +308,32 @@ object MVHelper {
     fieldsMap
   }
 
+  private def findDuplicateColumns(
+      fieldColumnsMap: util.HashMap[String, util.ArrayList[String]],
+      columnName: String,
+      columns: util.ArrayList[String],
+      isAlias: Boolean): Unit = {
+    // get qualified name without alias name
+    val qualifiedName = if (isAlias) {
+      columnName.substring(0, columnName.indexOf(" AS"))
+    } else {
+      columnName
+    }
+    if (null == fieldColumnsMap.get(qualifiedName)) {
+      // case to check create mv with same column and different alias names
+      if (fieldColumnsMap.containsKey(qualifiedName)) {
+        throw new MalformedMVCommandException(
+          "Cannot create mv having duplicate column with different alias name: 
" + columnName)
+      }
+      fieldColumnsMap.put(qualifiedName, columns)
+    } else {
+      if (fieldColumnsMap.get(qualifiedName).containsAll(columns)) {
+        throw new MalformedMVCommandException(
+          "Cannot create mv with duplicate column: " + columnName)
+      }
+    }
+  }
+
   /**
    * Return the catalog table after matching the attr in logicalRelation
    */
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index b5430ce..12f1b27 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -154,8 +154,6 @@ case class CarbonCreateMVCommand(
     val logicalPlan = MVHelper.dropDummyFunction(
       MVQueryParser.getQueryPlan(queryString, session))
     val modularPlan = checkQuery(session, logicalPlan)
-    // the ctas query can have duplicate columns, so we should take distinct 
and create fields,
-    // so that it won't fail during create mv table
     val viewSchema = getOutputSchema(logicalPlan)
     val relatedTables = getRelatedTables(logicalPlan)
     val relatedTableList = toCarbonTables(session, relatedTables)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
index 29471e3..9003627 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala
@@ -1258,11 +1258,12 @@ private object SelectSelectGroupbyChildDelta
         Some(gb_2c@modular.GroupBy(
         _, _, _, _, sel_2c@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, 
_))
         ) =>
+        val distinctGrpByOList = getDistinctOutputList(gb_2q.outputList)
         if (sel_3q.predicateList.contains(exprE)) {
           val expr1E = exprE.transform {
             case attr: Attribute =>
               gb_2c.outputList.lift(
-                gb_2q.outputList.indexWhere {
+                distinctGrpByOList.indexWhere {
                   case alias: Alias if alias.toAttribute.semanticEquals(attr) 
=> true;
                   case _ => false
                 }).getOrElse { attr }
@@ -1277,14 +1278,14 @@ private object SelectSelectGroupbyChildDelta
           exprE match {
             case attr: Attribute => // this subexpression must in subsumee 
select output list
               gb_2c.outputList.lift(
-                gb_2q.outputList.indexWhere {
+                distinctGrpByOList.indexWhere {
                   case a if a.toAttribute.semanticEquals(attr) => true;
                   case _ => false
                 })
 
             case alias: Alias =>
               gb_2c.outputList.lift(
-                gb_2q.outputList.indexWhere {
+                distinctGrpByOList.indexWhere {
                   case a if a.toAttribute.semanticEquals(alias.toAttribute) => 
true;
                   case _ => false
                 })
@@ -1342,6 +1343,40 @@ private object SelectSelectGroupbyChildDelta
     }
   }
 
+  /**
+   * Removes duplicate projection in the output list for query matching
+   */
+  def getDistinctOutputList(outputList: Seq[NamedExpression]): 
Seq[NamedExpression] = {
+    var distinctOList: Seq[NamedExpression] = Seq.empty
+    outputList.foreach { output =>
+      if (distinctOList.isEmpty) {
+        distinctOList = distinctOList :+ output
+      } else {
+        // get output name
+        var outputName = output.name
+        if (output.isInstanceOf[Alias]) {
+          // In case of queries with join on more than one table and 
projection list having
+          // aggregation of same column name on join tables like 
sum(t1.column), sum(t2.column),
+          // in that case, compare alias name with column id, as alias name 
will be same for
+          // both output(sum(t1))
+          val projectName = output.simpleString
+          outputName = projectName.substring(0, projectName.indexOf(" AS"))
+        }
+        if (!distinctOList.exists(distinctOutput =>
+          if (distinctOutput.isInstanceOf[Alias]) {
+            val projectName = distinctOutput.simpleString
+            val aliasName = projectName.substring(0, projectName.indexOf(" 
AS"))
+            aliasName.equalsIgnoreCase(outputName)
+          } else {
+            distinctOutput.qualifiedName.equalsIgnoreCase(output.qualifiedName)
+          })) {
+          distinctOList = distinctOList :+ output
+        }
+      }
+    }
+    distinctOList
+  }
+
   def apply(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
@@ -1352,14 +1387,16 @@ private object SelectSelectGroupbyChildDelta
         sel_3a@modular.Select(
         _, _, Nil, _, _,
         Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
-        sel_3q@modular.Select(
+        sel_3q_dup@modular.Select(
         _, _, _, _, _,
         Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
         Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)),
         _ :: Nil,
         _ :: Nil) =>
         val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
-        val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl }
+        val tbls_sel_3q = sel_3q_dup.collect { case tbl: modular.LeafNode => 
tbl }
+        val distinctSelOList = getDistinctOutputList(sel_3q_dup.outputList)
+        val sel_3q = sel_3q_dup.copy(outputList = distinctSelOList)
 
         val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
         val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
@@ -1423,7 +1460,7 @@ private object SelectSelectGroupbyChildDelta
               val aliasMap_exp = AttributeMap(
                 gb_2c.outputList.collect {
                   case a: Alias => (a.toAttribute, AliasWrapper(a)) })
-              val sel_3q_exp = sel_3q.transformExpressions({
+              val sel_3q_exp = sel_3q_dup.transformExpressions({
                 case attr: Attribute if aliasMap_exp.contains(attr) => 
aliasMap_exp(attr)
               }).transformExpressions {
                 case AliasWrapper(alias: Alias) => alias
diff --git 
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
 
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 02c3cfd..8f9489a 100644
--- 
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ 
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
@@ -1175,8 +1176,14 @@ class MVCreateTestCase extends QueryTest with 
BeforeAndAfterAll {
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
 "true")
     sql("drop table if exists maintable")
     sql("create table maintable(name string, age int, add string) STORED AS 
carbondata")
-    sql("create materialized view dupli_mv as select name, sum(age),sum(age) 
from maintable group by name")
-    sql("create materialized view dupli_projection as select age, age,add from 
maintable")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view dupli_mv as select name, sum(age),sum(age) 
from maintable group by name")
+    }.getMessage.contains("Cannot create mv with duplicate column: 
sum(maintable.age)")
+    sql("create materialized view dupli_mv as select name, sum(age) from 
maintable group by name")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view dupli_projection as select age, age,add 
from maintable")
+    }.getMessage.contains("Cannot create mv with duplicate column: 
maintable.age")
+    sql("create materialized view dupli_projection as select age,add from 
maintable")
     sql("create materialized view constant_mv as select name, sum(1) ex1 from 
maintable group by name")
     sql("insert into maintable select 'pheobe',31,'NY'")
     val df1 = sql("select sum(age),name from maintable group by name")
diff --git 
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
 
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 8b7a082..3ed7d43 100644
--- 
a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ 
b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
-import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import 
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, 
MalformedMVCommandException}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -627,6 +627,49 @@ class TestAllOperationsOnMV extends QueryTest with 
BeforeAndAfterEach {
     sql("drop table IF EXISTS maintable")
   }
 
+  test("test duplicate column name in mv") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) STORED AS 
carbondata")
+    sql("insert into table maintable values('abc',21,2000),('mno',24,3000)")
+    sql("drop materialized view if exists mv1")
+    val res1 = sql("select name,sum(c_code) from maintable group by name")
+    val res2 = sql("select name, name,sum(c_code),sum(c_code) from maintable  
group by name")
+    val res3 = sql("select c_code,price from maintable")
+    sql("create materialized view mv1 as select name,sum(c_code) from 
maintable group by name")
+    val df1 = sql("select name,sum(c_code) from maintable group by name")
+    TestUtil.verifyMVDataMap(df1.queryExecution.optimizedPlan, "mv1")
+    checkAnswer(res1, df1)
+    val df2 = sql("select name, name,sum(c_code),sum(c_code) from maintable  
group by name")
+    TestUtil.verifyMVDataMap(df2.queryExecution.optimizedPlan, "mv1")
+    checkAnswer(df2, res2)
+    sql("drop materialized view if exists mv2")
+    sql("create materialized view mv2 as select c_code,price from maintable")
+    val df3 = sql("select c_code,price from maintable")
+    TestUtil.verifyMVDataMap(df3.queryExecution.optimizedPlan, "mv2")
+    checkAnswer(res3, df3)
+    val df4 = sql("select c_code,price,price,c_code from maintable")
+    TestUtil.verifyMVDataMap(df4.queryExecution.optimizedPlan, "mv2")
+    checkAnswer(df4, Seq(Row(21,2000,2000,21), Row(24,3000,3000,24)))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test duplicate column with different alias name") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) STORED AS 
carbondata")
+    sql("insert into table maintable values('abc',21,2000),('mno',24,3000)")
+    sql("drop materialized view if exists mv1")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view mv1 as select name,sum(c_code),sum(c_code) 
as a from maintable group by name")
+    }.getMessage.contains("Cannot create mv having duplicate column with 
different alias name: sum(CAST(maintable.`c_code` AS BIGINT)) AS `a`")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view mv1 as select name,name as a from 
maintable")
+    }.getMessage.contains("Cannot create mv having duplicate column with 
different alias name: maintable.`name` AS `a`")
+    intercept[MalformedMVCommandException] {
+      sql("create materialized view mv1 as select name as a,name as b from 
maintable")
+    }.getMessage.contains("Cannot create mv having duplicate column with 
different alias name: maintable.`name` AS `b`")
+    sql("drop table IF EXISTS maintable")
+  }
+
   test("drop meta cache on mv materialized view table") {
     defaultConfig()
     sql("drop table IF EXISTS maintable")
diff --git 
a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
 
b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index c0d1b6e..fa05268 100644
--- 
a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ 
b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.mv.rewrite.TestUtil
@@ -290,9 +291,14 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with 
BeforeAndAfterAll {
     CarbonProperties.getInstance()
       
.addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
 "true")
     dropDataMap("datamap1")
+    intercept[MalformedMVCommandException] {
+      sql(
+        "create materialized view datamap1 as " +
+        "select timeseries(projectjoindate,'month') 
,sum(projectcode),sum(projectcode)  from maintable group by 
timeseries(projectjoindate,'month')")
+    }
     sql(
       "create materialized view datamap1 as " +
-      "select timeseries(projectjoindate,'month') 
,sum(projectcode),sum(projectcode)  from maintable group by 
timeseries(projectjoindate,'month')")
+      "select timeseries(projectjoindate,'month') ,sum(projectcode)  from 
maintable group by timeseries(projectjoindate,'month')")
     loadData("maintable")
     val df1 = sql("select timeseries(projectjoindate,'month') 
,sum(projectcode)  from maintable group by timeseries(projectjoindate,'month')")
     checkPlan("datamap1", df1)

Reply via email to