This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch branch-1.6 in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 6f90b28dd3d3f008f33668884807cc63cb6b5db5 Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Wed Aug 14 20:36:13 2019 +0530 [CARBONDATA-3452] dictionary include udf handle all the scenarios Problem: A select query fails when substring is applied to a dictionary column together with a join. Cause: When dictionary include is present, the data type of the plan attribute is updated from string to int, so substring is left unresolved on the int column. The join operation then tries to reference this unresolved attribute. Solution: Handle this for all the scenarios in CarbonLateDecodeRule. This closes #3358 --- .../hadoop/api/CarbonTableOutputFormat.java | 5 +- .../spark/sql/optimizer/CarbonLateDecodeRule.scala | 141 ++++++++++++++------- .../carbondata/query/SubQueryJoinTestSuite.scala | 19 +++ .../processing/util/CarbonDataProcessorUtil.java | 5 +- 4 files changed, 120 insertions(+), 50 deletions(-) diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 9ba5e97..16703bf 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -19,6 +19,7 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -221,8 +222,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString); } return new String[] { - System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext - .getTaskAttemptID().toString() }; + System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString().replace("-", "") + + "_" + 
taskAttemptContext.getTaskAttemptID().toString() }; } @Override diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 961bf11..99a8e70 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -619,6 +619,21 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { } } + private def needDataTypeUpdate(exp: Expression): Boolean = { + var needChangeDatatype: Boolean = true + exp.transform { + case attr: AttributeReference => attr + case a@Alias(attr: AttributeReference, _) => a + case others => + // datatype need to change for dictionary columns if only alias + // or attribute ref present. + // If anything else present, no need to change data type. + needChangeDatatype = false + others + } + needChangeDatatype + } + private def updateTempDecoder(plan: LogicalPlan, aliasMapOriginal: CarbonAliasDecoderRelation, attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]): @@ -650,44 +665,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { cd case sort: Sort => val sortExprs = sort.order.map { s => - s.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) - }.asInstanceOf[SortOrder] + if (needDataTypeUpdate(s)) { + s.transform { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + }.asInstanceOf[SortOrder] + } else { + s + } } Sort(sortExprs, sort.global, sort.child) case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] => val aggExps = agg.aggregateExpressions.map { aggExp => - aggExp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, 
aliasMap) + if (needDataTypeUpdate(aggExp)) { + aggExp.transform { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + } + } else { + aggExp } }.asInstanceOf[Seq[NamedExpression]] - val grpExps = agg.groupingExpressions.map { gexp => - gexp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + if (needDataTypeUpdate(gexp)) { + gexp.transform { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + } + } else { + gexp } } Aggregate(grpExps, aggExps, agg.child) case expand: Expand => - val ex = expand.transformExpressions { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + // can't use needDataTypeUpdate here as argument is of type Expand + var needChangeDatatype: Boolean = true + expand.transformExpressions { + case attr: AttributeReference => attr + case a@Alias(attr: AttributeReference, _) => a + case others => + // datatype need to change for dictionary columns if only alias + // or attribute ref present. + // If anything else present, no need to change data type. + needChangeDatatype = false + others } - // Update the datatype of literal type as per the output type, otherwise codegen fails. - val updatedProj = ex.projections.map { projs => - projs.zipWithIndex.map { case(p, index) => - p.transform { - case l: Literal - if l.dataType != ex.output(index).dataType && - !isComplexColumn(ex.output(index), ex.child.output) => - Literal(l.value, ex.output(index).dataType) + if (needChangeDatatype) { + val ex = expand.transformExpressions { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + } + // Update the datatype of literal type as per the output type, otherwise codegen fails. 
+ val updatedProj = ex.projections.map { projs => + projs.zipWithIndex.map { case (p, index) => + p.transform { + case l: Literal + if l.dataType != ex.output(index).dataType && + !isComplexColumn(ex.output(index), ex.child.output) => + Literal(l.value, ex.output(index).dataType) + } } } + Expand(updatedProj, ex.output, ex.child) + } else { + expand } - Expand(updatedProj, ex.output, ex.child) case filter: Filter => filter case j: Join => @@ -698,18 +740,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { u case p: Project if relations.nonEmpty => val prExps = p.projectList.map { prExp => - var needChangeDatatype = true - prExp.transform { - case attr: AttributeReference => attr - case a@Alias(attr: AttributeReference, _) => a - case others => - // datatype need to change for dictionary columns if only alias - // or attribute ref present. - // If anything else present, no need to change data type. - needChangeDatatype = false - others - } - if (needChangeDatatype) { + if (needDataTypeUpdate(prExp)) { prExp.transform { case attr: AttributeReference => updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) @@ -721,27 +752,43 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { Project(prExps, p.child) case wd: Window if relations.nonEmpty => val prExps = wd.output.map { prExp => - prExp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + if (needDataTypeUpdate(prExp)) { + prExp.transform { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + } + } else { + prExp } }.asInstanceOf[Seq[Attribute]] val wdExps = wd.windowExpressions.map { gexp => - gexp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + if (needDataTypeUpdate(gexp)) { + gexp.transform { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + } + } 
else { + gexp } }.asInstanceOf[Seq[NamedExpression]] val partitionSpec = wd.partitionSpec.map{ exp => - exp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + if (needDataTypeUpdate(exp)) { + exp.transform { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + } + } else { + exp } } val orderSpec = wd.orderSpec.map { exp => - exp.transform { - case attr: AttributeReference => - updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + if (needDataTypeUpdate(exp)) { + exp.transform { + case attr: AttributeReference => + updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap) + } + } else { + exp } }.asInstanceOf[Seq[SortOrder]] Window(wdExps, partitionSpec, orderSpec, wd.child) diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala index 635445a..4552b4f 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala @@ -56,4 +56,23 @@ class SubQueryJoinTestSuite extends Spark2QueryTest with BeforeAndAfterAll { sql("drop table t1") sql("drop table t2") } + + test("test join with dictionary include with udf") { + sql("drop table if exists t1") + sql("drop table if exists t2") + sql( + "create table t1 (m_month smallint, hs_code string, country smallint, dollar_value double, " + + "quantity double, unit smallint, b_country smallint, imex int, y_year smallint) stored by " + + "'carbondata' tblproperties('dictionary_include'='m_month,hs_code,b_country,unit,y_year," + + "imex', 'sort_columns'='y_year,m_month,country,b_country,imex')") + sql( + "create table t2(id bigint, hs string, hs_cn string, hs_en string) stored by 'carbondata' " + + "tblproperties 
('dictionary_include'='id,hs,hs_cn,hs_en')") + checkAnswer(sql( + "select a.hs,count(*) tb from (select substring(hs_code,1,2) as hs,count(*) v2000 from t1 " + + "group by substring(hs_code,1,2),y_year) a left join t2 h on (a.hs=h.hs) group by a.hs"), + Seq()) + sql("drop table if exists t1") + sql("drop table if exists t2") + } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 9ce96d4..588d4ac 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -94,8 +94,11 @@ public final class CarbonDataProcessorUtil { if (dir.exists()) { LOGGER.warn("dir already exists, skip dir creation: " + loc); } else { - if (!dir.mkdirs()) { + if (!dir.mkdirs() && !dir.exists()) { + // concurrent scenario mkdir may fail, so checking dir LOGGER.error("Error occurs while creating dir: " + loc); + } else { + LOGGER.info("Successfully created dir: " + loc); } } }