Repository: carbondata Updated Branches: refs/heads/branch-1.5 442e2446e -> 951a7811a
[CARBONDATA-3145] Avoid duplicate decoding for complex column pages while querying Problem: The column page is decoded again for every row fetched from a complex primitive column. Solution: Decode each page once and reuse the decoded page. This closes #2975 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/523515cd Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/523515cd Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/523515cd Branch: refs/heads/branch-1.5 Commit: 523515cd6700f0c19d43de126525fd3c8dede0b9 Parents: d909482 Author: dhatchayani <dhatcha.offic...@gmail.com> Authored: Wed Dec 5 12:40:56 2018 +0530 Committer: Raghunandan S <carbondatacontributi...@gmail.com> Committed: Mon Dec 17 18:50:23 2018 +0530 ---------------------------------------------------------------------- .../core/scan/complextypes/ArrayQueryType.java | 11 ++-- .../scan/complextypes/ComplexQueryType.java | 14 +++- .../scan/complextypes/PrimitiveQueryType.java | 11 ++-- .../core/scan/complextypes/StructQueryType.java | 14 ++-- .../core/scan/filter/GenericQueryType.java | 4 +- .../executer/RowLevelFilterExecuterImpl.java | 7 +- .../core/scan/result/BlockletScannedResult.java | 68 +++++++++++++------- 7 files changed, 86 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/523515cd/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java index a5f4234..8538edb 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.scan.filter.GenericQueryType; @@ -62,17 +63,17 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType } public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks, - int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException { - byte[] input = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber); + DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber, + DataOutputStream dataOutputStream) throws IOException { + byte[] input = copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber); ByteBuffer byteArray = ByteBuffer.wrap(input); int dataLength = byteArray.getInt(); dataOutputStream.writeInt(dataLength); if (dataLength > 0) { int dataOffset = byteArray.getInt(); for (int i = 0; i < dataLength; i++) { - children - .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dataOffset++, pageNumber, - dataOutputStream); + children.parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dimensionColumnPages, + dataOffset++, pageNumber, dataOutputStream); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/523515cd/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java index 98f0715..704af89 
100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.complextypes; import java.io.IOException; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; @@ -40,9 +41,10 @@ public class ComplexQueryType { * This method is also used by child. */ protected byte[] copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks, - int rowNumber, int pageNumber) { + DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber) { byte[] data = - rawColumnChunks[blockIndex].decodeColumnPage(pageNumber).getChunkData(rowNumber); + getDecodedDimensionPage(dimensionColumnPages, rawColumnChunks[blockIndex], pageNumber) + .getChunkData(rowNumber); byte[] output = new byte[data.length]; System.arraycopy(data, 0, output, 0, output.length); return output; @@ -57,4 +59,12 @@ public class ComplexQueryType { .readDimensionChunk(blockChunkHolder.getFileReader(), blockIndex); } } + + private DimensionColumnPage getDecodedDimensionPage(DimensionColumnPage[][] dimensionColumnPages, + DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) { + if (dimensionColumnPages == null || null == dimensionColumnPages[blockIndex]) { + return dimensionRawColumnChunk.decodeColumnPage(pageNumber); + } + return dimensionColumnPages[blockIndex][pageNumber]; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/523515cd/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java index abe33c4..6347397 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; @@ -93,10 +94,12 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery return 1; } - @Override public void parseBlocksAndReturnComplexColumnByteArray( - DimensionRawColumnChunk[] rawColumnChunks, int rowNumber, - int pageNumber, DataOutputStream dataOutputStream) throws IOException { - byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber); + @Override + public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks, + DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber, + DataOutputStream dataOutputStream) throws IOException { + byte[] currentVal = + copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber); if (!this.isDictionary && !this.isDirectDictionary) { dataOutputStream.writeShort(currentVal.length); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/523515cd/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java index c607f84..7bccbc0 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.scan.filter.GenericQueryType; @@ -79,17 +80,18 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp } @Override public void parseBlocksAndReturnComplexColumnByteArray( - DimensionRawColumnChunk[] dimensionColumnDataChunks, int rowNumber, - int pageNumber, DataOutputStream dataOutputStream) throws IOException { - byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber); + DimensionRawColumnChunk[] dimensionColumnDataChunks, + DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber, + DataOutputStream dataOutputStream) throws IOException { + byte[] input = + copyBlockDataChunk(dimensionColumnDataChunks, dimensionColumnPages, rowNumber, pageNumber); ByteBuffer byteArray = ByteBuffer.wrap(input); int childElement = byteArray.getShort(); dataOutputStream.writeShort(childElement); if (childElement > 0) { for (int i = 0; i < childElement; i++) { - children.get(i) - .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, rowNumber, - pageNumber, dataOutputStream); + children.get(i).parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, + dimensionColumnPages, rowNumber, pageNumber, dataOutputStream); } } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/523515cd/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java index 6c087d7..b43062e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; +import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks; @@ -41,7 +42,8 @@ public interface GenericQueryType { int getColsCount(); void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks, - int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException; + DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber, + DataOutputStream dataOutputStream) throws IOException; void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException; http://git-wip-us.apache.org/repos/asf/carbondata/blob/523515cd/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java index 7ca2579..63ae0cd 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java @@ -457,9 +457,10 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter { ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteStream); complexType.parseBlocksAndReturnComplexColumnByteArray( - blockChunkHolder.getDimensionRawColumnChunks(), index, pageIndex, dataOutputStream); - record[dimColumnEvaluatorInfo.getRowIndex()] = complexType - .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray())); + blockChunkHolder.getDimensionRawColumnChunks(), null, index, pageIndex, + dataOutputStream); + record[dimColumnEvaluatorInfo.getRowIndex()] = + complexType.getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray())); byteStream.close(); } catch (IOException e) { LOGGER.info(e.getMessage()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/523515cd/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java index bb373eb..c04df52 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java @@ -17,7 +17,6 @@ package org.apache.carbondata.core.scan.result; import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -46,6 +45,7 @@ import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; import 
org.apache.carbondata.core.stats.QueryStatisticsModel; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream; import org.apache.log4j.Logger; @@ -282,30 +282,38 @@ public abstract class BlockletScannedResult { } public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ReUsableByteArrayDataOutputStream reuseableDataOutput = + new ReUsableByteArrayDataOutputStream(byteStream); + boolean isExceptionThrown = false; for (int i = 0; i < vectorInfos.length; i++) { int offset = vectorInfos[i].offset; int len = offset + vectorInfos[i].size; int vectorOffset = vectorInfos[i].vectorOffset; CarbonColumnVector vector = vectorInfos[i].vector; for (int j = offset; j < len; j++) { - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - DataOutputStream dataOutput = new DataOutputStream(byteStream); try { - vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray( - dimRawColumnChunks, - pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter, - dataOutput); + vectorInfos[i].genericQueryType + .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages, + pageFilteredRowId == null ? 
j : pageFilteredRowId[pageCounter][j], pageCounter, + reuseableDataOutput); Object data = vectorInfos[i].genericQueryType - .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray())); + .getDataBasedOnDataType(ByteBuffer.wrap(reuseableDataOutput.getByteArray())); vector.putObject(vectorOffset++, data); + reuseableDataOutput.reset(); } catch (IOException e) { + isExceptionThrown = true; LOGGER.error(e); } finally { - CarbonUtil.closeStreams(dataOutput); - CarbonUtil.closeStreams(byteStream); + if (isExceptionThrown) { + CarbonUtil.closeStreams(reuseableDataOutput); + CarbonUtil.closeStreams(byteStream); + } } } } + CarbonUtil.closeStreams(reuseableDataOutput); + CarbonUtil.closeStreams(byteStream); } /** @@ -541,6 +549,10 @@ public abstract class BlockletScannedResult { */ protected List<byte[][]> getComplexTypeKeyArrayBatch() { List<byte[][]> complexTypeArrayList = new ArrayList<>(validRowIds.size()); + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ReUsableByteArrayDataOutputStream reUseableDataOutput = + new ReUsableByteArrayDataOutputStream(byteStream); + boolean isExceptionThrown = false; byte[][] complexTypeData = null; // everyTime it is initialized new as in case of prefetch it can modify the data for (int i = 0; i < validRowIds.size(); i++) { @@ -552,23 +564,27 @@ public abstract class BlockletScannedResult { GenericQueryType genericQueryType = complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]); for (int j = 0; j < validRowIds.size(); j++) { - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - DataOutputStream dataOutput = new DataOutputStream(byteStream); try { genericQueryType - .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, validRowIds.get(j), - pageCounter, dataOutput); + .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages, + validRowIds.get(j), pageCounter, reUseableDataOutput); // get the key array in columnar way byte[][] complexKeyArray = 
complexTypeArrayList.get(j); complexKeyArray[i] = byteStream.toByteArray(); + reUseableDataOutput.reset(); } catch (IOException e) { + isExceptionThrown = true; LOGGER.error(e); } finally { - CarbonUtil.closeStreams(dataOutput); - CarbonUtil.closeStreams(byteStream); + if (isExceptionThrown) { + CarbonUtil.closeStreams(reUseableDataOutput); + CarbonUtil.closeStreams(byteStream); + } } } } + CarbonUtil.closeStreams(reUseableDataOutput); + CarbonUtil.closeStreams(byteStream); return complexTypeArrayList; } @@ -607,24 +623,32 @@ public abstract class BlockletScannedResult { * @return complex type key array for all the complex dimension selected in query */ protected byte[][] getComplexTypeKeyArray(int rowId) { + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + ReUsableByteArrayDataOutputStream reUsableDataOutput = + new ReUsableByteArrayDataOutputStream(byteStream); + boolean isExceptionThrown = false; byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][]; for (int i = 0; i < complexTypeData.length; i++) { GenericQueryType genericQueryType = complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]); - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - DataOutputStream dataOutput = new DataOutputStream(byteStream); try { genericQueryType - .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, rowId, pageCounter, - dataOutput); + .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages, + rowId, pageCounter, reUsableDataOutput); complexTypeData[i] = byteStream.toByteArray(); + reUsableDataOutput.reset(); } catch (IOException e) { + isExceptionThrown = true; LOGGER.error(e); } finally { - CarbonUtil.closeStreams(dataOutput); - CarbonUtil.closeStreams(byteStream); + if (isExceptionThrown) { + CarbonUtil.closeStreams(reUsableDataOutput); + CarbonUtil.closeStreams(byteStream); + } } } + CarbonUtil.closeStreams(reUsableDataOutput); + CarbonUtil.closeStreams(byteStream); return 
complexTypeData; }