Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 392bc290e -> 35f74248f


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30ee42cb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index f020a1c..8d50324 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
@@ -39,8 +38,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -782,7 +782,48 @@ case class CarbonRelation(
         nullable = true)())
   }
 
-  override val output = dimensionsAttr ++ measureAttr
+  override val output = {
+    val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName)
+      .asScala
+    // convert each visible column to an Attribute
+    columns.filter(!_.isInvisible).map { column =>
+      if (column.isDimesion()) {
+        val output: DataType = column.getDataType.toString.toLowerCase match {
+          case "array" =>
+            CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>")
+          case "struct" =>
+            CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>")
+          case dType =>
+            val dataType = addDecimalScaleAndPrecision(column, dType)
+            CarbonMetastoreTypes.toDataType(dataType)
+        }
+        AttributeReference(column.getColName, output, nullable = true)(
+          qualifier = Option(tableName + "." + column.getColName))
+      } else {
+        val output = CarbonMetastoreTypes.toDataType {
+          column.getDataType.toString
+            .toLowerCase match {
+            case "int" => "long"
+            case "short" => "long"
+            case "decimal" => "decimal(" + column.getColumnSchema.getPrecision 
+ "," + column
+              .getColumnSchema.getScale + ")"
+            case others => others
+          }
+        }
+        AttributeReference(column.getColName, output, nullable = true)(
+          qualifier = Option(tableName + "." + column.getColName))
+      }
+    }
+  }
+
+  def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = {
+    var dType = dataType
+    if (dimval.getDataType == DECIMAL) {
+      dType +=
+      "(" + dimval.getColumnSchema.getPrecision + "," + 
dimval.getColumnSchema.getScale + ")"
+    }
+    dType
+  }
 
   // TODO: Use data from the footers.
   override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
@@ -797,8 +838,7 @@ case class CarbonRelation(
 
   def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = {
     var dType = dataType
-    if (dimval.getDataType
-      == org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL) {
+    if (dimval.getDataType == DECIMAL) {
       dType +=
         "(" + dimval.getColumnSchema.getPrecision + "," + 
dimval.getColumnSchema.getScale + ")"
     }
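
The new `output` above derives the relation schema from the table's create-order column list instead of `dimensionsAttr ++ measureAttr`, so attributes come out in CREATE TABLE order, invisible columns are skipped, int/short measures are widened to long, and decimals keep precision and scale. Below is a minimal sketch of that mapping, assuming simplified stand-in types (`Col` and `toSparkType` are illustrative, not CarbonData APIs):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types._

    // Hypothetical simplified column descriptor standing in for CarbonColumn.
    case class Col(name: String, dataType: String, precision: Int = 10,
        scale: Int = 2, isDimension: Boolean = true, isInvisible: Boolean = false)

    // Map a Carbon type name to a Spark SQL DataType, widening measures the
    // way the patch does: int/short measures become LongType, decimals keep
    // their declared precision and scale.
    def toSparkType(c: Col): DataType = c.dataType match {
      case "decimal"                         => DecimalType(c.precision, c.scale)
      case "int" | "short" if !c.isDimension => LongType
      case "int"                             => IntegerType
      case "short"                           => ShortType
      case _                                 => StringType // fallback for brevity
    }

    // Build the relation output in create order, skipping invisible columns.
    def output(tableName: String, cols: Seq[Col]): Seq[AttributeReference] =
      cols.filterNot(_.isInvisible).map { c =>
        AttributeReference(c.name, toSparkType(c), nullable = true)(
          qualifier = Option(tableName + "." + c.name))
      }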

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30ee42cb/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 52c0eb0..a4fdc2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -163,12 +163,15 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
             Sort(sort.order, sort.global, child)
           }
         case union: Union
-          if !(union.children.head.isInstanceOf[CarbonDictionaryTempDecoder] ||
-            union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
+          if !union.children.exists(_.isInstanceOf[CarbonDictionaryTempDecoder]) =>
           val children = union.children.map { child =>
             val condAttrs = new util.HashSet[AttributeReferenceWrapper]
             child.output.foreach(attr =>
-              condAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+              if (isDictionaryEncoded(attr, attrMap, aliasMap)) {
+                condAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+              }
+            )
+
             if (hasCarbonRelation(child) && condAttrs.size() > 0 &&
               !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
               CarbonDictionaryTempDecoder(condAttrs,
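
Spark 2 flattens nested unions into a single Union node with any number of children, so the old guard, which inspected only children.head and children(1), could miss a CarbonDictionaryTempDecoder in a later branch; `exists` checks every child. The same hunk also narrows condAttrs to dictionary-encoded attributes, so a temp decoder is no longer requested for plain columns. A tiny standalone illustration of the rewritten guard (the Plan hierarchy below is hypothetical, not Spark's):

    sealed trait Plan { def children: Seq[Plan] = Nil }
    case class Leaf(name: String) extends Plan
    case class TempDecoder(child: Plan) extends Plan {
      override def children: Seq[Plan] = Seq(child)
    }
    case class Union(override val children: Seq[Plan]) extends Plan

    // The rewritten guard: fire only when no child is already decoded.
    def needsDecoder(u: Union): Boolean =
      !u.children.exists(_.isInstanceOf[TempDecoder])

    // Three-way union whose decoder sits in the third branch: the old
    // head/children(1) check would wrongly report that a decoder is needed.
    val u = Union(Seq(Leaf("a"), Leaf("b"), TempDecoder(Leaf("c"))))
    assert(!needsDecoder(u))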

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30ee42cb/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
index f47babc..cbae5a8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
@@ -91,6 +91,8 @@ public class CsvInput extends BaseStep implements StepInterface {
    */
   private String rddIteratorKey = null;
 
+  private CarbonIterator<CarbonIterator<String[]>> rddIterator;
+
   public CsvInput(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
       TransMeta transMeta, Trans trans) {
     super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
@@ -194,28 +196,27 @@ public class CsvInput extends BaseStep implements StepInterface {
   }
 
   class RddScanCallable implements Callable<Void> {
-    List<CarbonIterator<String[]>> iterList;
-
-    RddScanCallable() {
-      this.iterList = new ArrayList<CarbonIterator<String[]>>(1000);
-    }
-
-    public void addJavaRddIterator(CarbonIterator<String[]> iter) {
-      this.iterList.add(iter);
-    }
-
-    @Override
-    public Void call() throws Exception {
-      StandardLogService.setThreadName(("PROCESS_DataFrame_PARTITIONS"),
-          Thread.currentThread().getName());
+    @Override public Void call() throws Exception {
+      StandardLogService
+          .setThreadName(("PROCESS_DataFrame_PARTITIONS"), Thread.currentThread().getName());
       try {
         String[] values = null;
-        for (CarbonIterator<String[]> iter: iterList) {
-          iter.initialize();
-          while (iter.hasNext()) {
-            values = iter.next();
-            synchronized (putRowLock) {
-              putRow(data.outputRowMeta, values);
+        boolean hasNext = true;
+        CarbonIterator<String[]> iter;
+        boolean isInitialized = false;
+        while (hasNext) {
+          // Invoke getRddIterator to get an RDD[Row] iterator of a partition.
+          // The RDD comes from the sub-query DataFrame in an InsertInto statement.
+          iter = getRddIterator(isInitialized);
+          isInitialized = true;
+          if (iter == null) {
+            hasNext = false;
+          } else {
+            while (iter.hasNext()) {
+              values = iter.next();
+              synchronized (putRowLock) {
+                putRow(data.outputRowMeta, values);
+              }
             }
           }
         }
@@ -225,34 +226,34 @@ public class CsvInput extends BaseStep implements StepInterface {
       }
       return null;
     }
-  };
+  }
+
+  private synchronized CarbonIterator<String[]> getRddIterator(boolean 
isInitialized) {
+    if (!isInitialized) {
+      rddIterator.initialize();
+    }
+    if (rddIterator.hasNext()) {
+      return rddIterator.next();
+    }
+    return null;
+  }
 
   private void scanRddIterator(int numberOfNodes) throws RuntimeException {
-    CarbonIterator<CarbonIterator<String[]>> iter = 
RddInputUtils.getAndRemove(rddIteratorKey);
-    if (iter != null) {
-      iter.initialize();
+    rddIterator = RddInputUtils.getAndRemove(rddIteratorKey);
+    if (rddIterator != null) {
       exec = Executors.newFixedThreadPool(numberOfNodes);
       List<Future<Void>> results = new ArrayList<Future<Void>>(numberOfNodes);
       RddScanCallable[] calls = new RddScanCallable[numberOfNodes];
       for (int i = 0; i < numberOfNodes; i++ ) {
         calls[i] = new RddScanCallable();
-      }
-      int index = 0 ;
-      while (iter.hasNext()) {
-        calls[index].addJavaRddIterator(iter.next());
-        index = index + 1;
-        if (index == numberOfNodes) {
-          index = 0;
-        }
-      }
-      for (RddScanCallable call: calls) {
-        results.add(exec.submit(call));
+        results.add(exec.submit(calls[i]));
       }
       try {
         for (Future<Void> futrue : results) {
           futrue.get();
         }
       } catch (InterruptedException | ExecutionException e) {
+        LOGGER.error(e, "Thread InterruptedException");
         throw new RuntimeException("Thread InterruptedException", e);
       } finally {
         exec.shutdownNow();
@@ -264,7 +265,7 @@ public class CsvInput extends BaseStep implements StepInterface {
     Iterator<String[]> iterator = 
RddInpututilsForUpdate.getAndRemove(rddIteratorKey);
     if (iterator != null) {
       try{
-        while(iterator.hasNext()){
+        while (iterator.hasNext()) {
           putRow(data.outputRowMeta, iterator.next());
         }
       } catch (KettleException e) {
@@ -430,4 +431,4 @@ public class CsvInput extends BaseStep implements StepInterface {
     return false;
   }
 
-}
\ No newline at end of file
+}
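
The CsvInput rewrite replaces the old up-front, round-robin assignment of partition iterators to worker callables with a pull model: the outer iterator is kept in a field and every worker repeatedly calls the synchronized getRddIterator to claim the next partition until none remain. That avoids buffering the iterator list (the old code pre-sized it to 1000) and balances skewed partitions across threads. A minimal sketch of the pull pattern in Scala (names such as PartitionSource are illustrative, not CarbonData APIs):

    import java.util.concurrent.{Callable, Executors}

    class PartitionSource[T](partitions: Iterator[Iterator[T]]) {
      // Hand out the next partition iterator, or null when drained.
      // synchronized: several workers pull from this single source.
      def next(): Iterator[T] = synchronized {
        if (partitions.hasNext) partitions.next() else null
      }
    }

    def scan[T](source: PartitionSource[T], workers: Int)(consume: T => Unit): Unit = {
      val pool = Executors.newFixedThreadPool(workers)
      try {
        val futures = (1 to workers).map { _ =>
          pool.submit(new Callable[Unit] {
            def call(): Unit = {
              var part = source.next()
              while (part != null) {      // keep pulling until the source is empty
                part.foreach(consume)
                part = source.next()
              }
            }
          })
        }
        futures.foreach(_.get())          // surface any worker failure
      } finally {
        pool.shutdownNow()
      }
    }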
