Repository: incubator-carbondata Updated Branches: refs/heads/master 392bc290e -> 35f74248f
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30ee42cb/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index f020a1c..8d50324 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ import scala.util.parsing.combinator.RegexParsers -import org.apache.spark.internal.Logging import org.apache.spark.sql.{RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException} @@ -39,8 +38,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.CarbonTableIdentifier import org.apache.carbondata.core.carbon.metadata.CarbonMetadata import org.apache.carbondata.core.carbon.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension +import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonColumn, CarbonDimension} import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants} import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -782,7 +782,48 @@ case class CarbonRelation( nullable = true)()) } - override val output = 
dimensionsAttr ++ measureAttr + override val output = { + val columns = tableMeta.carbonTable.getCreateOrderColumn(tableMeta.carbonTable.getFactTableName) + .asScala + // convert each column to Attribute + columns.filter(!_.isInvisible).map { column => + if (column.isDimesion()) { + val output: DataType = column.getDataType.toString.toLowerCase match { + case "array" => + CarbonMetastoreTypes.toDataType(s"array<${getArrayChildren(column.getColName)}>") + case "struct" => + CarbonMetastoreTypes.toDataType(s"struct<${getStructChildren(column.getColName)}>") + case dType => + val dataType = addDecimalScaleAndPrecision(column, dType) + CarbonMetastoreTypes.toDataType(dataType) + } + AttributeReference(column.getColName, output, nullable = true )( + qualifier = Option(tableName + "." + column.getColName)) + } else { + val output = CarbonMetastoreTypes.toDataType { + column.getDataType.toString + .toLowerCase match { + case "int" => "long" + case "short" => "long" + case "decimal" => "decimal(" + column.getColumnSchema.getPrecision + "," + column + .getColumnSchema.getScale + ")" + case others => others + } + } + AttributeReference(column.getColName, output, nullable = true)( + qualifier = Option(tableName + "." + column.getColName)) + } + } + } + + def addDecimalScaleAndPrecision(dimval: CarbonColumn, dataType: String): String = { + var dType = dataType + if (dimval.getDataType == DECIMAL) { + dType += + "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")" + } + dType + } // TODO: Use data from the footers. 
override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes) @@ -797,8 +838,7 @@ case class CarbonRelation( def addDecimalScaleAndPrecision(dimval: CarbonDimension, dataType: String): String = { var dType = dataType - if (dimval.getDataType - == org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL) { + if (dimval.getDataType == DECIMAL) { dType += "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")" } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30ee42cb/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 52c0eb0..a4fdc2f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -163,12 +163,15 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { Sort(sort.order, sort.global, child) } case union: Union - if !(union.children.head.isInstanceOf[CarbonDictionaryTempDecoder] || - union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) => + if !union.children.exists(_.isInstanceOf[CarbonDictionaryTempDecoder]) => val children = union.children.map { child => val condAttrs = new util.HashSet[AttributeReferenceWrapper] child.output.foreach(attr => - condAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))) + if (isDictionaryEncoded(attr, attrMap, aliasMap)) { + condAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) + } + ) + if (hasCarbonRelation(child) && condAttrs.size() > 0 && !child.isInstanceOf[CarbonDictionaryCatalystDecoder]) { CarbonDictionaryTempDecoder(condAttrs, 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30ee42cb/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java index f47babc..cbae5a8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java @@ -91,6 +91,8 @@ public class CsvInput extends BaseStep implements StepInterface { */ private String rddIteratorKey = null; + private CarbonIterator<CarbonIterator<String[]>> rddIterator; + public CsvInput(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans) { super(stepMeta, stepDataInterface, copyNr, transMeta, trans); @@ -194,28 +196,27 @@ public class CsvInput extends BaseStep implements StepInterface { } class RddScanCallable implements Callable<Void> { - List<CarbonIterator<String[]>> iterList; - - RddScanCallable() { - this.iterList = new ArrayList<CarbonIterator<String[]>>(1000); - } - - public void addJavaRddIterator(CarbonIterator<String[]> iter) { - this.iterList.add(iter); - } - - @Override - public Void call() throws Exception { - StandardLogService.setThreadName(("PROCESS_DataFrame_PARTITIONS"), - Thread.currentThread().getName()); + @Override public Void call() throws Exception { + StandardLogService + .setThreadName(("PROCESS_DataFrame_PARTITIONS"), Thread.currentThread().getName()); try { String[] values = null; - for (CarbonIterator<String[]> iter: iterList) { - iter.initialize(); - while (iter.hasNext()) { - values = iter.next(); - synchronized (putRowLock) { - putRow(data.outputRowMeta, values); + boolean hasNext = true; + CarbonIterator<String[]> iter; + boolean isInitialized 
= false; + while (hasNext) { + // Invoke getRddIterator to get an RDD[Row] iterator of a partition. + // The RDD comes from the sub-query DataFrame in InsertInto statement. + iter = getRddIterator(isInitialized); + isInitialized = true; + if (iter == null) { + hasNext = false; + } else { + while (iter.hasNext()) { + values = iter.next(); + synchronized (putRowLock) { + putRow(data.outputRowMeta, values); + } } } } @@ -225,34 +226,34 @@ public class CsvInput extends BaseStep implements StepInterface { } return null; } - }; + } + + private synchronized CarbonIterator<String[]> getRddIterator(boolean isInitialized) { + if (!isInitialized) { + rddIterator.initialize(); + } + if (rddIterator.hasNext()) { + return rddIterator.next(); + } + return null; + } private void scanRddIterator(int numberOfNodes) throws RuntimeException { - CarbonIterator<CarbonIterator<String[]>> iter = RddInputUtils.getAndRemove(rddIteratorKey); - if (iter != null) { - iter.initialize(); + rddIterator = RddInputUtils.getAndRemove(rddIteratorKey); + if (rddIterator != null) { exec = Executors.newFixedThreadPool(numberOfNodes); List<Future<Void>> results = new ArrayList<Future<Void>>(numberOfNodes); RddScanCallable[] calls = new RddScanCallable[numberOfNodes]; for (int i = 0; i < numberOfNodes; i++ ) { calls[i] = new RddScanCallable(); - } - int index = 0 ; - while (iter.hasNext()) { - calls[index].addJavaRddIterator(iter.next()); - index = index + 1; - if (index == numberOfNodes) { - index = 0; - } - } - for (RddScanCallable call: calls) { - results.add(exec.submit(call)); + results.add(exec.submit(calls[i])); } try { for (Future<Void> futrue : results) { futrue.get(); } } catch (InterruptedException | ExecutionException e) { + LOGGER.error(e, "Thread InterruptedException"); throw new RuntimeException("Thread InterruptedException", e); } finally { exec.shutdownNow(); @@ -264,7 +265,7 @@ public class CsvInput extends BaseStep implements StepInterface { Iterator<String[]> iterator = 
RddInpututilsForUpdate.getAndRemove(rddIteratorKey); if (iterator != null) { try{ - while(iterator.hasNext()){ + while (iterator.hasNext()) { putRow(data.outputRowMeta, iterator.next()); } } catch (KettleException e) { @@ -430,4 +431,4 @@ public class CsvInput extends BaseStep implements StepInterface { return false; } -} \ No newline at end of file +}