This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 6f90b28dd3d3f008f33668884807cc63cb6b5db5
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Wed Aug 14 20:36:13 2019 +0530

    [CARBONDATA-3452] Handle UDFs on dictionary-include columns in all scenarios
    
    Problem: a select query fails when substring() is applied to a dictionary
    column that is also involved in a join.
    Cause: when a column is in DICTIONARY_INCLUDE, the attribute's data type in
    the plan is rewritten from string to int, so substring() becomes unresolved
    on the int column, and the join then references this unresolved attribute.
    Solution: handle this case for all the operators in CarbonLateDecodeRule.
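    
    For illustration, a minimal sketch of the failing scenario (simplified from
    the test added in this patch; table and column names are illustrative):
    
        create table t1 (hs_code string, y_year smallint)
          stored by 'carbondata'
          tblproperties ('dictionary_include'='hs_code');
        create table t2 (hs string) stored by 'carbondata';
    
        -- substring() on the dictionary-include column hs_code is left
        -- unresolved once the attribute's type is rewritten to int; the
        -- join on the alias a.hs then references an unresolved attribute.
        select a.hs, count(*) from
          (select substring(hs_code, 1, 2) as hs from t1
           group by substring(hs_code, 1, 2), y_year) a
          left join t2 h on (a.hs = h.hs)
        group by a.hs;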
    
    This closes #3358
---
 .../hadoop/api/CarbonTableOutputFormat.java        |   5 +-
 .../spark/sql/optimizer/CarbonLateDecodeRule.scala | 141 ++++++++++++++-------
 .../carbondata/query/SubQueryJoinTestSuite.scala   |  19 +++
 .../processing/util/CarbonDataProcessorUtil.java   |   5 +-
 4 files changed, 120 insertions(+), 50 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 9ba5e97..16703bf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -221,8 +222,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
     }
     return new String[] {
-        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
-            .getTaskAttemptID().toString() };
+        System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString().replace("-", "")
+            + "_" + taskAttemptContext.getTaskAttemptID().toString() };
   }
 
   @Override
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 961bf11..99a8e70 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -619,6 +619,21 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  private def needDataTypeUpdate(exp: Expression): Boolean = {
+    var needChangeDatatype: Boolean = true
+    exp.transform {
+      case attr: AttributeReference => attr
+      case a@Alias(attr: AttributeReference, _) => a
+      case others =>
+        // The data type needs updating for dictionary columns only when the
+        // expression contains nothing but attribute references or aliases of
+        // attribute references; if anything else is present, leave it as is.
+        needChangeDatatype = false
+        others
+    }
+    needChangeDatatype
+  }
+
   private def updateTempDecoder(plan: LogicalPlan,
       aliasMapOriginal: CarbonAliasDecoderRelation,
      attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]):
@@ -650,44 +665,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         cd
       case sort: Sort =>
         val sortExprs = sort.order.map { s =>
-          s.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }.asInstanceOf[SortOrder]
+          if (needDataTypeUpdate(s)) {
+            s.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }.asInstanceOf[SortOrder]
+          } else {
+            s
+          }
         }
         Sort(sortExprs, sort.global, sort.child)
       case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
         val aggExps = agg.aggregateExpressions.map { aggExp =>
-          aggExp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(aggExp)) {
+            aggExp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            aggExp
           }
         }.asInstanceOf[Seq[NamedExpression]]
-
         val grpExps = agg.groupingExpressions.map { gexp =>
-          gexp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(gexp)) {
+            gexp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            gexp
           }
         }
         Aggregate(grpExps, aggExps, agg.child)
       case expand: Expand =>
-        val ex = expand.transformExpressions {
-          case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+        // needDataTypeUpdate cannot be reused here because expand is an Expand node, not an Expression
+        var needChangeDatatype: Boolean = true
+        expand.transformExpressions {
+          case attr: AttributeReference => attr
+          case a@Alias(attr: AttributeReference, _) => a
+          case others =>
+            // The data type needs updating for dictionary columns only when the
+            // expression contains nothing but attribute references or aliases of
+            // attribute references; if anything else is present, leave it as is.
+            needChangeDatatype = false
+            others
         }
-        // Update the datatype of literal type as per the output type, otherwise codegen fails.
-        val updatedProj = ex.projections.map { projs =>
-          projs.zipWithIndex.map { case(p, index) =>
-            p.transform {
-              case l: Literal
-                if l.dataType != ex.output(index).dataType &&
-                   !isComplexColumn(ex.output(index), ex.child.output) =>
-                Literal(l.value, ex.output(index).dataType)
+        if (needChangeDatatype) {
+          val ex = expand.transformExpressions {
+            case attr: AttributeReference =>
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          }
+          // Update the datatype of literal type as per the output type, otherwise codegen fails.
+          val updatedProj = ex.projections.map { projs =>
+            projs.zipWithIndex.map { case (p, index) =>
+              p.transform {
+                case l: Literal
+                  if l.dataType != ex.output(index).dataType &&
+                     !isComplexColumn(ex.output(index), ex.child.output) =>
+                  Literal(l.value, ex.output(index).dataType)
+              }
             }
           }
+          Expand(updatedProj, ex.output, ex.child)
+        } else {
+          expand
         }
-        Expand(updatedProj, ex.output, ex.child)
       case filter: Filter =>
         filter
       case j: Join =>
@@ -698,18 +740,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         u
       case p: Project if relations.nonEmpty =>
         val prExps = p.projectList.map { prExp =>
-          var needChangeDatatype = true
-          prExp.transform {
-            case attr: AttributeReference => attr
-            case a@Alias(attr: AttributeReference, _) => a
-            case others =>
-              // datatype need to change for dictionary columns if only alias
-              // or attribute ref present.
-              // If anything else present, no need to change data type.
-              needChangeDatatype = false
-              others
-          }
-          if (needChangeDatatype) {
+          if (needDataTypeUpdate(prExp)) {
             prExp.transform {
               case attr: AttributeReference =>
                 updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
@@ -721,27 +752,43 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         Project(prExps, p.child)
       case wd: Window if relations.nonEmpty =>
         val prExps = wd.output.map { prExp =>
-          prExp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(prExp)) {
+            prExp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            prExp
           }
         }.asInstanceOf[Seq[Attribute]]
         val wdExps = wd.windowExpressions.map { gexp =>
-          gexp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(gexp)) {
+            gexp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            gexp
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val partitionSpec = wd.partitionSpec.map{ exp =>
-          exp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(exp)) {
+            exp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            exp
           }
         }
         val orderSpec = wd.orderSpec.map { exp =>
-          exp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(exp)) {
+            exp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            exp
           }
         }.asInstanceOf[Seq[SortOrder]]
         Window(wdExps, partitionSpec, orderSpec, wd.child)
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala
index 635445a..4552b4f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala
@@ -56,4 +56,23 @@ class SubQueryJoinTestSuite extends Spark2QueryTest with BeforeAndAfterAll {
     sql("drop table t1")
     sql("drop table t2")
   }
+
+  test("test join with dictionary include with udf") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql(
+      "create table t1 (m_month smallint, hs_code string, country smallint, 
dollar_value double, " +
+      "quantity double, unit smallint, b_country smallint, imex int, y_year 
smallint) stored by " +
+      "'carbondata' 
tblproperties('dictionary_include'='m_month,hs_code,b_country,unit,y_year," +
+      "imex', 'sort_columns'='y_year,m_month,country,b_country,imex')")
+    sql(
+      "create table t2(id bigint, hs string, hs_cn string, hs_en string) 
stored by 'carbondata' " +
+      "tblproperties ('dictionary_include'='id,hs,hs_cn,hs_en')")
+    checkAnswer(sql(
+      "select a.hs,count(*) tb from (select substring(hs_code,1,2) as 
hs,count(*) v2000 from t1 " +
+      "group by substring(hs_code,1,2),y_year) a left join t2 h on (a.hs=h.hs) 
group by a.hs"),
+      Seq())
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 9ce96d4..588d4ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -94,8 +94,11 @@ public final class CarbonDataProcessorUtil {
       if (dir.exists()) {
         LOGGER.warn("dir already exists, skip dir creation: " + loc);
       } else {
-        if (!dir.mkdirs()) {
+        if (!dir.mkdirs() && !dir.exists()) {
+          // In a concurrent scenario mkdirs() may fail because another process created the dir first, so re-check existence
           LOGGER.error("Error occurs while creating dir: " + loc);
+        } else {
+          LOGGER.info("Successfully created dir: " + loc);
         }
       }
     }
