This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new c31a809 [CARBONDATA-3715] Fix Timeseries Query Rollup failure for timeseries column of Date type c31a809 is described below commit c31a809ba26b9f8cd7d84d50d220d750e3994c24 Author: Indhumathi27 <indhumathi...@gmail.com> AuthorDate: Fri Feb 21 11:29:32 2020 +0530 [CARBONDATA-3715] Fix Timeseries Query Rollup failure for timeseries column of Date type Why is this PR needed? Issue 1: Timeseries query with timeseries column as date, is throwing a parsing exception after rollup, because while forming SQL for the cast expression, it is taking the wrong attribute name Issue 2: to_date() function has case-sensitivity issues while rewriting the plan What changes were proposed in this PR? Issue 1: If the query is rolled up for date, then take the attribute name when forming SQL for the cast expression. Issue 2: Convert the cast expression child to lower case during rewrite Does this PR introduce any user interface change? No Is any new test case added? Yes This closes #3630 --- .../carbondata/mv/extension/MVDataMapProvider.scala | 2 -- .../carbondata/mv/rewrite/DefaultMatchMaker.scala | 4 ++++ .../org/apache/carbondata/mv/rewrite/Utils.scala | 17 +++++++++++++++++ .../carbondata/mv/rewrite/TestAllOperationsOnMV.scala | 6 ++++++ .../mv/timeseries/TestMVTimeSeriesQueryRollUp.scala | 19 +++++++++++++++++++ .../apache/carbondata/mv/plans/util/Printers.scala | 9 +++++++++ 6 files changed, 55 insertions(+), 2 deletions(-) diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala index 5b400b2..5d5da68 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala @@ -23,8 +23,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonUtils, SparkSession} import 
org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand -import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil -import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.common.annotations.InterfaceAudience import org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala index 3646ae2..e72ce94 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala @@ -62,6 +62,8 @@ abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] { a.child match { case s: ScalaUDF if s.function.isInstanceOf[TimeSeriesFunction] => Utils.getTransformedTimeSeriesUDF(s) -> a.toAttribute + case cast: Cast if cast.child.isInstanceOf[AttributeReference] => + Utils.getTransformedCastExpression(cast) -> a.toAttribute case _ => a.child -> a.toAttribute } @@ -89,6 +91,8 @@ abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] { val newExp = a transform { case s: ScalaUDF if s.function.isInstanceOf[TimeSeriesFunction] => Utils.getTransformedTimeSeriesUDF(s) + case cast: Cast if cast.child.isInstanceOf[AttributeReference] => + Utils.getTransformedCastExpression(cast) } attribute = aliasMapExp.get(newExp) } diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala index 716afbc..fdbad3a 100644 --- a/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala +++ b/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.mv.rewrite +import org.apache.spark.sql.CarbonToSparkAdapter import 
org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Cast, Divide, Expression, Literal, Multiply, NamedExpression, PredicateHelper, ScalaUDF} import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} @@ -591,6 +592,22 @@ object Utils extends PredicateHelper { } /** + * transform cast expression to change it's child attribute reference name to lower case + */ + def getTransformedCastExpression(cast: Cast): Expression = { + cast.transform { + case attr: AttributeReference => + CarbonToSparkAdapter.createAttributeReference( + attr.name.toLowerCase, + attr.dataType, + attr.nullable, + attr.metadata, + attr.exprId, + attr.qualifier) + } + } + + /** * Check if expr1 and expr2 matches TimeSeriesUDF function. If both expressions are * timeseries udf functions, then check it's childrens are same irrespective of case. */ diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala index a8c9af5..66b0c38 100644 --- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala +++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala @@ -423,6 +423,12 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach { sql("drop materialized view if exists dm ") sql("create materialized view dm as select max(to_date(dob)) , min(to_date(dob)) from maintable where to_date(dob)='1975-06-11' or to_date(dob)='1975-06-23'") checkExistence(sql("select max(to_date(dob)) , min(to_date(dob)) from maintable where to_date(dob)='1975-06-11' or to_date(dob)='1975-06-23'"), true, "1975-06-11 1975-06-11") + sql("drop materialized view if exists dm2 ") + sql("create materialized view dm2 as select to_date(dob) from maintable where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 
=120 or DECIMAL_COLUMN1 = 4.34 or Double_COLUMN1 =12345 or INTEGER_COLUMN1 IS NULL") + checkExistence(sql("select to_date(DOB) from maintable where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =120 or DECIMAL_COLUMN1 = 4.34 or Double_COLUMN1 =12345 or INTEGER_COLUMN1 IS NULL"), true, "1975-06-11") + val df = sql("select to_date(DOB) from maintable where CUST_ID IS NULL or DOB IS NOT NULL or BIGINT_COLUMN1 =120 or DECIMAL_COLUMN1 = 4.34 or Double_COLUMN1 =12345 or INTEGER_COLUMN1 IS NULL") + TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "dm2") + sql("drop table IF EXISTS maintable") } test("test preagg and mv") { diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala index 71c0286..5ea4ae3 100644 --- a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala +++ b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala @@ -261,6 +261,25 @@ sql("drop materialized view if exists datamap2") } + test("test rollup for timeseries column of Date type") { + drop() + sql("CREATE TABLE maintable (empno int,empname string, projectcode int, projectjoindate " + + "date,salary double) STORED AS CARBONDATA") + sql("insert into maintable select 11,'joey',2,'2016-02-23',300") + sql("insert into maintable select 13,'pheobe',1,'2016-02-23',450") + sql("insert into maintable select 22,'cathy',1,'2016-02-25',450.5") + sql("drop materialized view if exists datamap1") + val result = sql("select timeseries(projectjoindate,'week'),sum(projectcode) from maintable group by timeseries(projectjoindate,'week')") + sql("create materialized view datamap1 as select timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by timeseries(projectjoindate,'day')") + val dayDF= sql("select timeseries(projectjoindate,'day'),sum(projectcode) from maintable 
group by timeseries(projectjoindate,'day')") + assert(TestUtil.verifyMVDataMap(dayDF.queryExecution.optimizedPlan, "datamap1")) + val weekDF = sql("select timeseries(projectjoindate,'week'),sum(projectcode) from maintable group by timeseries(projectjoindate,'week')") + assert(TestUtil.verifyMVDataMap(weekDF.queryExecution.optimizedPlan, "datamap1")) + checkAnswer(result, weekDF) + sql("drop materialized view if exists datamap1") + drop() + } + def drop(): Unit = { sql("drop table if exists maintable") } diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala index 9112005..af8d184 100644 --- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala +++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala @@ -362,6 +362,15 @@ trait Printers { def formatExpressionsInUDF(exp: Seq[Expression]): String = { val result = exp.map { + case cast: Cast => + // for rolledUp queries of timeseries column with Date type, make + // Cast sql with attribute name + cast.child match { + case attr: AttributeReference if attr.name.startsWith("gen_subsumer_") => + s"CAST(${ attr.name } AS ${ cast.dataType.sql })" + case _ => + cast.sql + } case attr: AttributeReference => if (attr.name.startsWith("gen_subsumer_")) { attr.name