http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java index 553f85e..773fbd7 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/PartitionSpliterRawResultIterator.java @@ -17,20 +17,15 @@ package org.apache.carbondata.core.scan.result.iterator; import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]> { - private CarbonIterator<BatchResult> iterator; - private BatchResult batch; + private CarbonIterator<RowBatch> iterator; + private RowBatch batch; private int counter; - private static final LogService LOGGER = - LogServiceFactory.getLogService(PartitionSpliterRawResultIterator.class.getName()); - - public PartitionSpliterRawResultIterator(CarbonIterator<BatchResult> iterator) { + public PartitionSpliterRawResultIterator(CarbonIterator<RowBatch> iterator) { this.iterator = iterator; } @@ -65,7 +60,7 @@ public class PartitionSpliterRawResultIterator extends CarbonIterator<Object[]> * @param batch * @return */ - private boolean checkBatchEnd(BatchResult batch) { + private boolean checkBatchEnd(RowBatch batch) { return !(counter < batch.getSize()); }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java index 70d0958..1dd1595 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java @@ -21,7 +21,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; /** @@ -37,7 +37,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> { /** * Iterator of the Batch raw result. */ - private CarbonIterator<BatchResult> detailRawQueryResultIterator; + private CarbonIterator<RowBatch> detailRawQueryResultIterator; /** * Counter to maintain the row counter. @@ -55,9 +55,9 @@ public class RawResultIterator extends CarbonIterator<Object[]> { /** * batch of the result. */ - private BatchResult batch; + private RowBatch batch; - public RawResultIterator(CarbonIterator<BatchResult> detailRawQueryResultIterator, + public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; @@ -155,7 +155,7 @@ public class RawResultIterator extends CarbonIterator<Object[]> { * @param batch * @return */ - private boolean checkIfBatchIsProcessedCompletely(BatchResult batch) { + private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) { if (counter < batch.getSize()) { return false; } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java index cc9710e..c7cb00d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/VectorDetailQueryResultIterator.java @@ -35,10 +35,12 @@ public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIt super(infos, queryModel, execService); } - @Override public Object next() { + @Override + public Object next() { throw new UnsupportedOperationException("call processNextBatch instead"); } + @Override public void processNextBatch(CarbonColumnarBatch columnarBatch) { synchronized (lock) { updateDataBlockIterator(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java index cfc2f16..973ce0f 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java @@ -87,7 +87,4 @@ public class CarbonColumnarBatch { } } - public int getRowsFilteredCount() { - return rowsFiltered; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java index a5f81b9..59117dd 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java @@ -18,16 +18,16 @@ package org.apache.carbondata.core.scan.result.vector; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.scan.filter.GenericQueryType; -import org.apache.carbondata.core.scan.model.QueryDimension; -import org.apache.carbondata.core.scan.model.QueryMeasure; +import org.apache.carbondata.core.scan.model.ProjectionDimension; +import org.apache.carbondata.core.scan.model.ProjectionMeasure; public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> { public int offset; public int size; public CarbonColumnVector vector; public int vectorOffset; - public QueryDimension dimension; - public QueryMeasure measure; + public ProjectionDimension dimension; + public ProjectionMeasure measure; public int ordinal; public DirectDictionaryGenerator directDictionaryGenerator; public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller; http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java index db4c982..8902dfb 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/MeasureDataVectorProcessor.java @@ -29,7 +29,7 @@ public class MeasureDataVectorProcessor { void fillMeasureVector(ColumnPage dataChunk, ColumnVectorInfo info); - void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk, + void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk, ColumnVectorInfo info); } @@ -60,7 +60,7 @@ public class MeasureDataVectorProcessor { } @Override - public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk, + public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk, ColumnVectorInfo info) { int offset = info.offset; int len = offset + info.size; @@ -69,13 +69,13 @@ public class MeasureDataVectorProcessor { BitSet nullBitSet = dataChunk.getNullBits(); if (nullBitSet.isEmpty()) { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; vector.putInt(vectorOffset, (int)dataChunk.getLong(currentRow)); vectorOffset++; } } else { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; if (nullBitSet.get(currentRow)) { vector.putNull(vectorOffset); } else { @@ -117,7 +117,7 @@ public class MeasureDataVectorProcessor { } @Override - public void fillMeasureVectorForFilter(int[] rowMapping, + public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk, ColumnVectorInfo info) { int offset = info.offset; int len = offset + info.size; @@ -126,13 +126,13 @@ public class MeasureDataVectorProcessor { BitSet nullBitSet = dataChunk.getNullBits(); if (nullBitSet.isEmpty()) { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; vector.putBoolean(vectorOffset, dataChunk.getBoolean(currentRow)); vectorOffset++; } } else { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; if (nullBitSet.get(currentRow)) { vector.putNull(vectorOffset); } else { @@ -171,7 +171,7 @@ public class MeasureDataVectorProcessor { } @Override - public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk, + public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk, ColumnVectorInfo info) { int offset = info.offset; int len = offset + info.size; @@ -180,13 +180,13 @@ public class MeasureDataVectorProcessor { BitSet nullBitSet = dataChunk.getNullBits(); if (nullBitSet.isEmpty()) { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; vector.putShort(vectorOffset, (short) dataChunk.getLong(currentRow)); vectorOffset++; } } else { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; if (nullBitSet.get(currentRow)) { vector.putNull(vectorOffset); } else { @@ -225,7 +225,7 @@ public class MeasureDataVectorProcessor { } @Override - public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk, + public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk, ColumnVectorInfo info) { int offset = info.offset; int len = offset + info.size; @@ -234,13 +234,13 @@ public class MeasureDataVectorProcessor { BitSet nullBitSet = dataChunk.getNullBits(); if (nullBitSet.isEmpty()) { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; vector.putLong(vectorOffset, dataChunk.getLong(currentRow)); vectorOffset++; } } else { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; if (nullBitSet.get(currentRow)) { vector.putNull(vectorOffset); } else { @@ -279,7 +279,7 @@ public class MeasureDataVectorProcessor { } @Override - public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk, + public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk, ColumnVectorInfo info) { int offset = info.offset; int len = offset + info.size; @@ -288,7 +288,7 @@ public class MeasureDataVectorProcessor { int precision = info.measure.getMeasure().getPrecision(); BitSet nullBitSet = dataChunk.getNullBits(); for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; if (nullBitSet.get(currentRow)) { vector.putNull(vectorOffset); } else { @@ -330,7 +330,7 @@ public class MeasureDataVectorProcessor { } @Override - public void fillMeasureVectorForFilter(int[] rowMapping, ColumnPage dataChunk, + public void fillMeasureVector(int[] filteredRowId, ColumnPage dataChunk, ColumnVectorInfo info) { int offset = info.offset; int len = offset + info.size; @@ -339,13 +339,13 @@ public class MeasureDataVectorProcessor { BitSet nullBitSet = dataChunk.getNullBits(); if (nullBitSet.isEmpty()) { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; vector.putDouble(vectorOffset, dataChunk.getDouble(currentRow)); vectorOffset++; } } else { for (int i = offset; i < len; i++) { - int currentRow = rowMapping[i]; + int currentRow = filteredRowId[i]; if (nullBitSet.get(currentRow)) { vector.putNull(vectorOffset); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java deleted file mode 100644 index bf26ca3..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.scan.scanner; - -import java.io.IOException; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; -import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; -import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; -import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; -import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; -import org.apache.carbondata.core.scan.result.AbstractScannedResult; -import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult; -import org.apache.carbondata.core.stats.QueryStatistic; -import org.apache.carbondata.core.stats.QueryStatisticsConstants; -import org.apache.carbondata.core.stats.QueryStatisticsModel; - -/** - * Blocklet scanner class to process the block - */ -public abstract class AbstractBlockletScanner implements BlockletScanner { - - /** - * block execution info - */ - protected BlockExecutionInfo blockExecutionInfo; - - public QueryStatisticsModel queryStatisticsModel; - - private AbstractScannedResult emptyResult; - - public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) { - this.blockExecutionInfo = tableBlockExecutionInfos; - } - - @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) - throws IOException, FilterUnsupportedException { - long startTime = System.currentTimeMillis(); - AbstractScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo); - QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM); - totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, - totalBlockletStatistic.getCount() + 1); - QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM); - validScannedBlockletStatistic - .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, - validScannedBlockletStatistic.getCount() + 1); - // adding statistics for valid number of pages - QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.VALID_PAGE_SCANNED); - validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED, - validPages.getCount() + blocksChunkHolder.getDataBlock().numberOfPages()); - // adding statistics for number of pages - QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED); - totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, - totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages()); - scannedResult.setBlockletId( - blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder - .getDataBlock().blockletId()); - if (!blockExecutionInfo.isPrefetchBlocklet()) { - readBlocklet(blocksChunkHolder); - } - DimensionRawColumnChunk[] dimensionRawColumnChunks = - blocksChunkHolder.getDimensionRawDataChunk(); - DimensionColumnDataChunk[][] dimensionColumnDataChunks = - new DimensionColumnDataChunk[dimensionRawColumnChunks.length][blocksChunkHolder - .getDataBlock().numberOfPages()]; - MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getMeasureRawDataChunk(); - ColumnPage[][] columnPages = - new ColumnPage[measureRawColumnChunks.length][blocksChunkHolder.getDataBlock() - .numberOfPages()]; - scannedResult.setDimensionChunks(dimensionColumnDataChunks); - scannedResult.setMeasureChunks(columnPages); - scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks); - scannedResult.setMsrRawColumnChunks(measureRawColumnChunks); - if (blockExecutionInfo.isPrefetchBlocklet()) { - for (int i = 0; i < dimensionRawColumnChunks.length; i++) { - if (dimensionRawColumnChunks[i] != null) { - dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].convertToDimColDataChunks(); - } - } - for (int i = 0; i < measureRawColumnChunks.length; i++) { - if (measureRawColumnChunks[i] != null) { - columnPages[i] = measureRawColumnChunks[i].convertToColumnPage(); - } - } - } - int[] numberOfRows = null; - if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) { - for (int i = 0; i < dimensionRawColumnChunks.length; i++) { - if (dimensionRawColumnChunks[i] != null) { - numberOfRows = dimensionRawColumnChunks[i].getRowCount(); - break; - } - } - } else if (blockExecutionInfo.getAllSelectedMeasureBlocksIndexes().length > 0) { - for (int i = 0; i < measureRawColumnChunks.length; i++) { - if (measureRawColumnChunks[i] != null) { - numberOfRows = measureRawColumnChunks[i].getRowCount(); - break; - } - } - } - - // count(*) case there would not be any dimensions are measures selected. - if (numberOfRows == null) { - numberOfRows = new int[blocksChunkHolder.getDataBlock().numberOfPages()]; - for (int i = 0; i < numberOfRows.length; i++) { - numberOfRows[i] = - CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; - } - int lastPageSize = blocksChunkHolder.getDataBlock().nodeSize() - % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; - ; - if (lastPageSize > 0) { - numberOfRows[numberOfRows.length - 1] = lastPageSize; - } - } - scannedResult.setNumberOfRows(numberOfRows); - if (!blockExecutionInfo.isPrefetchBlocklet()) { - scannedResult.fillDataChunks(); - } - // adding statistics for carbon scan time - QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME); - scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, - scanTime.getCount() + (System.currentTimeMillis() - startTime)); - return scannedResult; - } - - @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException { - long startTime = System.currentTimeMillis(); - DimensionRawColumnChunk[] dimensionRawColumnChunks = blocksChunkHolder.getDataBlock() - .getDimensionChunks(blocksChunkHolder.getFileReader(), - blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()); - blocksChunkHolder.setDimensionRawDataChunk(dimensionRawColumnChunks); - MeasureRawColumnChunk[] measureRawColumnChunks = blocksChunkHolder.getDataBlock() - .getMeasureChunks(blocksChunkHolder.getFileReader(), - blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()); - blocksChunkHolder.setMeasureRawDataChunk(measureRawColumnChunks); - // adding statistics for carbon read time - QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.READ_BLOCKlET_TIME); - readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME, - readTime.getCount() + (System.currentTimeMillis() - startTime)); - } - - @Override public AbstractScannedResult createEmptyResult() { - if (emptyResult == null) { - emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo); - emptyResult.setNumberOfRows(new int[0]); - emptyResult.setIndexes(new int[0][]); - } - return emptyResult; - } - - @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException { - // For non filter it is always true - return true; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java index 0ed0d43..0a41032 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/BlockletScanner.java @@ -18,9 +18,10 @@ package org.apache.carbondata.core.scan.scanner; import java.io.IOException; +import org.apache.carbondata.core.datastore.DataRefNode; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; -import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; -import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; /** * Interface for processing the block @@ -30,31 +31,26 @@ public interface BlockletScanner { /** * Checks whether this blocklet required to scan or not based on min max of each blocklet. - * @param blocksChunkHolder + * @param dataBlock * @return * @throws IOException */ - boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException; + boolean isScanRequired(DataRefNode dataBlock); /** * Below method will used to process the block data and get the scanned result * - * @param blocksChunkHolder block chunk which holds the block data + * @param rawBlockletColumnChunks block chunk which holds the block data * @return scannerResult * result after processing */ - AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) + BlockletScannedResult scanBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException, FilterUnsupportedException; /** * Just reads the blocklet from file, does not uncompress it. - * @param blocksChunkHolder + * @param rawBlockletColumnChunks */ - void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException; + void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException; - /** - * In case if there is no filter satisfies. - * @return AbstractScannedResult - */ - AbstractScannedResult createEmptyResult(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java new file mode 100644 index 0000000..1c73d63 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.scanner.impl; + +import java.io.IOException; +import java.util.BitSet; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; +import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.util.BitSetGroup; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; + +/** + * Below class will be used for filter query processing + * this class will be first apply the filter then it will read the column page if + * required and return the scanned result + */ +public class BlockletFilterScanner extends BlockletFullScanner { + + /** + * filter executer to evaluate filter condition + */ + private FilterExecuter filterExecuter; + /** + * this will be used to apply min max + * this will be useful for dimension column which is on the right side + * as node finder will always give tentative blocks, if column data stored individually + * and data is in sorted order then we can check whether filter is in the range of min max or not + * if it present then only we can apply filter on complete data. + * this will be very useful in case of sparse data when rows are + * repeating. + */ + private boolean isMinMaxEnabled; + + private QueryStatisticsModel queryStatisticsModel; + + private boolean useBitSetPipeLine; + + public BlockletFilterScanner(BlockExecutionInfo blockExecutionInfo, + QueryStatisticsModel queryStatisticsModel) { + super(blockExecutionInfo, queryStatisticsModel); + // to check whether min max is enabled or not + String minMaxEnableValue = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED, + CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE); + if (null != minMaxEnableValue) { + isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue); + } + // get the filter tree + this.filterExecuter = blockExecutionInfo.getFilterExecuterTree(); + this.queryStatisticsModel = queryStatisticsModel; + + String useBitSetPipeLine = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.BITSET_PIPE_LINE, + CarbonCommonConstants.BITSET_PIPE_LINE_DEFAULT); + if (null != useBitSetPipeLine) { + this.useBitSetPipeLine = Boolean.parseBoolean(useBitSetPipeLine); + } + } + + /** + * Below method will be used to process the block + * + * @param rawBlockletColumnChunks block chunk holder which holds the data + * @throws FilterUnsupportedException + */ + @Override + public BlockletScannedResult scanBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) + throws IOException, FilterUnsupportedException { + return executeFilter(rawBlockletColumnChunks); + } + + @Override + public boolean isScanRequired(DataRefNode dataBlock) { + // adding statistics for number of pages + QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED); + totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, + totalPagesScanned.getCount() + dataBlock.numberOfPages()); + // apply min max + if (isMinMaxEnabled) { + BitSet bitSet = null; + // check for implicit include filter instance + if (filterExecuter instanceof ImplicitColumnFilterExecutor) { + String blockletId = blockExecutionInfo.getBlockIdString() + + CarbonCommonConstants.FILE_SEPARATOR + dataBlock.blockletIndex(); + bitSet = ((ImplicitColumnFilterExecutor) filterExecuter) + .isFilterValuesPresentInBlockOrBlocklet( + dataBlock.getColumnsMaxValue(), + dataBlock.getColumnsMinValue(), blockletId); + } else { + bitSet = this.filterExecuter + .isScanRequired(dataBlock.getColumnsMaxValue(), + dataBlock.getColumnsMinValue()); + } + return !bitSet.isEmpty(); + } + return true; + } + + @Override + public void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) throws IOException { + long startTime = System.currentTimeMillis(); + this.filterExecuter.readColumnChunks(rawBlockletColumnChunks); + // adding statistics for carbon read time + QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.READ_BLOCKlET_TIME); + readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME, + readTime.getCount() + (System.currentTimeMillis() - startTime)); + } + + /** + * This method will process the data in below order + * 1. first apply min max on the filter tree and check whether any of the filter + * is fall on the range of min max, if not then return empty result + * 2. If filter falls on min max range then apply filter on actual + * data and get the filtered row index + * 3. if row index is empty then return the empty result + * 4. if row indexes is not empty then read only those blocks(measure or dimension) + * which was present in the query but not present in the filter, as while applying filter + * some of the blocks where already read and present in chunk holder so not need to + * read those blocks again, this is to avoid reading of same blocks which was already read + * 5. Set the blocks and filter indexes to result + * + * @param rawBlockletColumnChunks + * @throws FilterUnsupportedException + */ + private BlockletScannedResult executeFilter(RawBlockletColumnChunks rawBlockletColumnChunks) + throws FilterUnsupportedException, IOException { + long startTime = System.currentTimeMillis(); + QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM); + totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, + totalBlockletStatistic.getCount() + 1); + // apply filter on actual data, for each page + BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(rawBlockletColumnChunks, + useBitSetPipeLine); + // if filter result is empty then return with empty result + if (bitSetGroup.isEmpty()) { + CarbonUtil.freeMemory(rawBlockletColumnChunks.getDimensionRawColumnChunks(), + rawBlockletColumnChunks.getMeasureRawColumnChunks()); + + QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME); + scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, + scanTime.getCount() + (System.currentTimeMillis() - startTime)); + + QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.PAGE_SCANNED); + scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED, + scannedPages.getCount() + bitSetGroup.getScannedPages()); + return createEmptyResult(); + } + + BlockletScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo); + scannedResult.setBlockletId( + blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR + + rawBlockletColumnChunks.getDataBlock().blockletIndex()); + // valid scanned blocklet + QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM); + validScannedBlockletStatistic + .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, + validScannedBlockletStatistic.getCount() + 1); + // adding statistics for valid number of pages + QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.VALID_PAGE_SCANNED); + validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED, + validPages.getCount() + bitSetGroup.getValidPages()); + QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.PAGE_SCANNED); + scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED, + scannedPages.getCount() + bitSetGroup.getScannedPages()); + int[] pageFilteredRowCount = new int[bitSetGroup.getNumberOfPages()]; + // get the row indexes from bit set for each page + int[][] pageFilteredRowId = new int[bitSetGroup.getNumberOfPages()][]; + int numPages = pageFilteredRowId.length; + for (int pageId = 0; pageId < numPages; pageId++) { + BitSet bitSet = bitSetGroup.getBitSet(pageId); + if (bitSet != null && !bitSet.isEmpty()) { + int[] matchedRowId = new int[bitSet.cardinality()]; + int index = 0; + for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { + matchedRowId[index++] = i; + } + pageFilteredRowCount[pageId] = matchedRowId.length; + pageFilteredRowId[pageId] = matchedRowId; + } + } + + long dimensionReadTime = System.currentTimeMillis(); + dimensionReadTime = System.currentTimeMillis() - dimensionReadTime; + + FileReader fileReader = rawBlockletColumnChunks.getFileReader(); + + + DimensionRawColumnChunk[] dimensionRawColumnChunks = + new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionToRead()]; + int numDimensionChunks = dimensionRawColumnChunks.length; + // read dimension chunk blocks from file which is not present + for (int chunkIndex = 0; chunkIndex < numDimensionChunks; chunkIndex++) { + dimensionRawColumnChunks[chunkIndex] = + rawBlockletColumnChunks.getDimensionRawColumnChunks()[chunkIndex]; + } + int[][] allSelectedDimensionColumnIndexRange = + blockExecutionInfo.getAllSelectedDimensionColumnIndexRange(); + DimensionRawColumnChunk[] projectionListDimensionChunk = rawBlockletColumnChunks.getDataBlock() + .readDimensionChunks(fileReader, allSelectedDimensionColumnIndexRange); + for (int[] columnIndexRange : allSelectedDimensionColumnIndexRange) { + System.arraycopy(projectionListDimensionChunk, columnIndexRange[0], + dimensionRawColumnChunks, columnIndexRange[0], + columnIndexRange[1] + 1 - columnIndexRange[0]); + } + + /* + * in case projection if the projected dimension are not loaded in the dimensionColumnDataChunk + * then loading them + */ + int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes(); + for (int projectionListDimensionIndex : projectionListDimensionIndexes) { + if (null == dimensionRawColumnChunks[projectionListDimensionIndex]) { + dimensionRawColumnChunks[projectionListDimensionIndex] = + rawBlockletColumnChunks.getDataBlock().readDimensionChunk( + fileReader, projectionListDimensionIndex); + } + } + + DimensionColumnPage[][] dimensionColumnPages = + new DimensionColumnPage[numDimensionChunks][numPages]; + for (int chunkIndex = 0; chunkIndex < numDimensionChunks; chunkIndex++) { + if (dimensionRawColumnChunks[chunkIndex] != null) { + for (int pageId = 0; pageId < numPages; pageId++) { + dimensionColumnPages[chunkIndex][pageId] = + dimensionRawColumnChunks[chunkIndex].decodeColumnPage(pageId); + } + } + } + + + MeasureRawColumnChunk[] measureRawColumnChunks = + new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureToRead()]; + int numMeasureChunks = measureRawColumnChunks.length; + + // read the measure chunk blocks which is not present + for (int chunkIndex = 0; chunkIndex < numMeasureChunks; chunkIndex++) { + if (null != rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) { + measureRawColumnChunks[chunkIndex] = + rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]; + } + } + + int[][] allSelectedMeasureColumnIndexRange = + blockExecutionInfo.getAllSelectedMeasureIndexRange(); + MeasureRawColumnChunk[] projectionListMeasureChunk = rawBlockletColumnChunks.getDataBlock() + .readMeasureChunks(fileReader, allSelectedMeasureColumnIndexRange); + for (int[] columnIndexRange : allSelectedMeasureColumnIndexRange) { + System.arraycopy(projectionListMeasureChunk, columnIndexRange[0], measureRawColumnChunks, + columnIndexRange[0], columnIndexRange[1] + 1 - columnIndexRange[0]); + } + /* + * in case projection if the projected measure are not loaded in the ColumnPage + * then loading them + */ + int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes(); + for (int projectionListMeasureIndex : projectionListMeasureIndexes) { + if (null == measureRawColumnChunks[projectionListMeasureIndex]) { + measureRawColumnChunks[projectionListMeasureIndex] = rawBlockletColumnChunks.getDataBlock() + .readMeasureChunk(fileReader, projectionListMeasureIndex); + } + } + ColumnPage[][] measureColumnPages = new ColumnPage[numMeasureChunks][numPages]; + for (int chunkIndex = 0; chunkIndex < numMeasureChunks; chunkIndex++) { + if (measureRawColumnChunks[chunkIndex] != null) { + for (int pageId = 0; pageId < numPages; pageId++) { + measureColumnPages[chunkIndex][pageId] = + measureRawColumnChunks[chunkIndex].decodeColumnPage(pageId); + } + } + } + + scannedResult.setDimensionColumnPages(dimensionColumnPages); + scannedResult.setPageFilteredRowId(pageFilteredRowId); + scannedResult.setMeasureColumnPages(measureColumnPages); + scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks); + scannedResult.setMsrRawColumnChunks(measureRawColumnChunks); + scannedResult.setPageFilteredRowCount(pageFilteredRowCount); + // adding statistics for carbon scan time + QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME); + scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, + scanTime.getCount() + (System.currentTimeMillis() - startTime - dimensionReadTime)); + QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.READ_BLOCKlET_TIME); + readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME, + readTime.getCount() + dimensionReadTime); + return scannedResult; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java new file mode 100644 index 0000000..f0211dc --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.scanner.impl; + +import java.io.IOException; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult; +import org.apache.carbondata.core.scan.scanner.BlockletScanner; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsModel; + +/** + * Blocklet scanner to do full scan of a blocklet, + * returning all projection and filter column chunks + */ +public class BlockletFullScanner implements BlockletScanner { + + /** + * block execution info + */ + protected BlockExecutionInfo blockExecutionInfo; + + private QueryStatisticsModel queryStatisticsModel; + + private BlockletScannedResult emptyResult; + + public BlockletFullScanner(BlockExecutionInfo tableBlockExecutionInfos, + QueryStatisticsModel queryStatisticsModel) { + this.blockExecutionInfo = tableBlockExecutionInfos; + this.queryStatisticsModel = queryStatisticsModel; + } + + @Override + public BlockletScannedResult scanBlocklet( + RawBlockletColumnChunks rawBlockletColumnChunks) + throws IOException, FilterUnsupportedException { + long startTime = System.currentTimeMillis(); + BlockletScannedResult scannedResult = new NonFilterQueryScannedResult(blockExecutionInfo); + QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM); + totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, + totalBlockletStatistic.getCount() + 1); + QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM); + validScannedBlockletStatistic + .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, + validScannedBlockletStatistic.getCount() + 1); + // adding statistics for valid number of pages + QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.VALID_PAGE_SCANNED); + validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED, + validPages.getCount() + rawBlockletColumnChunks.getDataBlock().numberOfPages()); + // adding statistics for number of pages + QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED); + totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, + totalPagesScanned.getCount() + rawBlockletColumnChunks.getDataBlock().numberOfPages()); + scannedResult.setBlockletId( + blockExecutionInfo.getBlockIdString() + CarbonCommonConstants.FILE_SEPARATOR + + rawBlockletColumnChunks.getDataBlock().blockletIndex()); + if (!blockExecutionInfo.isPrefetchBlocklet()) { + readBlocklet(rawBlockletColumnChunks); + } + DimensionRawColumnChunk[] dimensionRawColumnChunks = + rawBlockletColumnChunks.getDimensionRawColumnChunks(); + DimensionColumnPage[][] dimensionColumnDataChunks = + new DimensionColumnPage[dimensionRawColumnChunks.length][rawBlockletColumnChunks + .getDataBlock().numberOfPages()]; + MeasureRawColumnChunk[] measureRawColumnChunks = + rawBlockletColumnChunks.getMeasureRawColumnChunks(); + ColumnPage[][] measureColumnPages = + new ColumnPage[measureRawColumnChunks.length][rawBlockletColumnChunks.getDataBlock() + .numberOfPages()]; + scannedResult.setDimensionColumnPages(dimensionColumnDataChunks); + scannedResult.setMeasureColumnPages(measureColumnPages); + scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks); + scannedResult.setMsrRawColumnChunks(measureRawColumnChunks); + if (blockExecutionInfo.isPrefetchBlocklet()) { + for (int i = 0; i < dimensionRawColumnChunks.length; i++) { + if (dimensionRawColumnChunks[i] != null) { + dimensionColumnDataChunks[i] = dimensionRawColumnChunks[i].decodeAllColumnPages(); + } + } + for (int i = 0; i < measureRawColumnChunks.length; i++) { + if (measureRawColumnChunks[i] != null) { + measureColumnPages[i] = measureRawColumnChunks[i].decodeAllColumnPages(); + } + } + } + int[] numberOfRows = null; + if (blockExecutionInfo.getAllSelectedDimensionColumnIndexRange().length > 0) { + for (int i = 0; i < dimensionRawColumnChunks.length; i++) { + if (dimensionRawColumnChunks[i] != null) { + numberOfRows = dimensionRawColumnChunks[i].getRowCount(); + break; + } + } + } else if (blockExecutionInfo.getAllSelectedMeasureIndexRange().length > 0) { + for (int i = 0; i < measureRawColumnChunks.length; i++) { + if (measureRawColumnChunks[i] != null) { + numberOfRows = measureRawColumnChunks[i].getRowCount(); + break; + } + } + } + + // count(*) case there would not be any dimensions are measures selected. + if (numberOfRows == null) { + numberOfRows = new int[rawBlockletColumnChunks.getDataBlock().numberOfPages()]; + for (int i = 0; i < numberOfRows.length; i++) { + numberOfRows[i] = + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + } + int lastPageSize = rawBlockletColumnChunks.getDataBlock().numRows() + % CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + ; + if (lastPageSize > 0) { + numberOfRows[numberOfRows.length - 1] = lastPageSize; + } + } + scannedResult.setPageFilteredRowCount(numberOfRows); + if (!blockExecutionInfo.isPrefetchBlocklet()) { + scannedResult.fillDataChunks(); + } + // adding statistics for carbon scan time + QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME); + scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, + scanTime.getCount() + (System.currentTimeMillis() - startTime)); + return scannedResult; + } + + @Override + public void readBlocklet(RawBlockletColumnChunks rawBlockletColumnChunks) + throws IOException { + long startTime = System.currentTimeMillis(); + DimensionRawColumnChunk[] dimensionRawColumnChunks = rawBlockletColumnChunks.getDataBlock() + .readDimensionChunks(rawBlockletColumnChunks.getFileReader(), + blockExecutionInfo.getAllSelectedDimensionColumnIndexRange()); + rawBlockletColumnChunks.setDimensionRawColumnChunks(dimensionRawColumnChunks); + MeasureRawColumnChunk[] measureRawColumnChunks = rawBlockletColumnChunks.getDataBlock() + .readMeasureChunks(rawBlockletColumnChunks.getFileReader(), + blockExecutionInfo.getAllSelectedMeasureIndexRange()); + rawBlockletColumnChunks.setMeasureRawColumnChunks(measureRawColumnChunks); + // adding statistics for carbon read time + QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap() + .get(QueryStatisticsConstants.READ_BLOCKlET_TIME); + readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME, + readTime.getCount() + (System.currentTimeMillis() - startTime)); + } + + BlockletScannedResult createEmptyResult() { + if (emptyResult == null) { + emptyResult = new NonFilterQueryScannedResult(blockExecutionInfo); + emptyResult.setPageFilteredRowCount(new int[0]); + emptyResult.setPageFilteredRowId(new int[0][]); + } + return emptyResult; + } + + @Override public boolean isScanRequired(DataRefNode dataBlock) { + // For non filter it is always true + return true; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java deleted file mode 100644 index e77093b..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java +++ /dev/null @@ -1,326 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.scan.scanner.impl; - -import java.io.IOException; -import java.util.BitSet; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.FileHolder; -import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; -import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; -import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; -import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; -import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor; -import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; -import org.apache.carbondata.core.scan.result.AbstractScannedResult; -import org.apache.carbondata.core.scan.result.impl.FilterQueryScannedResult; -import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner; -import org.apache.carbondata.core.stats.QueryStatistic; -import org.apache.carbondata.core.stats.QueryStatisticsConstants; -import org.apache.carbondata.core.stats.QueryStatisticsModel; -import org.apache.carbondata.core.util.BitSetGroup; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonUtil; - -/** - * Below class will be used for filter query processing - * this class will be first apply the filter then it will read the block if - * required and return the scanned result - */ -public class FilterScanner extends AbstractBlockletScanner { - - /** - * filter tree - */ - private FilterExecuter filterExecuter; - /** - * this will be used to apply min max - * this will be useful for dimension column which is on the right side - * as node finder will always give tentative blocks, if column data stored individually - * and data is in sorted order then we can check whether filter is in the range of min max or not - * if it present then only we can apply filter on complete data. - * this will be very useful in case of sparse data when rows are - * repeating. - */ - private boolean isMinMaxEnabled; - - private QueryStatisticsModel queryStatisticsModel; - - private boolean useBitSetPipeLine; - - public FilterScanner(BlockExecutionInfo blockExecutionInfo, - QueryStatisticsModel queryStatisticsModel) { - super(blockExecutionInfo); - // to check whether min max is enabled or not - String minMaxEnableValue = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED, - CarbonCommonConstants.MIN_MAX_DEFAULT_VALUE); - if (null != minMaxEnableValue) { - isMinMaxEnabled = Boolean.parseBoolean(minMaxEnableValue); - } - // get the filter tree - this.filterExecuter = blockExecutionInfo.getFilterExecuterTree(); - this.queryStatisticsModel = queryStatisticsModel; - - String useBitSetPipeLine = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.BITSET_PIPE_LINE, - CarbonCommonConstants.BITSET_PIPE_LINE_DEFAULT); - if (null != useBitSetPipeLine) { - this.useBitSetPipeLine = Boolean.parseBoolean(useBitSetPipeLine); - } - } - - /** - * Below method will be used to process the block - * - * @param blocksChunkHolder block chunk holder which holds the data - * @throws FilterUnsupportedException - */ - @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) - throws IOException, FilterUnsupportedException { - return fillScannedResult(blocksChunkHolder); - } - - @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException { - // adding statistics for number of pages - QueryStatistic totalPagesScanned = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.TOTAL_PAGE_SCANNED); - totalPagesScanned.addCountStatistic(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, - totalPagesScanned.getCount() + blocksChunkHolder.getDataBlock().numberOfPages()); - // apply min max - if (isMinMaxEnabled) { - BitSet bitSet = null; - // check for implicit include filter instance - if (filterExecuter instanceof ImplicitColumnFilterExecutor) { - String blockletId = blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR - + blocksChunkHolder.getDataBlock().blockletId(); - bitSet = ((ImplicitColumnFilterExecutor) filterExecuter) - .isFilterValuesPresentInBlockOrBlocklet( - blocksChunkHolder.getDataBlock().getColumnsMaxValue(), - blocksChunkHolder.getDataBlock().getColumnsMinValue(), blockletId); - } else { - bitSet = this.filterExecuter - .isScanRequired(blocksChunkHolder.getDataBlock().getColumnsMaxValue(), - blocksChunkHolder.getDataBlock().getColumnsMinValue()); - } - if (bitSet.isEmpty()) { - CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(), - blocksChunkHolder.getMeasureRawDataChunk()); - return false; - } - } - return true; - } - - @Override public void readBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException { - long startTime = System.currentTimeMillis(); - this.filterExecuter.readBlocks(blocksChunkHolder); - // adding statistics for carbon read time - QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.READ_BLOCKlET_TIME); - readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME, - readTime.getCount() + (System.currentTimeMillis() - startTime)); - } - - /** - * This method will process the data in below order - * 1. first apply min max on the filter tree and check whether any of the filter - * is fall on the range of min max, if not then return empty result - * 2. If filter falls on min max range then apply filter on actual - * data and get the filtered row index - * 3. if row index is empty then return the empty result - * 4. if row indexes is not empty then read only those blocks(measure or dimension) - * which was present in the query but not present in the filter, as while applying filter - * some of the blocks where already read and present in chunk holder so not need to - * read those blocks again, this is to avoid reading of same blocks which was already read - * 5. Set the blocks and filter indexes to result - * - * @param blocksChunkHolder - * @throws FilterUnsupportedException - */ - private AbstractScannedResult fillScannedResult(BlocksChunkHolder blocksChunkHolder) - throws FilterUnsupportedException, IOException { - long startTime = System.currentTimeMillis(); - QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM); - totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, - totalBlockletStatistic.getCount() + 1); - // apply filter on actual data - BitSetGroup bitSetGroup = this.filterExecuter.applyFilter(blocksChunkHolder, useBitSetPipeLine); - // if indexes is empty then return with empty result - if (bitSetGroup.isEmpty()) { - CarbonUtil.freeMemory(blocksChunkHolder.getDimensionRawDataChunk(), - blocksChunkHolder.getMeasureRawDataChunk()); - - QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME); - scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, - scanTime.getCount() + (System.currentTimeMillis() - startTime)); - - QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.PAGE_SCANNED); - scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED, - scannedPages.getCount() + bitSetGroup.getScannedPages()); - return createEmptyResult(); - } - - AbstractScannedResult scannedResult = new FilterQueryScannedResult(blockExecutionInfo); - scannedResult.setBlockletId( - blockExecutionInfo.getBlockId() + CarbonCommonConstants.FILE_SEPARATOR + blocksChunkHolder - .getDataBlock().blockletId()); - // valid scanned blocklet - QueryStatistic validScannedBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM); - validScannedBlockletStatistic - .addCountStatistic(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, - validScannedBlockletStatistic.getCount() + 1); - // adding statistics for valid number of pages - QueryStatistic validPages = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.VALID_PAGE_SCANNED); - validPages.addCountStatistic(QueryStatisticsConstants.VALID_PAGE_SCANNED, - validPages.getCount() + bitSetGroup.getValidPages()); - QueryStatistic scannedPages = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.PAGE_SCANNED); - scannedPages.addCountStatistic(QueryStatisticsConstants.PAGE_SCANNED, - scannedPages.getCount() + bitSetGroup.getScannedPages()); - int[] rowCount = new int[bitSetGroup.getNumberOfPages()]; - // get the row indexes from bot set - int[][] indexesGroup = new int[bitSetGroup.getNumberOfPages()][]; - for (int k = 0; k < indexesGroup.length; k++) { - BitSet bitSet = bitSetGroup.getBitSet(k); - if (bitSet != null && !bitSet.isEmpty()) { - int[] indexes = new int[bitSet.cardinality()]; - int index = 0; - for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { - indexes[index++] = i; - } - rowCount[k] = indexes.length; - indexesGroup[k] = indexes; - } - } - FileHolder fileReader = blocksChunkHolder.getFileReader(); - int[][] allSelectedDimensionBlocksIndexes = - blockExecutionInfo.getAllSelectedDimensionBlocksIndexes(); - long dimensionReadTime = System.currentTimeMillis(); - DimensionRawColumnChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock() - .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes); - dimensionReadTime = System.currentTimeMillis() - dimensionReadTime; - - DimensionRawColumnChunk[] dimensionRawColumnChunks = - new DimensionRawColumnChunk[blockExecutionInfo.getTotalNumberDimensionBlock()]; - // read dimension chunk blocks from file which is not present - for (int i = 0; i < dimensionRawColumnChunks.length; i++) { - if (null != blocksChunkHolder.getDimensionRawDataChunk()[i]) { - dimensionRawColumnChunks[i] = blocksChunkHolder.getDimensionRawDataChunk()[i]; - } - } - for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) { - for (int j = allSelectedDimensionBlocksIndexes[i][0]; - j <= allSelectedDimensionBlocksIndexes[i][1]; j++) { - dimensionRawColumnChunks[j] = projectionListDimensionChunk[j]; - } - } - long dimensionReadTime1 = System.currentTimeMillis(); - /** - * in case projection if the projected dimension are not loaded in the dimensionColumnDataChunk - * then loading them - */ - int[] projectionListDimensionIndexes = blockExecutionInfo.getProjectionListDimensionIndexes(); - int projectionListDimensionIndexesLength = projectionListDimensionIndexes.length; - for (int i = 0; i < projectionListDimensionIndexesLength; i++) { - if (null == dimensionRawColumnChunks[projectionListDimensionIndexes[i]]) { - dimensionRawColumnChunks[projectionListDimensionIndexes[i]] = - blocksChunkHolder.getDataBlock() - .getDimensionChunk(fileReader, projectionListDimensionIndexes[i]); - } - } - dimensionReadTime += (System.currentTimeMillis() - dimensionReadTime1); - dimensionReadTime1 = System.currentTimeMillis(); - MeasureRawColumnChunk[] measureRawColumnChunks = - new MeasureRawColumnChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()]; - int[][] allSelectedMeasureBlocksIndexes = - blockExecutionInfo.getAllSelectedMeasureBlocksIndexes(); - MeasureRawColumnChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock() - .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes); - dimensionReadTime += System.currentTimeMillis() - dimensionReadTime1; - // read the measure chunk blocks which is not present - for (int i = 0; i < measureRawColumnChunks.length; i++) { - if (null != blocksChunkHolder.getMeasureRawDataChunk()[i]) { - measureRawColumnChunks[i] = blocksChunkHolder.getMeasureRawDataChunk()[i]; - } - } - for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) { - for (int j = allSelectedMeasureBlocksIndexes[i][0]; - j <= allSelectedMeasureBlocksIndexes[i][1]; j++) { - measureRawColumnChunks[j] = projectionListMeasureChunk[j]; - } - } - dimensionReadTime1 = System.currentTimeMillis(); - /** - * in case projection if the projected measure are not loaded in the ColumnPage - * then loading them - */ - int[] projectionListMeasureIndexes = blockExecutionInfo.getProjectionListMeasureIndexes(); - int projectionListMeasureIndexesLength = projectionListMeasureIndexes.length; - for (int i = 0; i < projectionListMeasureIndexesLength; i++) { - if (null == measureRawColumnChunks[projectionListMeasureIndexes[i]]) { - measureRawColumnChunks[projectionListMeasureIndexes[i]] = blocksChunkHolder.getDataBlock() - .getMeasureChunk(fileReader, projectionListMeasureIndexes[i]); - } - } - dimensionReadTime += System.currentTimeMillis() - dimensionReadTime1; - DimensionColumnDataChunk[][] dimensionColumnDataChunks = - new DimensionColumnDataChunk[dimensionRawColumnChunks.length][indexesGroup.length]; - ColumnPage[][] columnPages = - new ColumnPage[measureRawColumnChunks.length][indexesGroup.length]; - for (int i = 0; i < dimensionRawColumnChunks.length; i++) { - if (dimensionRawColumnChunks[i] != null) { - for (int j = 0; j < indexesGroup.length; j++) { - dimensionColumnDataChunks[i][j] = dimensionRawColumnChunks[i].convertToDimColDataChunk(j); - } - } - } - for (int i = 0; i < measureRawColumnChunks.length; i++) { - if (measureRawColumnChunks[i] != null) { - for (int j = 0; j < indexesGroup.length; j++) { - columnPages[i][j] = measureRawColumnChunks[i].convertToColumnPage(j); - } - } - } - scannedResult.setDimensionChunks(dimensionColumnDataChunks); - scannedResult.setIndexes(indexesGroup); - scannedResult.setMeasureChunks(columnPages); - scannedResult.setDimRawColumnChunks(dimensionRawColumnChunks); - scannedResult.setMsrRawColumnChunks(measureRawColumnChunks); - scannedResult.setNumberOfRows(rowCount); - // adding statistics for carbon scan time - QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME); - scanTime.addCountStatistic(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, - scanTime.getCount() + (System.currentTimeMillis() - startTime - dimensionReadTime)); - QueryStatistic readTime = queryStatisticsModel.getStatisticsTypeAndObjMap() - .get(QueryStatisticsConstants.READ_BLOCKlET_TIME); - readTime.addCountStatistic(QueryStatisticsConstants.READ_BLOCKlET_TIME, - readTime.getCount() + dimensionReadTime); - return scannedResult; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java deleted file mode 100644 index 1373ed5..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/NonFilterScanner.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.scan.scanner.impl; - -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.scanner.AbstractBlockletScanner; -import org.apache.carbondata.core.stats.QueryStatisticsModel; - -/** - * Non filter processor which will be used for non filter query - * In case of non filter query we just need to read all the blocks requested in the - * query and pass it to scanned result - */ -public class NonFilterScanner extends AbstractBlockletScanner { - - public NonFilterScanner(BlockExecutionInfo blockExecutionInfo, - QueryStatisticsModel queryStatisticsModel) { - super(blockExecutionInfo); - super.queryStatisticsModel = queryStatisticsModel; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java index 2f981b5..6faae03 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java @@ -30,22 +30,17 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa * to store key which is generated using * key generator */ - protected byte[] dictionaryKey; + private byte[] dictionaryKey; /** * to store no dictionary column data */ - protected byte[][] complexTypesKeys; + private byte[][] complexTypesKeys; /** * to store no dictionary column data */ - protected byte[][] noDictionaryKeys; - - /** - * contains value of implicit columns in byte array format - */ - protected byte[] implicitColumnByteArray; + private byte[][] noDictionaryKeys; public ByteArrayWrapper() { } @@ -91,16 +86,6 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa } /** - * to get the no dictionary column data - * - * @param index of the no dictionary key - * @return no dictionary key for the index - */ - public byte[] getComplexTypeByIndex(int index) { - return this.complexTypesKeys[index]; - } - - /** * to generate the hash code */ @Override public int hashCode() { @@ -201,30 +186,10 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa } /** - * @return the complexTypesKeys - */ - public byte[][] getComplexTypesKeys() { - return complexTypesKeys; - } - - /** * @param complexTypesKeys the complexTypesKeys to set */ public void setComplexTypesKeys(byte[][] complexTypesKeys) { this.complexTypesKeys = complexTypesKeys; } - /** - * @return - */ - public byte[] getImplicitColumnByteArray() { - return implicitColumnByteArray; - } - - /** - * @param implicitColumnByteArray - */ - public void setImplicitColumnByteArray(byte[] implicitColumnByteArray) { - this.implicitColumnByteArray = implicitColumnByteArray; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java b/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java deleted file mode 100644 index 8a37d01..0000000 --- a/core/src/main/java/org/apache/carbondata/core/stats/PartitionStatistic.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.stats; - -import java.io.Serializable; - -public class PartitionStatistic implements Serializable { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java index 55f0882..ed60d37 100644 --- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java +++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java @@ -37,7 +37,4 @@ public class QueryStatisticsModel { return statisticsTypeAndObjMap; } - public void setStatisticsTypeAndObjMap(Map<String, QueryStatistic> statisticsTypeAndObjMap) { - this.statisticsTypeAndObjMap = statisticsTypeAndObjMap; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java index b6a9e36..d6671b4 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java @@ -345,15 +345,6 @@ public class LoadMetadataDetails implements Serializable { } /** - * To get isDeleted property. - * - * @return isDeleted - */ - public String getIsDeleted() { - return isDeleted; - } - - /** * To set isDeleted property. * * @param isDeleted http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 76c2dc7..3c991e0 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -680,22 +680,6 @@ public class SegmentStatusManager { return ""; } - /** - * getting the task numbers present in the segment. - * @param segmentId - * @return - */ - public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager - updateStatusManager) { - List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId); - for (String eachFileName : list) { - taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName)); - } - return taskList; - } - - public static class ValidAndInvalidSegmentsInfo { private final List<Segment> listOfValidSegments; private final List<Segment> listOfValidUpdatedSegments; http://git-wip-us.apache.org/repos/asf/carbondata/blob/636eb799/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 2edb379..019a20c 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -48,7 +48,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.SegmentUpdateDetails; import org.apache.carbondata.core.mutate.TupleIdEnum; import org.apache.carbondata.core.mutate.UpdateVO; -import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -145,14 +144,6 @@ public class SegmentUpdateStatusManager { } /** - * - * @param loadMetadataDetails - */ - public void setLoadMetadataDetails(LoadMetadataDetails[] loadMetadataDetails) { - this.segmentDetails = loadMetadataDetails; - } - - /** * Returns the UpdateStatus Details. * @return */ @@ -179,18 +170,6 @@ public class SegmentUpdateStatusManager { } /** - * Returns all delete delta files of specified block - * - * @param tupleId - * @return - * @throws Exception - */ - public List<String> getDeleteDeltaFiles(String tupleId) throws Exception { - return getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT); - } - - - /** * Returns all update delta files of specified Segment. * * @param segmentId @@ -253,20 +232,6 @@ public class SegmentUpdateStatusManager { } /** - * Returns all deleted records of specified block - * - * @param tupleId - * @return - * @throws Exception - */ - public Map<Integer, Integer[]> getDeleteDeltaDataFromAllFiles(String tupleId) throws Exception { - List<String> deltaFiles = getDeltaFiles(tupleId, CarbonCommonConstants.DELETE_DELTA_FILE_EXT); - CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(); - String blockletId = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCKLET_ID); - return dataReader.getDeleteDataFromAllFiles(deltaFiles, blockletId); - } - - /** * Below method will be used to get all the delete delta files based on block name * * @param blockFilePath actual block filePath @@ -788,41 +753,6 @@ public class SegmentUpdateStatusManager { } /** - * compares passed time stamp with status file delete timestamp and - * returns latest timestamp from status file if both are not equal - * returns null otherwise - * - * @param completeBlockName - * @param timestamp - * @return - */ - public String getTimestampForRefreshCache(String completeBlockName, String timestamp) { - long cacheTimestamp = 0; - if (null != timestamp) { - cacheTimestamp = CarbonUpdateUtil.getTimeStampAsLong(timestamp); - } - String blockName = CarbonTablePath.addDataPartPrefix(CarbonUpdateUtil.getBlockName( - CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.BLOCK_ID))); - String segmentId = - CarbonUpdateUtil.getRequiredFieldFromTID(completeBlockName, TupleIdEnum.SEGMENT_ID); - SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray = - readLoadMetadata(); - for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) { - if (segmentId.equalsIgnoreCase(block.getSegmentName()) && - block.getBlockName().equalsIgnoreCase(blockName) && - !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) { - long deleteTimestampFromStatusFile = block.getDeleteDeltaEndTimeAsLong(); - if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) { - return null; - } else { - return block.getDeleteDeltaEndTimestamp(); - } - } - } - return null; - } - - /** * This method closes the streams * * @param streams - streams to close. @@ -841,85 +771,7 @@ public class SegmentUpdateStatusManager { } } } - /** - * Get the invalid tasks in that segment. - * @param segmentId - * @return - */ - public List<String> getInvalidBlockList(String segmentId) { - - // get the original fact file timestamp from the table status file. - List<String> listOfInvalidBlocks = new ArrayList<String>(); - SegmentStatusManager ssm = new SegmentStatusManager(absoluteTableIdentifier); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - LoadMetadataDetails[] segmentDetails = - ssm.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); - long timestampOfOriginalFacts = 0; - - String startTimestampOfUpdate = "" ; - String endTimestampOfUpdate = ""; - - for (LoadMetadataDetails segment : segmentDetails) { - // find matching segment and return timestamp. - if (segment.getLoadName().equalsIgnoreCase(segmentId)) { - timestampOfOriginalFacts = segment.getLoadStartTime(); - startTimestampOfUpdate = segment.getUpdateDeltaStartTimestamp(); - endTimestampOfUpdate = segment.getUpdateDeltaEndTimestamp(); - } - } - - if (startTimestampOfUpdate.isEmpty()) { - return listOfInvalidBlocks; - - } - - // now after getting the original fact timestamp, what ever is remaining - // files need to cross check it with table status file. - - // filter out the fact files. - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); - CarbonFile segDir = - FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); - - final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimestampOfUpdate); - final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimestampOfUpdate); - final Long timeStampOriginalFactFinal = - timestampOfOriginalFacts; - - CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() { - - @Override public boolean accept(CarbonFile pathName) { - String fileName = pathName.getName(); - if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) { - String firstPart = fileName.substring(0, fileName.indexOf('.')); - - long timestamp = Long.parseLong(firstPart - .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1, - firstPart.length())); - if (Long.compare(timestamp, endTimeStampFinal) <= 0 - && Long.compare(timestamp, startTimeStampFinal) >= 0) { - return false; - } - if (Long.compare(timestamp, timeStampOriginalFactFinal) == 0) { - return false; - } - // take the rest of files as they are invalid. - return true; - } - return false; - } - }); - // gather the task numbers. - for (CarbonFile updateFiles : files) { - listOfInvalidBlocks.add(updateFiles.getName()); - } - - return listOfInvalidBlocks; - } /** * Returns the invalid timestamp range of a segment. * @param segmentId @@ -945,12 +797,11 @@ public class SegmentUpdateStatusManager { } /** * - * @param segmentId * @param block * @param needCompleteList * @return */ - public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId, + public CarbonFile[] getDeleteDeltaInvalidFilesList( final SegmentUpdateDetails block, final boolean needCompleteList, CarbonFile[] allSegmentFiles, boolean isAbortedFile) { @@ -996,12 +847,11 @@ public class SegmentUpdateStatusManager { /** * - * @param blockName * @param allSegmentFiles * @return */ - public CarbonFile[] getAllBlockRelatedFiles(String blockName, CarbonFile[] allSegmentFiles, - String actualBlockName) { + public CarbonFile[] getAllBlockRelatedFiles(CarbonFile[] allSegmentFiles, + String actualBlockName) { List<CarbonFile> files = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); for (CarbonFile eachFile : allSegmentFiles) {