http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java new file mode 100644 index 0000000..fde4e55 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/DataBlockIterator.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.scan.processor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.scan.collector.ResultCollectorFactory; +import org.apache.carbondata.core.scan.collector.ScannedResultCollector; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.scanner.BlockletScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFilterScanner; +import org.apache.carbondata.core.scan.scanner.impl.BlockletFullScanner; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.util.TaskMetricsMap; + +/** + * This class iterates over the blocklets of a block and returns the + * scanned result in batches. 
+ */ +public class DataBlockIterator extends CarbonIterator<List<Object[]>> { + + /** + * iterator which will be used to iterate over blocklets + */ + private BlockletIterator blockletIterator; + + /** + * result collector which will be used to aggregate the scanned result + */ + private ScannedResultCollector scannerResultAggregator; + + /** + * processor which will be used to process the block processing can be + * filter processing or non filter processing + */ + private BlockletScanner blockletScanner; + + /** + * batch size of result + */ + private int batchSize; + + private ExecutorService executorService; + + private Future<BlockletScannedResult> future; + + private Future<RawBlockletColumnChunks> futureIo; + + private BlockletScannedResult scannedResult; + + private BlockExecutionInfo blockExecutionInfo; + + private FileReader fileReader; + + private AtomicBoolean nextBlock; + + private AtomicBoolean nextRead; + + public DataBlockIterator(BlockExecutionInfo blockExecutionInfo, FileReader fileReader, + int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) { + this.blockExecutionInfo = blockExecutionInfo; + this.fileReader = fileReader; + blockletIterator = new BlockletIterator(blockExecutionInfo.getFirstDataBlock(), + blockExecutionInfo.getNumberOfBlockToScan()); + if (blockExecutionInfo.getFilterExecuterTree() != null) { + blockletScanner = new BlockletFilterScanner(blockExecutionInfo, queryStatisticsModel); + } else { + blockletScanner = new BlockletFullScanner(blockExecutionInfo, queryStatisticsModel); + } + this.scannerResultAggregator = + ResultCollectorFactory.getScannedResultCollector(blockExecutionInfo); + this.batchSize = batchSize; + this.executorService = executorService; + this.nextBlock = new AtomicBoolean(false); + this.nextRead = new AtomicBoolean(false); + } + + @Override + public List<Object[]> next() { + List<Object[]> collectedResult = null; + if (updateScanner()) { + collectedResult = 
this.scannerResultAggregator.collectResultInRow(scannedResult, batchSize); + while (collectedResult.size() < batchSize && updateScanner()) { + List<Object[]> data = this.scannerResultAggregator + .collectResultInRow(scannedResult, batchSize - collectedResult.size()); + collectedResult.addAll(data); + } + } else { + collectedResult = new ArrayList<>(); + } + return collectedResult; + } + + @Override + public boolean hasNext() { + if (scannedResult != null && scannedResult.hasNext()) { + return true; + } else { + if (null != scannedResult) { + scannedResult.freeMemory(); + } + return blockletIterator.hasNext() || nextBlock.get() || nextRead.get(); + } + } + + /** + * Return true if scan result is non-empty + */ + private boolean updateScanner() { + try { + if (scannedResult != null && scannedResult.hasNext()) { + return true; + } else { + scannedResult = processNextBlocklet(); + while (scannedResult != null) { + if (scannedResult.hasNext()) { + return true; + } + scannedResult = processNextBlocklet(); + } + nextBlock.set(false); + nextRead.set(false); + return false; + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private BlockletScannedResult processNextBlocklet() throws Exception { + BlockletScannedResult result = null; + if (blockExecutionInfo.isPrefetchBlocklet()) { + if (blockletIterator.hasNext() || nextBlock.get() || nextRead.get()) { + if (future == null) { + future = scanNextBlockletAsync(); + } + result = future.get(); + nextBlock.set(false); + if (blockletIterator.hasNext() || nextRead.get()) { + nextBlock.set(true); + future = scanNextBlockletAsync(); + } + } + } else { + if (blockletIterator.hasNext()) { + RawBlockletColumnChunks rawChunks = readNextBlockletColumnChunks(); + if (rawChunks != null) { + result = blockletScanner.scanBlocklet(rawChunks); + } + } + } + return result; + } + + private RawBlockletColumnChunks readNextBlockletColumnChunks() throws IOException { + RawBlockletColumnChunks rawBlockletColumnChunks = 
getNextBlockletColumnChunks(); + if (rawBlockletColumnChunks != null) { + blockletScanner.readBlocklet(rawBlockletColumnChunks); + return rawBlockletColumnChunks; + } + return null; + } + + private RawBlockletColumnChunks getNextBlockletColumnChunks() { + RawBlockletColumnChunks rawBlockletColumnChunks = null; + do { + DataRefNode dataBlock = blockletIterator.next(); + if (dataBlock.getColumnsMaxValue() == null || blockletScanner.isScanRequired(dataBlock)) { + rawBlockletColumnChunks = RawBlockletColumnChunks.newInstance( + blockExecutionInfo.getTotalNumberDimensionToRead(), + blockExecutionInfo.getTotalNumberOfMeasureToRead(), fileReader, dataBlock); + } + } while (rawBlockletColumnChunks == null && blockletIterator.hasNext()); + return rawBlockletColumnChunks; + } + + private Future<BlockletScannedResult> scanNextBlockletAsync() { + return executorService.submit(new Callable<BlockletScannedResult>() { + @Override public BlockletScannedResult call() throws Exception { + if (futureIo == null) { + futureIo = readNextBlockletAsync(); + } + RawBlockletColumnChunks rawBlockletColumnChunks = futureIo.get(); + futureIo = null; + nextRead.set(false); + if (rawBlockletColumnChunks != null) { + if (blockletIterator.hasNext()) { + nextRead.set(true); + futureIo = readNextBlockletAsync(); + } + return blockletScanner.scanBlocklet(rawBlockletColumnChunks); + } + return null; + } + }); + } + + private Future<RawBlockletColumnChunks> readNextBlockletAsync() { + return executorService.submit(new Callable<RawBlockletColumnChunks>() { + @Override public RawBlockletColumnChunks call() throws Exception { + try { + TaskMetricsMap.getInstance().registerThreadCallback(); + if (blockletIterator.hasNext()) { + return readNextBlockletColumnChunks(); + } else { + return null; + } + } finally { + // update read bytes metrics for this thread + TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId()); + } + } + }); + } + + public void processNextBatch(CarbonColumnarBatch 
columnarBatch) { + if (updateScanner()) { + this.scannerResultAggregator.collectResultInColumnarBatch(scannedResult, columnarBatch); + } + } + + + /** + * Close the resources + */ + public void close() { + // free the current scanned result + if (null != scannedResult && !scannedResult.hasNext()) { + scannedResult.freeMemory(); + } + // free any pre-fetched memory if present + if (null != future) { + try { + BlockletScannedResult blockletScannedResult = future.get(); + if (blockletScannedResult != null) { + blockletScannedResult.freeMemory(); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java new file mode 100644 index 0000000..6b7e880 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/RawBlockletColumnChunks.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.scan.processor; + +import org.apache.carbondata.core.datastore.DataRefNode; +import org.apache.carbondata.core.datastore.FileReader; +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; +import org.apache.carbondata.core.util.BitSetGroup; + +/** + * Contains dimension and measure raw column chunks of one blocklet + */ +public class RawBlockletColumnChunks { + + /** + * dimension column data chunk + */ + private DimensionRawColumnChunk[] dimensionRawColumnChunks; + + /** + * measure column data chunk + */ + private MeasureRawColumnChunk[] measureRawColumnChunks; + + /** + * file reader which will be used to read the block from file + */ + private FileReader fileReader; + + /** + * data block + */ + private DataRefNode dataBlock; + + private BitSetGroup bitSetGroup; + + private RawBlockletColumnChunks() { } + + public static RawBlockletColumnChunks newInstance(int numberOfDimensionChunk, + int numberOfMeasureChunk, FileReader fileReader, DataRefNode dataBlock) { + RawBlockletColumnChunks instance = new RawBlockletColumnChunks(); + instance.dimensionRawColumnChunks = new DimensionRawColumnChunk[numberOfDimensionChunk]; + instance.measureRawColumnChunks = new MeasureRawColumnChunk[numberOfMeasureChunk]; + instance.fileReader = fileReader; + instance.dataBlock = dataBlock; + return instance; + } + + /** + * @return the dimensionRawColumnChunks + */ + public DimensionRawColumnChunk[] getDimensionRawColumnChunks() { + return dimensionRawColumnChunks; + } + + /** + * @param dimensionRawColumnChunks the dimensionRawColumnChunks to set + */ + public void setDimensionRawColumnChunks(DimensionRawColumnChunk[] dimensionRawColumnChunks) { + this.dimensionRawColumnChunks = dimensionRawColumnChunks; + } + + /** + * @return the measureRawColumnChunks + */ + public MeasureRawColumnChunk[] getMeasureRawColumnChunks() { + return 
measureRawColumnChunks; + } + + /** + * @param measureRawColumnChunks the measureRawColumnChunks to set + */ + public void setMeasureRawColumnChunks(MeasureRawColumnChunk[] measureRawColumnChunks) { + this.measureRawColumnChunks = measureRawColumnChunks; + } + + /** + * @return the fileReader + */ + public FileReader getFileReader() { + return fileReader; + } + + /** + * @return the dataBlock + */ + public DataRefNode getDataBlock() { + return dataBlock; + } + + public BitSetGroup getBitSetGroup() { + return bitSetGroup; + } + + public void setBitSetGroup(BitSetGroup bitSetGroup) { + this.bitSetGroup = bitSetGroup; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java deleted file mode 100644 index 1c97725..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/processor/impl/DataBlockIteratorImpl.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.scan.processor.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.carbondata.core.datastore.FileHolder; -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator; -import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; -import org.apache.carbondata.core.stats.QueryStatisticsModel; - -/** - * Below class will be used to process the block for detail query - */ -public class DataBlockIteratorImpl extends AbstractDataBlockIterator { - /** - * DataBlockIteratorImpl Constructor - * - * @param blockExecutionInfo execution information - */ - public DataBlockIteratorImpl(BlockExecutionInfo blockExecutionInfo, FileHolder fileReader, - int batchSize, QueryStatisticsModel queryStatisticsModel, ExecutorService executorService) { - super(blockExecutionInfo, fileReader, batchSize, queryStatisticsModel, executorService); - } - - /** - * It scans the block and returns the result with @batchSize - * - * @return Result of @batchSize - */ - public List<Object[]> next() { - List<Object[]> collectedResult = null; - if (updateScanner()) { - collectedResult = this.scannerResultAggregator.collectData(scannedResult, batchSize); - while (collectedResult.size() < batchSize && updateScanner()) { - List<Object[]> data = this.scannerResultAggregator - .collectData(scannedResult, batchSize - collectedResult.size()); - collectedResult.addAll(data); - } - } else { - collectedResult = new ArrayList<>(); - } - return collectedResult; - } - - public void processNextBatch(CarbonColumnarBatch columnarBatch) { - if (updateScanner()) { - this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java deleted file mode 100644 index b089fad..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java +++ /dev/null @@ -1,698 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.scan.result; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Map; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; -import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; -import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; -import org.apache.carbondata.core.datastore.page.ColumnPage; -import org.apache.carbondata.core.mutate.CarbonUpdateUtil; -import org.apache.carbondata.core.mutate.DeleteDeltaVo; -import org.apache.carbondata.core.mutate.TupleIdEnum; -import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; -import org.apache.carbondata.core.scan.filter.GenericQueryType; -import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; -import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; -import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -/** - * Scanned result class which will store and provide the result on request - */ -public abstract class AbstractScannedResult { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(AbstractScannedResult.class.getName()); - /** - * current row number - */ - protected int currentRow = -1; - - protected int pageCounter; - /** - * row mapping indexes - */ - protected int[][] rowMapping; - /** - * key size of the fixed length column - */ - private int fixedLengthKeySize; - /** - * total number of rows per page - 
*/ - private int[] numberOfRows; - - /** - * Total number of rows. - */ - private int totalNumberOfRows; - /** - * to keep track of number of rows process - */ - protected int rowCounter; - /** - * dimension column data chunk - */ - protected DimensionColumnDataChunk[][] dimensionDataChunks; - - /** - * Raw dimension chunks; - */ - protected DimensionRawColumnChunk[] dimRawColumnChunks; - - /** - * Raw dimension chunks; - */ - protected MeasureRawColumnChunk[] msrRawColumnChunks; - /** - * measure column data chunk - */ - protected ColumnPage[][] measureDataChunks; - /** - * dictionary column block index in file - */ - protected int[] dictionaryColumnBlockIndexes; - - /** - * no dictionary column block index in file - */ - protected int[] noDictionaryColumnBlockIndexes; - - /** - * column group to is key structure info - * which will be used to get the key from the complete - * column group key - * For example if only one dimension of the column group is selected - * then from complete column group key it will be used to mask the key and - * get the particular column key - */ - protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo; - - /** - * - */ - private Map<Integer, GenericQueryType> complexParentIndexToQueryMap; - - private int totalDimensionsSize; - - /** - * blockedId which will be blockId + blocklet number in the block - */ - private String blockletId; - - private long rowId; - - /** - * parent block indexes - */ - private int[] complexParentBlockIndexes; - - /** - * blockletid+pageumber to deleted reocrd map - */ - private Map<String, DeleteDeltaVo> deletedRecordMap; - - /** - * current page delete delta vo - */ - private DeleteDeltaVo currentDeleteDeltaVo; - - /** - * actual blocklet number - */ - private String blockletNumber; - - public AbstractScannedResult(BlockExecutionInfo blockExecutionInfo) { - this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize(); - this.noDictionaryColumnBlockIndexes = 
blockExecutionInfo.getNoDictionaryBlockIndexes(); - this.dictionaryColumnBlockIndexes = blockExecutionInfo.getDictionaryColumnBlockIndex(); - this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo(); - this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap(); - this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes(); - this.totalDimensionsSize = blockExecutionInfo.getQueryDimensions().length; - this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap(); - } - - /** - * Below method will be used to set the dimension chunks - * which will be used to create a row - * - * @param dataChunks dimension chunks used in query - */ - public void setDimensionChunks(DimensionColumnDataChunk[][] dataChunks) { - this.dimensionDataChunks = dataChunks; - } - - /** - * Below method will be used to set the measure column chunks - * - * @param measureDataChunks measure data chunks - */ - public void setMeasureChunks(ColumnPage[][] measureDataChunks) { - this.measureDataChunks = measureDataChunks; - } - - public void setDimRawColumnChunks(DimensionRawColumnChunk[] dimRawColumnChunks) { - this.dimRawColumnChunks = dimRawColumnChunks; - } - - public void setMsrRawColumnChunks(MeasureRawColumnChunk[] msrRawColumnChunks) { - this.msrRawColumnChunks = msrRawColumnChunks; - } - - /** - * Below method will be used to get the chunk based in measure ordinal - * - * @param ordinal measure ordinal - * @return measure column chunk - */ - public ColumnPage getMeasureChunk(int ordinal) { - return measureDataChunks[ordinal][pageCounter]; - } - - /** - * Below method will be used to get the key for all the dictionary dimensions - * which is present in the query - * - * @param rowId row id selected after scanning - * @return return the dictionary key - */ - protected byte[] getDictionaryKeyArray(int rowId) { - byte[] completeKey = new byte[fixedLengthKeySize]; - int offset = 0; - for (int i = 0; i < 
this.dictionaryColumnBlockIndexes.length; i++) { - offset += dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter] - .fillChunkData(completeKey, offset, rowId, - columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); - } - rowCounter++; - return completeKey; - } - - /** - * Below method will be used to get the key for all the dictionary dimensions - * in integer array format which is present in the query - * - * @param rowId row id selected after scanning - * @return return the dictionary key - */ - protected int[] getDictionaryKeyIntegerArray(int rowId) { - int[] completeKey = new int[totalDimensionsSize]; - int column = 0; - for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { - column = dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter] - .fillConvertedChunkData(rowId, column, completeKey, - columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); - } - rowCounter++; - return completeKey; - } - - /** - * Fill the column data of dictionary to vector - */ - public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) { - int column = 0; - for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { - column = dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter] - .fillConvertedChunkData(vectorInfo, column, - columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); - } - } - - /** - * Fill the column data to vector - */ - public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) { - int column = 0; - for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { - column = dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter] - .fillConvertedChunkData(vectorInfo, column, - columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i])); - } - } - - /** - * Fill the measure column data to vector - */ - public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) { - for (int i = 0; i < 
measuresOrdinal.length; i++) { - vectorInfo[i].measureVectorFiller - .fillMeasureVector(measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]); - } - } - - public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) { - for (int i = 0; i < vectorInfos.length; i++) { - int offset = vectorInfos[i].offset; - int len = offset + vectorInfos[i].size; - int vectorOffset = vectorInfos[i].vectorOffset; - CarbonColumnVector vector = vectorInfos[i].vector; - for (int j = offset; j < len; j++) { - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - DataOutputStream dataOutput = new DataOutputStream(byteStream); - try { - vectorInfos[i].genericQueryType - .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, - rowMapping == null ? j : rowMapping[pageCounter][j], pageCounter, dataOutput); - Object data = vectorInfos[i].genericQueryType - .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray())); - vector.putObject(vectorOffset++, data); - } catch (IOException e) { - LOGGER.error(e); - } finally { - CarbonUtil.closeStreams(dataOutput); - CarbonUtil.closeStreams(byteStream); - } - } - } - } - - /** - * Fill the column data to vector - */ - public void fillColumnarImplicitBatch(ColumnVectorInfo[] vectorInfo) { - for (int i = 0; i < vectorInfo.length; i++) { - ColumnVectorInfo columnVectorInfo = vectorInfo[i]; - CarbonColumnVector vector = columnVectorInfo.vector; - int offset = columnVectorInfo.offset; - int vectorOffset = columnVectorInfo.vectorOffset; - int len = offset + columnVectorInfo.size; - for (int j = offset; j < len; j++) { - // Considering only String case now as we support only - String data = getBlockletId(); - if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID - .equals(columnVectorInfo.dimension.getColumnName())) { - data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter - + CarbonCommonConstants.FILE_SEPARATOR + (rowMapping == null ? 
- j : - rowMapping[pageCounter][j]); - } - vector.putBytes(vectorOffset++, - data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); - } - } - } - - /** - * Just increment the counter incase of query only on measures. - */ - public void incrementCounter() { - rowCounter++; - currentRow++; - } - - /** - * Just increment the page counter and reset the remaining counters. - */ - public void incrementPageCounter() { - rowCounter = 0; - currentRow = -1; - pageCounter++; - fillDataChunks(); - if (null != deletedRecordMap) { - currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter); - } - } - - /** - * This case is used only in case of compaction, since it does not use filter flow. - */ - public void fillDataChunks() { - freeDataChunkMemory(); - if (pageCounter >= numberOfRows.length) { - return; - } - for (int i = 0; i < dimensionDataChunks.length; i++) { - if (dimensionDataChunks[i][pageCounter] == null && dimRawColumnChunks[i] != null) { - dimensionDataChunks[i][pageCounter] = - dimRawColumnChunks[i].convertToDimColDataChunkWithOutCache(pageCounter); - } - } - - for (int i = 0; i < measureDataChunks.length; i++) { - if (measureDataChunks[i][pageCounter] == null && msrRawColumnChunks[i] != null) { - measureDataChunks[i][pageCounter] = - msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter); - } - } - } - - // free the memory for the last page chunk - private void freeDataChunkMemory() { - for (int i = 0; i < dimensionDataChunks.length; i++) { - if (pageCounter > 0 && dimensionDataChunks[i][pageCounter - 1] != null) { - dimensionDataChunks[i][pageCounter - 1].freeMemory(); - dimensionDataChunks[i][pageCounter - 1] = null; - } - } - for (int i = 0; i < measureDataChunks.length; i++) { - if (pageCounter > 0 && measureDataChunks[i][pageCounter - 1] != null) { - measureDataChunks[i][pageCounter - 1].freeMemory(); - measureDataChunks[i][pageCounter - 1] = null; - } - } - } - - public int numberOfpages() { - return 
numberOfRows.length; - } - - /** - * Get total rows in the current page - * - * @return - */ - public int getCurrentPageRowCount() { - return numberOfRows[pageCounter]; - } - - public int getCurrentPageCounter() { - return pageCounter; - } - - /** - * increment the counter. - */ - public void setRowCounter(int rowCounter) { - this.rowCounter = rowCounter; - } - - /** - * Below method will be used to get the dimension data based on dimension - * ordinal and index - * - * @param dimOrdinal dimension ordinal present in the query - * @param rowId row index - * @return dimension data based on row id - */ - protected byte[] getDimensionData(int dimOrdinal, int rowId) { - return dimensionDataChunks[dimOrdinal][pageCounter].getChunkData(rowId); - } - - /** - * Below method will be used to get the dimension key array - * for all the no dictionary dimension present in the query - * - * @param rowId row number - * @return no dictionary keys for all no dictionary dimension - */ - protected byte[][] getNoDictionaryKeyArray(int rowId) { - byte[][] noDictionaryColumnsKeys = new byte[noDictionaryColumnBlockIndexes.length][]; - int position = 0; - for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { - noDictionaryColumnsKeys[position++] = - dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId); - } - return noDictionaryColumnsKeys; - } - - /** - * Below method will be used to get the dimension key array - * for all the no dictionary dimension present in the query - * - * @param rowId row number - * @return no dictionary keys for all no dictionary dimension - */ - protected String[] getNoDictionaryKeyStringArray(int rowId) { - String[] noDictionaryColumnsKeys = new String[noDictionaryColumnBlockIndexes.length]; - int position = 0; - for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { - noDictionaryColumnsKeys[position++] = new String( - 
dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter].getChunkData(rowId), - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); - } - return noDictionaryColumnsKeys; - } - - /** - * @return blockletId - */ - public String getBlockletId() { - return blockletId; - } - - /** - * @param blockletId - */ - public void setBlockletId(String blockletId) { - this.blockletId = CarbonTablePath.getShortBlockId(blockletId); - blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID); - // if deleted recors map is present for this block - // then get the first page deleted vo - if (null != deletedRecordMap) { - currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter); - } - } - - /** - * @return blockletId - */ - public long getRowId() { - return rowId; - } - - /** - * @param rowId - */ - public void setRowId(long rowId) { - this.rowId = rowId; - } - - /** - * Below method will be used to get the complex type keys array based - * on row id for all the complex type dimension selected in query - * - * @param rowId row number - * @return complex type key array for all the complex dimension selected in query - */ - protected byte[][] getComplexTypeKeyArray(int rowId) { - byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][]; - for (int i = 0; i < complexTypeData.length; i++) { - GenericQueryType genericQueryType = - complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]); - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - DataOutputStream dataOutput = new DataOutputStream(byteStream); - try { - genericQueryType - .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, rowId, pageCounter, - dataOutput); - complexTypeData[i] = byteStream.toByteArray(); - } catch (IOException e) { - LOGGER.error(e); - } finally { - CarbonUtil.closeStreams(dataOutput); - CarbonUtil.closeStreams(byteStream); - } - } - return complexTypeData; - } - - /** - * @return return the 
total number of row after scanning - */ - public int numberOfOutputRows() { - return this.totalNumberOfRows; - } - - /** - * to check whether any more row is present in the result - * - * @return - */ - public boolean hasNext() { - if (pageCounter < numberOfRows.length && rowCounter < this.numberOfRows[pageCounter]) { - return true; - } else if (pageCounter < numberOfRows.length) { - pageCounter++; - fillDataChunks(); - rowCounter = 0; - currentRow = -1; - if (null != deletedRecordMap) { - currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter); - } - return hasNext(); - } - return false; - } - - /** - * Below method will be used to free the occupied memory - */ - public void freeMemory() { - // first free the dimension chunks - if (null != dimensionDataChunks) { - for (int i = 0; i < dimensionDataChunks.length; i++) { - if (null != dimensionDataChunks[i]) { - for (int j = 0; j < dimensionDataChunks[i].length; j++) { - if (null != dimensionDataChunks[i][j]) { - dimensionDataChunks[i][j].freeMemory(); - } - } - } - } - } - // free the measure data chunks - if (null != measureDataChunks) { - for (int i = 0; i < measureDataChunks.length; i++) { - if (null != measureDataChunks[i]) { - for (int j = 0; j < measureDataChunks[i].length; j++) { - if (null != measureDataChunks[i][j]) { - measureDataChunks[i][j].freeMemory(); - } - } - } - } - } - // free the raw chunks - if (null != dimRawColumnChunks) { - for (int i = 0; i < dimRawColumnChunks.length; i++) { - if (null != dimRawColumnChunks[i]) { - dimRawColumnChunks[i].freeMemory(); - } - } - } - } - - /** - * As this class will be a flyweight object so - * for one block all the blocklet scanning will use same result object - * in that case we need to reset the counter to zero so - * for new result it will give the result from zero - */ - public void reset() { - rowCounter = 0; - currentRow = -1; - pageCounter = 0; - } - - /** - * @param numberOfRows set total of number rows valid after scanning - 
*/ - public void setNumberOfRows(int[] numberOfRows) { - this.numberOfRows = numberOfRows; - - for (int count : numberOfRows) { - totalNumberOfRows += count; - } - } - - /** - * After applying filter it will return the bit set with the valid row indexes - * so below method will be used to set the row indexes - * - * @param indexes - */ - public void setIndexes(int[][] indexes) { - this.rowMapping = indexes; - } - - public int getRowCounter() { - return rowCounter; - } - - /** - * will return the current valid row id - * - * @return valid row id - */ - public abstract int getCurrentRowId(); - - /** - * @return dictionary key array for all the dictionary dimension - * selected in query - */ - public abstract byte[] getDictionaryKeyArray(); - - /** - * @return dictionary key array for all the dictionary dimension in integer array forat - * selected in query - */ - public abstract int[] getDictionaryKeyIntegerArray(); - - /** - * Below method will be used to get the complex type key array - * - * @return complex type key array - */ - public abstract byte[][] getComplexTypeKeyArray(); - - /** - * Below method will be used to get the no dictionary key - * array for all the no dictionary dimension selected in query - * - * @return no dictionary key array for all the no dictionary dimension - */ - public abstract byte[][] getNoDictionaryKeyArray(); - - /** - * Below method will be used to get the no dictionary key - * array in string array format for all the no dictionary dimension selected in query - * - * @return no dictionary key array for all the no dictionary dimension - */ - public abstract String[] getNoDictionaryKeyStringArray(); - - /** - * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later. 
- * @param columnarBatch - * @param startRow - * @param size - * @param vectorOffset - */ - public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size, - int vectorOffset) { - int rowsFiltered = 0; - if (currentDeleteDeltaVo != null) { - int len = startRow + size; - for (int i = startRow; i < len; i++) { - int rowId = rowMapping != null ? rowMapping[pageCounter][i] : i; - if (currentDeleteDeltaVo.containsRow(rowId)) { - columnarBatch.markFiltered(vectorOffset); - rowsFiltered++; - } - vectorOffset++; - } - } - return rowsFiltered; - } - - /** - * Below method will be used to check row got deleted - * - * @param rowId - * @return is present in deleted row - */ - public boolean containsDeletedRow(int rowId) { - if (null != currentDeleteDeltaVo) { - return currentDeleteDeltaVo.containsRow(rowId); - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java deleted file mode 100644 index 56ca2ac..0000000 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/BatchResult.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.scan.result; - -import java.util.ArrayList; -import java.util.List; -import java.util.NoSuchElementException; - -import org.apache.carbondata.common.CarbonIterator; - -/** - * Below class holds the query result - */ -public class BatchResult extends CarbonIterator<Object[]> { - - /** - * list of keys - */ - protected List<Object[]> rows; - - /** - * counter to check whether all the records are processed or not - */ - protected int counter; - - public BatchResult() { - this.rows = new ArrayList<>(); - } - - /** - * Below method will be used to get the rows - * - * @return - */ - public List<Object[]> getRows() { - return rows; - } - - /** - * Below method will be used to get the set the values - * - * @param rows - */ - public void setRows(List<Object[]> rows) { - this.rows = rows; - } - - /** - * This method will return one row at a time based on the counter given. - * @param counter - * @return - */ - public Object[] getRawRow(int counter) { - return rows.get(counter); - } - - /** - * For getting the total size. - * @return - */ - public int getSize() { - return rows.size(); - } - - - /** - * Returns {@code true} if the iteration has more elements. - * - * @return {@code true} if the iteration has more elements - */ - @Override public boolean hasNext() { - return counter < rows.size(); - } - - /** - * Returns the next element in the iteration. 
- * - * @return the next element in the iteration - */ - @Override public Object[] next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - Object[] row = rows.get(counter); - counter++; - return row; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java new file mode 100644 index 0000000..29404b4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.scan.result; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; +import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.mutate.DeleteDeltaVo; +import org.apache.carbondata.core.mutate.TupleIdEnum; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; +import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +/** + * Scanned result class which will store and provide the result on request + */ +public abstract class BlockletScannedResult { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(BlockletScannedResult.class.getName()); + /** + * current row number + */ + protected int currentRow = -1; + + protected int pageCounter; + /** + * matched rowId for each page + */ + protected int[][] pageFilteredRowId; + /** + * key size of the fixed length column + */ + private int fixedLengthKeySize; + /** + * total number of 
filtered rows for each page + */ + private int[] pageFilteredRowCount; + + /** + * to keep track of number of rows process + */ + protected int rowCounter; + /** + * dimension column data chunk + */ + protected DimensionColumnPage[][] dimensionColumnPages; + + /** + * Raw dimension chunks; + */ + protected DimensionRawColumnChunk[] dimRawColumnChunks; + + /** + * Raw dimension chunks; + */ + protected MeasureRawColumnChunk[] msrRawColumnChunks; + /** + * measure column data chunk + */ + protected ColumnPage[][] measureColumnPages; + /** + * dictionary column block index in file + */ + protected int[] dictionaryColumnChunkIndexes; + + /** + * no dictionary column chunk index in file + */ + protected int[] noDictionaryColumnChunkIndexes; + + /** + * column group to is key structure info + * which will be used to get the key from the complete + * column group key + * For example if only one dimension of the column group is selected + * then from complete column group key it will be used to mask the key and + * get the particular column key + */ + protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo; + + /** + * + */ + private Map<Integer, GenericQueryType> complexParentIndexToQueryMap; + + private int totalDimensionsSize; + + /** + * blockedId which will be blockId + blocklet number in the block + */ + private String blockletId; + + /** + * parent block indexes + */ + private int[] complexParentBlockIndexes; + + /** + * blockletid+pageumber to deleted reocrd map + */ + private Map<String, DeleteDeltaVo> deletedRecordMap; + + /** + * current page delete delta vo + */ + private DeleteDeltaVo currentDeleteDeltaVo; + + /** + * actual blocklet number + */ + private String blockletNumber; + + public BlockletScannedResult(BlockExecutionInfo blockExecutionInfo) { + this.fixedLengthKeySize = blockExecutionInfo.getFixedLengthKeySize(); + this.noDictionaryColumnChunkIndexes = blockExecutionInfo.getNoDictionaryColumnChunkIndexes(); + 
this.dictionaryColumnChunkIndexes = blockExecutionInfo.getDictionaryColumnChunkIndex(); + this.columnGroupKeyStructureInfo = blockExecutionInfo.getColumnGroupToKeyStructureInfo(); + this.complexParentIndexToQueryMap = blockExecutionInfo.getComlexDimensionInfoMap(); + this.complexParentBlockIndexes = blockExecutionInfo.getComplexColumnParentBlockIndexes(); + this.totalDimensionsSize = blockExecutionInfo.getProjectionDimensions().length; + this.deletedRecordMap = blockExecutionInfo.getDeletedRecordsMap(); + } + + /** + * Below method will be used to set the dimension chunks + * which will be used to create a row + * + * @param columnPages dimension chunks used in query + */ + public void setDimensionColumnPages(DimensionColumnPage[][] columnPages) { + this.dimensionColumnPages = columnPages; + } + + /** + * Below method will be used to set the measure column chunks + * + * @param columnPages measure data chunks + */ + public void setMeasureColumnPages(ColumnPage[][] columnPages) { + this.measureColumnPages = columnPages; + } + + public void setDimRawColumnChunks(DimensionRawColumnChunk[] dimRawColumnChunks) { + this.dimRawColumnChunks = dimRawColumnChunks; + } + + public void setMsrRawColumnChunks(MeasureRawColumnChunk[] msrRawColumnChunks) { + this.msrRawColumnChunks = msrRawColumnChunks; + } + + /** + * Below method will be used to get the chunk based in measure ordinal + * + * @param ordinal measure ordinal + * @return measure column chunk + */ + public ColumnPage getMeasureChunk(int ordinal) { + return measureColumnPages[ordinal][pageCounter]; + } + + /** + * Below method will be used to get the key for all the dictionary dimensions + * which is present in the query + * + * @param rowId row id selected after scanning + * @return return the dictionary key + */ + protected byte[] getDictionaryKeyArray(int rowId) { + byte[] completeKey = new byte[fixedLengthKeySize]; + int offset = 0; + for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) { + offset += 
dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter].fillRawData( + rowId, offset, completeKey, + columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i])); + } + rowCounter++; + return completeKey; + } + + /** + * Below method will be used to get the key for all the dictionary dimensions + * in integer array format which is present in the query + * + * @param rowId row id selected after scanning + * @return return the dictionary key + */ + protected int[] getDictionaryKeyIntegerArray(int rowId) { + int[] completeKey = new int[totalDimensionsSize]; + int column = 0; + for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) { + column = dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter] + .fillSurrogateKey(rowId, column, completeKey, + columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i])); + } + rowCounter++; + return completeKey; + } + + /** + * Fill the column data of dictionary to vector + */ + public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) { + int column = 0; + for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) { + column = dimensionColumnPages[dictionaryColumnChunkIndexes[i]][pageCounter] + .fillVector(vectorInfo, column, + columnGroupKeyStructureInfo.get(dictionaryColumnChunkIndexes[i])); + } + } + + /** + * Fill the column data to vector + */ + public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) { + int column = 0; + for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) { + column = dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter] + .fillVector(vectorInfo, column, + columnGroupKeyStructureInfo.get(noDictionaryColumnChunkIndexes[i])); + } + } + + /** + * Fill the measure column data to vector + */ + public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) { + for (int i = 0; i < measuresOrdinal.length; i++) { + vectorInfo[i].measureVectorFiller + 
.fillMeasureVector(measureColumnPages[measuresOrdinal[i]][pageCounter], vectorInfo[i]); + } + } + + public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) { + for (int i = 0; i < vectorInfos.length; i++) { + int offset = vectorInfos[i].offset; + int len = offset + vectorInfos[i].size; + int vectorOffset = vectorInfos[i].vectorOffset; + CarbonColumnVector vector = vectorInfos[i].vector; + for (int j = offset; j < len; j++) { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutputStream dataOutput = new DataOutputStream(byteStream); + try { + vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray( + dimRawColumnChunks, + pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter, + dataOutput); + Object data = vectorInfos[i].genericQueryType + .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray())); + vector.putObject(vectorOffset++, data); + } catch (IOException e) { + LOGGER.error(e); + } finally { + CarbonUtil.closeStreams(dataOutput); + CarbonUtil.closeStreams(byteStream); + } + } + } + } + + /** + * Fill the column data to vector + */ + public void fillColumnarImplicitBatch(ColumnVectorInfo[] vectorInfo) { + for (int i = 0; i < vectorInfo.length; i++) { + ColumnVectorInfo columnVectorInfo = vectorInfo[i]; + CarbonColumnVector vector = columnVectorInfo.vector; + int offset = columnVectorInfo.offset; + int vectorOffset = columnVectorInfo.vectorOffset; + int len = offset + columnVectorInfo.size; + for (int j = offset; j < len; j++) { + // Considering only String case now as we support only + String data = getBlockletId(); + if (CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID + .equals(columnVectorInfo.dimension.getColumnName())) { + data = data + CarbonCommonConstants.FILE_SEPARATOR + pageCounter + + CarbonCommonConstants.FILE_SEPARATOR + (pageFilteredRowId == null ? 
+ j : + pageFilteredRowId[pageCounter][j]); + } + vector.putBytes(vectorOffset++, + data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + } + } + } + + /** + * Just increment the counter incase of query only on measures. + */ + public void incrementCounter() { + rowCounter++; + currentRow++; + } + + /** + * Just increment the page counter and reset the remaining counters. + */ + public void incrementPageCounter() { + rowCounter = 0; + currentRow = -1; + pageCounter++; + fillDataChunks(); + if (null != deletedRecordMap) { + currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter); + } + } + + /** + * This case is used only in case of compaction, since it does not use filter flow. + */ + public void fillDataChunks() { + freeDataChunkMemory(); + if (pageCounter >= pageFilteredRowCount.length) { + return; + } + for (int i = 0; i < dimensionColumnPages.length; i++) { + if (dimensionColumnPages[i][pageCounter] == null && dimRawColumnChunks[i] != null) { + dimensionColumnPages[i][pageCounter] = + dimRawColumnChunks[i].convertToDimColDataChunkWithOutCache(pageCounter); + } + } + + for (int i = 0; i < measureColumnPages.length; i++) { + if (measureColumnPages[i][pageCounter] == null && msrRawColumnChunks[i] != null) { + measureColumnPages[i][pageCounter] = + msrRawColumnChunks[i].convertToColumnPageWithOutCache(pageCounter); + } + } + } + + // free the memory for the last page chunk + private void freeDataChunkMemory() { + for (int i = 0; i < dimensionColumnPages.length; i++) { + if (pageCounter > 0 && dimensionColumnPages[i][pageCounter - 1] != null) { + dimensionColumnPages[i][pageCounter - 1].freeMemory(); + dimensionColumnPages[i][pageCounter - 1] = null; + } + } + for (int i = 0; i < measureColumnPages.length; i++) { + if (pageCounter > 0 && measureColumnPages[i][pageCounter - 1] != null) { + measureColumnPages[i][pageCounter - 1].freeMemory(); + measureColumnPages[i][pageCounter - 1] = null; + } + } + } + + public int 
numberOfpages() { + return pageFilteredRowCount.length; + } + + /** + * Get total rows in the current page + * + * @return + */ + public int getCurrentPageRowCount() { + return pageFilteredRowCount[pageCounter]; + } + + public int getCurrentPageCounter() { + return pageCounter; + } + + /** + * increment the counter. + */ + public void setRowCounter(int rowCounter) { + this.rowCounter = rowCounter; + } + + /** + * Below method will be used to get the dimension key array + * for all the no dictionary dimension present in the query + * + * @param rowId row number + * @return no dictionary keys for all no dictionary dimension + */ + protected byte[][] getNoDictionaryKeyArray(int rowId) { + byte[][] noDictionaryColumnsKeys = new byte[noDictionaryColumnChunkIndexes.length][]; + int position = 0; + for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) { + noDictionaryColumnsKeys[position++] = + dimensionColumnPages[noDictionaryColumnChunkIndexes[i]][pageCounter].getChunkData(rowId); + } + return noDictionaryColumnsKeys; + } + + /** + * @return blockletId + */ + public String getBlockletId() { + return blockletId; + } + + /** + * Set blocklet id, which looks like + * "Part0/Segment_0/part-0-0_batchno0-0-1517155583332.carbondata/0" + */ + public void setBlockletId(String blockletId) { + this.blockletId = CarbonTablePath.getShortBlockId(blockletId); + blockletNumber = CarbonUpdateUtil.getRequiredFieldFromTID(blockletId, TupleIdEnum.BLOCKLET_ID); + // if deleted recors map is present for this block + // then get the first page deleted vo + if (null != deletedRecordMap) { + currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter); + } + } + + /** + * Below method will be used to get the complex type keys array based + * on row id for all the complex type dimension selected in query + * + * @param rowId row number + * @return complex type key array for all the complex dimension selected in query + */ + protected byte[][] 
getComplexTypeKeyArray(int rowId) { + byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][]; + for (int i = 0; i < complexTypeData.length; i++) { + GenericQueryType genericQueryType = + complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutputStream dataOutput = new DataOutputStream(byteStream); + try { + genericQueryType + .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, rowId, pageCounter, + dataOutput); + complexTypeData[i] = byteStream.toByteArray(); + } catch (IOException e) { + LOGGER.error(e); + } finally { + CarbonUtil.closeStreams(dataOutput); + CarbonUtil.closeStreams(byteStream); + } + } + return complexTypeData; + } + + /** + * to check whether any more row is present in the result + * + * @return + */ + public boolean hasNext() { + if (pageCounter + < pageFilteredRowCount.length && rowCounter < this.pageFilteredRowCount[pageCounter]) { + return true; + } else if (pageCounter < pageFilteredRowCount.length) { + pageCounter++; + fillDataChunks(); + rowCounter = 0; + currentRow = -1; + if (null != deletedRecordMap) { + currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter); + } + return hasNext(); + } + return false; + } + + /** + * Below method will be used to free the occupied memory + */ + public void freeMemory() { + // first free the dimension chunks + if (null != dimensionColumnPages) { + for (int i = 0; i < dimensionColumnPages.length; i++) { + if (null != dimensionColumnPages[i]) { + for (int j = 0; j < dimensionColumnPages[i].length; j++) { + if (null != dimensionColumnPages[i][j]) { + dimensionColumnPages[i][j].freeMemory(); + } + } + } + } + } + // free the measure data chunks + if (null != measureColumnPages) { + for (int i = 0; i < measureColumnPages.length; i++) { + if (null != measureColumnPages[i]) { + for (int j = 0; j < measureColumnPages[i].length; j++) { + if (null != measureColumnPages[i][j]) { 
+ measureColumnPages[i][j].freeMemory(); + } + } + } + } + } + // free the raw chunks + if (null != dimRawColumnChunks) { + for (int i = 0; i < dimRawColumnChunks.length; i++) { + if (null != dimRawColumnChunks[i]) { + dimRawColumnChunks[i].freeMemory(); + } + } + } + } + + /** + * @param pageFilteredRowCount set total of number rows valid after scanning + */ + public void setPageFilteredRowCount(int[] pageFilteredRowCount) { + this.pageFilteredRowCount = pageFilteredRowCount; + } + + /** + * After applying filter it will return the bit set with the valid row indexes + * so below method will be used to set the row indexes + */ + public void setPageFilteredRowId(int[][] pageFilteredRowId) { + this.pageFilteredRowId = pageFilteredRowId; + } + + public int getRowCounter() { + return rowCounter; + } + + /** + * will return the current valid row id + * + * @return valid row id + */ + public abstract int getCurrentRowId(); + + /** + * @return dictionary key array for all the dictionary dimension + * selected in query + */ + public abstract byte[] getDictionaryKeyArray(); + + /** + * @return dictionary key array for all the dictionary dimension in integer array forat + * selected in query + */ + public abstract int[] getDictionaryKeyIntegerArray(); + + /** + * Below method will be used to get the complex type key array + * + * @return complex type key array + */ + public abstract byte[][] getComplexTypeKeyArray(); + + /** + * Below method will be used to get the no dictionary key + * array for all the no dictionary dimension selected in query + * + * @return no dictionary key array for all the no dictionary dimension + */ + public abstract byte[][] getNoDictionaryKeyArray(); + + /** + * Mark the filtered rows in columnar batch. These rows will not be added to vector batches later. 
+ * @param columnarBatch + * @param startRow + * @param size + * @param vectorOffset + */ + public int markFilteredRows(CarbonColumnarBatch columnarBatch, int startRow, int size, + int vectorOffset) { + int rowsFiltered = 0; + if (currentDeleteDeltaVo != null) { + int len = startRow + size; + for (int i = startRow; i < len; i++) { + int rowId = pageFilteredRowId != null ? pageFilteredRowId[pageCounter][i] : i; + if (currentDeleteDeltaVo.containsRow(rowId)) { + columnarBatch.markFiltered(vectorOffset); + rowsFiltered++; + } + vectorOffset++; + } + } + return rowsFiltered; + } + + /** + * Below method will be used to check row got deleted + * + * @param rowId + * @return is present in deleted row + */ + public boolean containsDeletedRow(int rowId) { + if (null != currentDeleteDeltaVo) { + return currentDeleteDeltaVo.containsRow(rowId); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java new file mode 100644 index 0000000..c129161 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/RowBatch.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.result; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.carbondata.common.CarbonIterator; + +/** + * Below class holds the query result + */ +public class RowBatch extends CarbonIterator<Object[]> { + + /** + * list of keys + */ + protected List<Object[]> rows; + + /** + * counter to check whether all the records are processed or not + */ + protected int counter; + + public RowBatch() { + this.rows = new ArrayList<>(); + } + + /** + * Below method will be used to get the rows + * + * @return + */ + public List<Object[]> getRows() { + return rows; + } + + /** + * Below method will be used to get the set the values + * + * @param rows + */ + public void setRows(List<Object[]> rows) { + this.rows = rows; + } + + /** + * This method will return one row at a time based on the counter given. + * @param counter + * @return + */ + public Object[] getRawRow(int counter) { + return rows.get(counter); + } + + /** + * For getting the total size. + * @return + */ + public int getSize() { + return rows.size(); + } + + + /** + * Returns {@code true} if the iteration has more elements. + * + * @return {@code true} if the iteration has more elements + */ + @Override public boolean hasNext() { + return counter < rows.size(); + } + + /** + * Returns the next element in the iteration. 
+ * + * @return the next element in the iteration + */ + @Override public Object[] next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + Object[] row = rows.get(counter); + counter++; + return row; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java index 8120310..bcc5634 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/FilterQueryScannedResult.java @@ -17,7 +17,7 @@ package org.apache.carbondata.core.scan.result.impl; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; /** @@ -25,7 +25,7 @@ import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; * In case of filter query data will be send * based on filtered row index */ -public class FilterQueryScannedResult extends AbstractScannedResult { +public class FilterQueryScannedResult extends BlockletScannedResult { public FilterQueryScannedResult(BlockExecutionInfo tableBlockExecutionInfos) { super(tableBlockExecutionInfos); @@ -37,7 +37,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult { */ @Override public byte[] getDictionaryKeyArray() { ++currentRow; - return getDictionaryKeyArray(rowMapping[pageCounter][currentRow]); + return getDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]); } /** @@ -46,7 +46,7 @@ public class 
FilterQueryScannedResult extends AbstractScannedResult { */ @Override public int[] getDictionaryKeyIntegerArray() { ++currentRow; - return getDictionaryKeyIntegerArray(rowMapping[pageCounter][currentRow]); + return getDictionaryKeyIntegerArray(pageFilteredRowId[pageCounter][currentRow]); } /** @@ -55,7 +55,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult { * @return complex type key array */ @Override public byte[][] getComplexTypeKeyArray() { - return getComplexTypeKeyArray(rowMapping[pageCounter][currentRow]); + return getComplexTypeKeyArray(pageFilteredRowId[pageCounter][currentRow]); } /** @@ -65,17 +65,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult { * @return no dictionary key array for all the no dictionary dimension */ @Override public byte[][] getNoDictionaryKeyArray() { - return getNoDictionaryKeyArray(rowMapping[pageCounter][currentRow]); - } - - /** - * Below method will be used to get the no dictionary key - * string array for all the no dictionary dimension selected in query - * - * @return no dictionary key array for all the no dictionary dimension - */ - @Override public String[] getNoDictionaryKeyStringArray() { - return getNoDictionaryKeyStringArray(rowMapping[pageCounter][currentRow]); + return getNoDictionaryKeyArray(pageFilteredRowId[pageCounter][currentRow]); } /** @@ -84,7 +74,7 @@ public class FilterQueryScannedResult extends AbstractScannedResult { * @return valid row id */ @Override public int getCurrentRowId() { - return rowMapping[pageCounter][currentRow]; + return pageFilteredRowId[pageCounter][currentRow]; } /** @@ -92,10 +82,12 @@ public class FilterQueryScannedResult extends AbstractScannedResult { */ public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) { int column = 0; - for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) { - column = dimensionDataChunks[dictionaryColumnBlockIndexes[i]][pageCounter] - .fillConvertedChunkData(rowMapping[pageCounter], 
vectorInfo, column, - columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i])); + for (int chunkIndex : this.dictionaryColumnChunkIndexes) { + column = dimensionColumnPages[chunkIndex][pageCounter].fillVector( + pageFilteredRowId[pageCounter], + vectorInfo, + column, + columnGroupKeyStructureInfo.get(chunkIndex)); } } @@ -104,10 +96,12 @@ public class FilterQueryScannedResult extends AbstractScannedResult { */ public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) { int column = 0; - for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) { - column = dimensionDataChunks[noDictionaryColumnBlockIndexes[i]][pageCounter] - .fillConvertedChunkData(rowMapping[pageCounter], vectorInfo, column, - columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i])); + for (int chunkIndex : this.noDictionaryColumnChunkIndexes) { + column = dimensionColumnPages[chunkIndex][pageCounter].fillVector( + pageFilteredRowId[pageCounter], + vectorInfo, + column, + columnGroupKeyStructureInfo.get(chunkIndex)); } } @@ -116,8 +110,10 @@ public class FilterQueryScannedResult extends AbstractScannedResult { */ public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) { for (int i = 0; i < measuresOrdinal.length; i++) { - vectorInfo[i].measureVectorFiller.fillMeasureVectorForFilter(rowMapping[pageCounter], - measureDataChunks[measuresOrdinal[i]][pageCounter], vectorInfo[i]); + vectorInfo[i].measureVectorFiller.fillMeasureVector( + pageFilteredRowId[pageCounter], + measureColumnPages[measuresOrdinal[i]][pageCounter], + vectorInfo[i]); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java 
b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java index 3978f9e..06687c2 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/impl/NonFilterQueryScannedResult.java @@ -17,14 +17,14 @@ package org.apache.carbondata.core.scan.result.impl; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; -import org.apache.carbondata.core.scan.result.AbstractScannedResult; +import org.apache.carbondata.core.scan.result.BlockletScannedResult; /** * Result provide class for non filter query * In case of no filter query we need to return * complete data */ -public class NonFilterQueryScannedResult extends AbstractScannedResult { +public class NonFilterQueryScannedResult extends BlockletScannedResult { public NonFilterQueryScannedResult(BlockExecutionInfo blockExecutionInfo) { super(blockExecutionInfo); @@ -68,16 +68,6 @@ public class NonFilterQueryScannedResult extends AbstractScannedResult { } /** - * Below method will be used to get the no dictionary key - * string array for all the no dictionary dimension selected in query - * - * @return no dictionary key array for all the no dictionary dimension - */ - @Override public String[] getNoDictionaryKeyStringArray() { - return getNoDictionaryKeyStringArray(currentRow); - } - - /** * will return the current valid row id * * @return valid row id http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java index 6172b40..4e628fe 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -28,18 +28,17 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.DataRefNode; import org.apache.carbondata.core.datastore.DataRefNodeFinder; -import org.apache.carbondata.core.datastore.FileHolder; +import org.apache.carbondata.core.datastore.FileReader; import org.apache.carbondata.core.datastore.block.AbstractIndex; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder; -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNodeWrapper; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode; import org.apache.carbondata.core.mutate.DeleteDeltaVo; import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.executor.infos.DeleteDeltaInfo; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.processor.AbstractDataBlockIterator; -import org.apache.carbondata.core.scan.processor.impl.DataBlockIteratorImpl; +import org.apache.carbondata.core.scan.processor.DataBlockIterator; import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; @@ -63,23 +62,23 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato private static final Map<DeleteDeltaInfo, Object> deleteDeltaToLockObjectMap = new ConcurrentHashMap<>(); - protected ExecutorService execService; + private 
ExecutorService execService; /** * execution info of the block */ - protected List<BlockExecutionInfo> blockExecutionInfos; + private List<BlockExecutionInfo> blockExecutionInfos; /** * file reader which will be used to execute the query */ - protected FileHolder fileReader; + protected FileReader fileReader; - protected AbstractDataBlockIterator dataBlockIterator; + DataBlockIterator dataBlockIterator; /** * QueryStatisticsRecorder */ - protected QueryStatisticsRecorder recorder; + private QueryStatisticsRecorder recorder; /** * number of cores which can be used */ @@ -89,7 +88,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato */ private QueryStatisticsModel queryStatisticsModel; - public AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, ExecutorService execService) { String batchSizeString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE); @@ -107,7 +106,6 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato this.blockExecutionInfos = infos; this.fileReader = FileFactory.getFileHolder( FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath())); - this.fileReader.setQueryId(queryModel.getQueryId()); this.fileReader.setReadPageByPage(queryModel.isReadPageByPage()); this.execService = execService; intialiseInfos(); @@ -130,22 +128,21 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato blockInfo.setDeletedRecordsMap(deletedRowsMap); } DataRefNode dataRefNode = blockInfo.getDataBlock().getDataRefNode(); - if (dataRefNode instanceof BlockletDataRefNodeWrapper) { - BlockletDataRefNodeWrapper wrapper = (BlockletDataRefNodeWrapper) dataRefNode; - blockInfo.setFirstDataBlock(wrapper); - blockInfo.setNumberOfBlockToScan(wrapper.numberOfNodes()); - + if (dataRefNode instanceof 
BlockletDataRefNode) { + BlockletDataRefNode node = (BlockletDataRefNode) dataRefNode; + blockInfo.setFirstDataBlock(node); + blockInfo.setNumberOfBlockToScan(node.numberOfNodes()); } else { DataRefNode startDataBlock = finder.findFirstDataBlock(dataRefNode, blockInfo.getStartKey()); - while (startDataBlock.nodeNumber() < blockInfo.getStartBlockletIndex()) { + while (startDataBlock.nodeIndex() < blockInfo.getStartBlockletIndex()) { startDataBlock = startDataBlock.getNextDataRefNode(); } long numberOfBlockToScan = blockInfo.getNumberOfBlockletToScan(); //if number of block is less than 0 then take end block. if (numberOfBlockToScan <= 0) { DataRefNode endDataBlock = finder.findLastDataBlock(dataRefNode, blockInfo.getEndKey()); - numberOfBlockToScan = endDataBlock.nodeNumber() - startDataBlock.nodeNumber() + 1; + numberOfBlockToScan = endDataBlock.nodeIndex() - startDataBlock.nodeIndex() + 1; } blockInfo.setFirstDataBlock(startDataBlock); blockInfo.setNumberOfBlockToScan(numberOfBlockToScan); @@ -230,7 +227,8 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato } } - @Override public boolean hasNext() { + @Override + public boolean hasNext() { if ((dataBlockIterator != null && dataBlockIterator.hasNext())) { return true; } else if (blockExecutionInfos.size() > 0) { @@ -240,7 +238,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato } } - protected void updateDataBlockIterator() { + void updateDataBlockIterator() { if (dataBlockIterator == null || !dataBlockIterator.hasNext()) { dataBlockIterator = getDataBlockIterator(); while (dataBlockIterator != null && !dataBlockIterator.hasNext()) { @@ -249,17 +247,17 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato } } - private DataBlockIteratorImpl getDataBlockIterator() { + private DataBlockIterator getDataBlockIterator() { if (blockExecutionInfos.size() > 0) { BlockExecutionInfo executionInfo = blockExecutionInfos.get(0); 
blockExecutionInfos.remove(executionInfo); - return new DataBlockIteratorImpl(executionInfo, fileReader, batchSize, queryStatisticsModel, + return new DataBlockIterator(executionInfo, fileReader, batchSize, queryStatisticsModel, execService); } return null; } - protected void initQueryStatiticsModel() { + private void initQueryStatiticsModel() { this.queryStatisticsModel = new QueryStatisticsModel(); this.queryStatisticsModel.setRecorder(recorder); QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java index 1efac30..1235789 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/ChunkRowIterator.java @@ -18,7 +18,7 @@ package org.apache.carbondata.core.scan.result.iterator; import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; /** * Iterator over row result @@ -28,14 +28,14 @@ public class ChunkRowIterator extends CarbonIterator<Object[]> { /** * iterator over chunk result */ - private CarbonIterator<BatchResult> iterator; + private CarbonIterator<RowBatch> iterator; /** * currect chunk */ - private BatchResult currentchunk; + private RowBatch currentchunk; - public ChunkRowIterator(CarbonIterator<BatchResult> iterator) { + public ChunkRowIterator(CarbonIterator<RowBatch> iterator) { this.iterator = iterator; if (iterator.hasNext()) { currentchunk = iterator.next(); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/15b4e192/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java index 747f5a9..c073c78 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java @@ -21,14 +21,14 @@ import java.util.concurrent.ExecutorService; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.BatchResult; +import org.apache.carbondata.core.scan.result.RowBatch; /** * In case of detail query we cannot keep all the records in memory so for * executing that query are returning a iterator over block and every time next * call will come it will execute the block and return the result */ -public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> { +public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<RowBatch> { private final Object lock = new Object(); @@ -37,18 +37,18 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator super(infos, queryModel, execService); } - @Override public BatchResult next() { + @Override public RowBatch next() { return getBatchResult(); } - private BatchResult getBatchResult() { - BatchResult batchResult = new BatchResult(); + private RowBatch getBatchResult() { + RowBatch rowBatch = new RowBatch(); synchronized (lock) { updateDataBlockIterator(); if (dataBlockIterator != null) { - batchResult.setRows(dataBlockIterator.next()); + 
rowBatch.setRows(dataBlockIterator.next()); } } - return batchResult; + return rowBatch; } }