http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index f51ced3..6a401d8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -34,20 +34,16 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.DataTypeConverter; /** * Executor class for executing the query on the selected segments to be merged. @@ -70,6 +66,9 @@ public class CarbonCompactionExecutor { */ private boolean restructuredBlockExists; + // converter for UTF8String and decimal conversion + private DataTypeConverter dataTypeConverter; + /** * Constructor * @@ -82,13 +81,14 @@ public class CarbonCompactionExecutor { public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping, SegmentProperties segmentProperties, CarbonTable carbonTable, Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, - boolean restructuredBlockExists) { + boolean restructuredBlockExists, DataTypeConverter dataTypeConverter) { this.segmentMapping = segmentMapping; this.destinationSegProperties = segmentProperties; this.carbonTable = carbonTable; this.dataFileMetadataSegMapping = dataFileMetadataSegMapping; this.restructuredBlockExists = restructuredBlockExists; - queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + this.queryExecutorList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + this.dataTypeConverter = dataTypeConverter; } /** @@ -100,7 +100,9 @@ public class CarbonCompactionExecutor { List<RawResultIterator> resultList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<TableBlockInfo> list = null; - queryModel = prepareQueryModel(list); + queryModel = carbonTable.createQueryModelWithProjectAllColumns(dataTypeConverter); + queryModel.setReadPageByPage(enablePageLevelReaderForCompaction()); + queryModel.setForcedDetailRawQuery(true); // iterate each seg ID for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) { String segmentId = taskMap.getKey(); @@ -156,7 +158,7 @@ public class CarbonCompactionExecutor { * @param blockList * @return */ - private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList) + private CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); @@ -195,48 +197,6 @@ public class CarbonCompactionExecutor { } /** - * Preparing of the query model. - * - * @param blockList - * @return - */ - private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { - QueryModel model = new QueryModel(); - model.setTableBlockInfos(blockList); - model.setForcedDetailRawQuery(true); - model.setFilterExpressionResolverTree(null); - model.setConverter(DataTypeUtil.getDataTypeConverter()); - model.setReadPageByPage(enablePageLevelReaderForCompaction()); - - List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getTableName()); - for (CarbonDimension dim : dimensions) { - // check if dimension is deleted - QueryDimension queryDimension = new QueryDimension(dim.getColName()); - queryDimension.setDimension(dim); - dims.add(queryDimension); - } - model.setQueryDimension(dims); - - List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getTableName()); - for (CarbonMeasure carbonMeasure : measures) { - // check if measure is deleted - QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); - queryMeasure.setMeasure(carbonMeasure); - msrs.add(queryMeasure); - } - model.setQueryMeasures(msrs); - model.setQueryId(System.nanoTime() + ""); - model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier()); - model.setTable(carbonTable); - return model; - } - - /** * Whether to enable page level reader for compaction or not. */ private boolean enablePageLevelReaderForCompaction() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java index 79e9e5a..b6f12a5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; import org.apache.carbondata.processing.partition.DataPartitioner; import org.apache.carbondata.processing.partition.Partition; @@ -46,9 +45,8 @@ public final class QueryPartitionHelper { /** * Get partitions applicable for query based on filters applied in query */ - public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) { - String tableUniqueName = - CarbonTable.buildUniqueName(queryPlan.getDatabaseName(), queryPlan.getTableName()); + public List<Partition> getPartitionsForQuery(String databaseName, String tableName) { + String tableUniqueName = CarbonTable.buildUniqueName(databaseName, tableName); DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java index 36e022b..01db4f6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.partition.spliter; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -26,19 +25,14 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.util.CarbonUtil; public abstract class AbstractCarbonQueryExecutor { @@ -47,8 +41,8 @@ public abstract class AbstractCarbonQueryExecutor { LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName()); protected CarbonTable carbonTable; protected QueryModel queryModel; - protected QueryExecutor queryExecutor; - protected Map<String, TaskBlockInfo> segmentMapping; + private QueryExecutor queryExecutor; + Map<String, TaskBlockInfo> segmentMapping; /** * get executor and execute the query model. @@ -56,7 +50,7 @@ public abstract class AbstractCarbonQueryExecutor { * @param blockList * @return */ - protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList) + CarbonIterator<RowBatch> executeBlockList(List<TableBlockInfo> blockList) throws QueryExecutionException, IOException { queryModel.setTableBlockInfos(blockList); this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); @@ -64,46 +58,6 @@ public abstract class AbstractCarbonQueryExecutor { } /** - * Preparing of the query model. - * - * @param blockList - * @return - */ - protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) { - QueryModel model = new QueryModel(); - model.setTableBlockInfos(blockList); - model.setForcedDetailRawQuery(true); - model.setFilterExpressionResolverTree(null); - - List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - List<CarbonDimension> dimensions = - carbonTable.getDimensionByTableName(carbonTable.getTableName()); - for (CarbonDimension dim : dimensions) { - // check if dimension is deleted - QueryDimension queryDimension = new QueryDimension(dim.getColName()); - queryDimension.setDimension(dim); - dims.add(queryDimension); - } - model.setQueryDimension(dims); - - List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<CarbonMeasure> measures = - carbonTable.getMeasureByTableName(carbonTable.getTableName()); - for (CarbonMeasure carbonMeasure : measures) { - // check if measure is deleted - QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName()); - queryMeasure.setMeasure(carbonMeasure); - msrs.add(queryMeasure); - } - model.setQueryMeasures(msrs); - model.setQueryId(System.nanoTime() + ""); - model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier()); - model.setTable(carbonTable); - return model; - } - - /** * Below method will be used * for cleanup */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java index 6afec0b..b18207d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.datastore.block.TaskBlockInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator; +import org.apache.carbondata.core.util.DataTypeConverterImpl; /** * Used to read carbon blocks when add/split partition @@ -48,7 +49,8 @@ public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor { public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId) throws QueryExecutionException, IOException { List<TableBlockInfo> list = null; - queryModel = prepareQueryModel(list); + queryModel = carbonTable.createQueryModelWithProjectAllColumns(new DataTypeConverterImpl()); + queryModel.setForcedDetailRawQuery(true); List<PartitionSpliterRawResultIterator> resultList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java index ec91472..4abdf3c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonQueryUtil.java @@ -24,7 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.scan.model.CarbonQueryPlan; +import org.apache.carbondata.core.scan.model.QueryProjection; import org.apache.carbondata.processing.partition.Partition; import org.apache.carbondata.processing.partition.impl.DefaultLoadBalancer; import org.apache.carbondata.processing.partition.impl.PartitionMultiFileImpl; @@ -46,7 +46,7 @@ public class CarbonQueryUtil { * It creates the one split for each region server. */ public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName, - CarbonQueryPlan queryPlan) { + QueryProjection queryPlan) { //Just create splits depends on locations of region servers List<Partition> allPartitions = null; @@ -55,7 +55,7 @@ public class CarbonQueryUtil { QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName); } else { allPartitions = - QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan); + QueryPartitionHelper.getInstance().getPartitionsForQuery(databaseName, tableName); } TableSplit[] splits = new TableSplit[allPartitions.size()]; for (int i = 0; i < splits.length; i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4749ca29/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 9088731..ec6ab1a 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -154,7 +154,7 @@ class StreamHandoffRDD[K, V]( CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo) val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) val format = new CarbonTableInputFormat[Array[Object]]() - val model = format.getQueryModel(inputSplit, attemptContext) + val model = format.createQueryModel(inputSplit, attemptContext) val inputFormat = new CarbonStreamInputFormat val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) .asInstanceOf[CarbonStreamRecordReader]