Repository: carbondata Updated Branches: refs/heads/master b62b0fd9c -> 71d617955
[CARBONDATA-3014] Added support for inverted index and delete delta for direct scan queries. Added new classes to support inverted index and delete delta directly from column vector: ColumnarVectorWrapperDirectWithInvertedIndex, ColumnarVectorWrapperDirectWithDeleteDelta, ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex. This closes #2822. Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/71d61795 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/71d61795 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/71d61795 Branch: refs/heads/master Commit: 71d6179557703718ff0aac099efcc89ee41ed941 Parents: b62b0fd Author: ravipesala <ravi.pes...@gmail.com> Authored: Tue Oct 16 16:37:18 2018 +0530 Committer: kumarvishal09 <kumarvishal1...@gmail.com> Committed: Fri Oct 26 18:52:10 2018 +0530 ---------------------------------------------------------------------- ...mpressedDimensionChunkFileBasedReaderV3.java | 12 +- .../safe/AbstractNonDictionaryVectorFiller.java | 6 +- .../SafeFixedLengthDimensionDataChunkStore.java | 11 + ...feVariableLengthDimensionDataChunkStore.java | 10 + .../adaptive/AdaptiveDeltaFloatingCodec.java | 3 + .../adaptive/AdaptiveDeltaIntegralCodec.java | 35 ++- .../adaptive/AdaptiveFloatingCodec.java | 3 + .../adaptive/AdaptiveIntegralCodec.java | 17 +- .../encoding/compress/DirectCompressCodec.java | 16 +- .../datatype/DecimalConverterFactory.java | 42 +++- .../scan/collector/ResultCollectorFactory.java | 11 +- .../executer/RestructureEvaluatorImpl.java | 2 +- ...elRangeGrtrThanEquaToFilterExecuterImpl.java | 14 +- .../scan/result/vector/ColumnVectorInfo.java | 1 + .../AbstractCarbonColumnarVector.java | 133 ++++++++++++ .../ColumnarVectorWrapperDirectFactory.java | 59 +++++ ...umnarVectorWrapperDirectWithDeleteDelta.java | 216 +++++++++++++++++++ ...erDirectWithDeleteDeltaAndInvertedIndex.java | 179 +++++++++++++++ 
...narVectorWrapperDirectWithInvertedIndex.java | 144 +++++++++++++ .../impl/directread/ConvertableVector.java | 30 +++ .../scanner/impl/BlockletFilterScanner.java | 8 +- .../detailquery/CastColumnTestCase.scala | 2 +- .../datasources/SparkCarbonFileFormat.scala | 1 + 23 files changed, 910 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java index a9f9338..602e694 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java @@ -276,13 +276,19 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead offset += pageMetadata.data_page_length; invertedIndexes = CarbonUtil .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset); - // get the reverse index - invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes); + if (vectorInfo == null) { + // get the reverse index + invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes); + } else { + vectorInfo.invertedIndex = invertedIndexes; + } } BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor); ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset, null != rawColumnPage.getLocalDictionary(), 
vectorInfo, nullBitSet); - decodedPage.setNullBits(nullBitSet); + if (decodedPage != null) { + decodedPage.setNullBits(nullBitSet); + } return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes, invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java index ddfa470..2e68648 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java @@ -52,7 +52,11 @@ class NonDictionaryVectorFillerFactory { public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize, int numberOfRows) { if (type == DataTypes.STRING) { - return new StringVectorFiller(lengthSize, numberOfRows); + if (lengthSize > 2) { + return new LongStringVectorFiller(lengthSize, numberOfRows); + } else { + return new StringVectorFiller(lengthSize, numberOfRows); + } } else if (type == DataTypes.VARCHAR) { return new LongStringVectorFiller(lengthSize, numberOfRows); } else if (type == DataTypes.TIMESTAMP) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java index d30650d..d4bae90 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java @@ -17,12 +17,16 @@ package org.apache.carbondata.core.datastore.chunk.store.impl.safe; +import java.util.BitSet; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; @@ -49,7 +53,14 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data, ColumnVectorInfo vectorInfo) { CarbonColumnVector vector = vectorInfo.vector; + BitSet deletedRows = vectorInfo.deletedRows; + BitSet nullBits = new BitSet(numOfRows); + vector = ColumnarVectorWrapperDirectFactory + .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, deletedRows, false); fillVector(data, vectorInfo, vector); + if (vector instanceof ConvertableVector) { + ((ConvertableVector) vector).convert(); + 
} } private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java index 0fb4854..b80ad7f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java @@ -18,12 +18,15 @@ package org.apache.carbondata.core.datastore.chunk.store.impl.safe; import java.nio.ByteBuffer; +import java.util.BitSet; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.DataTypeUtil; @@ -103,7 +106,14 @@ public abstract class SafeVariableLengthDimensionDataChunkStore ByteBuffer buffer = ByteBuffer.wrap(data); AbstractNonDictionaryVectorFiller vectorFiller = NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows); + BitSet nullBits = new BitSet(numberOfRows); + 
vector = ColumnarVectorWrapperDirectFactory + .getDirectVectorWrapperFactory(vector, invertedIndex, nullBits, vectorInfo.deletedRows, + false); vectorFiller.fillVector(data, vector, buffer); + if (vector instanceof ConvertableVector) { + ((ConvertableVector) vector).convert(); + } } protected abstract int getLengthSize(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java index 1826798..d19d1c9 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.Encoding; @@ -250,6 +251,8 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec { int pageSize = columnPage.getPageSize(); BitSet deletedRows = vectorInfo.deletedRows; DataType vectorDataType = vector.getType(); + vector = ColumnarVectorWrapperDirectFactory + .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true); if (vectorDataType == DataTypes.FLOAT) { float 
floatFactor = factor.floatValue(); if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java index 0d7ad8a..1671246 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java @@ -40,6 +40,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.Encoding; @@ -305,16 +307,26 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { DataType pageDataType = columnPage.getDataType(); int pageSize = columnPage.getPageSize(); BitSet deletedRows = vectorInfo.deletedRows; + vector = ColumnarVectorWrapperDirectFactory + .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows, + true); fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo); if (deletedRows == null || deletedRows.isEmpty()) { 
for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) { vector.putNull(i); } } + if (vector instanceof ConvertableVector) { + ((ConvertableVector) vector).convert(); + } } private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) { + int newScale = 0; + if (vectorInfo.measure != null) { + newScale = vectorInfo.measure.getMeasure().getScale(); + } if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) { byte[] byteData = columnPage.getBytePage(); if (vectorDataType == DataTypes.SHORT) { @@ -331,7 +343,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, (max - byteData[i]) * 1000); + vector.putLong(i, (max - (long) byteData[i]) * 1000); } } else if (vectorDataType == DataTypes.BOOLEAN) { for (int i = 0; i < pageSize; i++) { @@ -342,6 +354,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { int precision = vectorInfo.measure.getMeasure().getPrecision(); for (int i = 0; i < pageSize; i++) { BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]); + if (decimal.scale() < newScale) { + decimal = decimal.setScale(newScale); + } vector.putDecimal(i, decimal, precision); } } else { @@ -365,13 +380,16 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, (max - shortData[i]) * 1000); + vector.putLong(i, (max - (long) shortData[i]) * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; int precision = vectorInfo.measure.getMeasure().getPrecision(); for (int i = 0; i < pageSize; i++) { BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]); + if 
(decimal.scale() < newScale) { + decimal = decimal.setScale(newScale); + } vector.putDecimal(i, decimal, precision); } } else { @@ -395,7 +413,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3); - vector.putLong(i, (max - shortInt) * 1000); + vector.putLong(i, (max - (long) shortInt) * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; @@ -403,6 +421,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { for (int i = 0; i < pageSize; i++) { int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3); BigDecimal decimal = decimalConverter.getDecimal(max - shortInt); + if (decimal.scale() < newScale) { + decimal = decimal.setScale(newScale); + } vector.putDecimal(i, decimal, precision); } } else { @@ -423,13 +444,16 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, (max - intData[i]) * 1000); + vector.putLong(i, (max - (long) intData[i]) * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; int precision = vectorInfo.measure.getMeasure().getPrecision(); for (int i = 0; i < pageSize; i++) { BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]); + if (decimal.scale() < newScale) { + decimal = decimal.setScale(newScale); + } vector.putDecimal(i, decimal, precision); } } else { @@ -452,6 +476,9 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { int precision = vectorInfo.measure.getMeasure().getPrecision(); for (int i = 0; i < pageSize; i++) { BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]); + if (decimal.scale() < newScale) { + 
decimal = decimal.setScale(newScale); + } vector.putDecimal(i, decimal, precision); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java index 38bf9b6..21421d3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java @@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.Encoding; @@ -253,6 +254,8 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec { int pageSize = columnPage.getPageSize(); BitSet deletedRows = vectorInfo.deletedRows; DataType vectorDataType = vector.getType(); + vector = ColumnarVectorWrapperDirectFactory + .getDirectVectorWrapperFactory(vector, null, nullBits, deletedRows, true); if (vectorDataType == DataTypes.FLOAT) { if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) { byte[] byteData = columnPage.getBytePage(); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java index bdf5373..1813907 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java @@ -39,6 +39,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.Encoding; @@ -278,12 +280,19 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { DataType pageDataType = columnPage.getDataType(); int pageSize = columnPage.getPageSize(); BitSet deletedRows = vectorInfo.deletedRows; + vector = ColumnarVectorWrapperDirectFactory + .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows, + true); fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo); if (deletedRows == null || deletedRows.isEmpty()) { for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) { vector.putNull(i); } } + if (vector instanceof 
ConvertableVector) { + ((ConvertableVector) vector).convert(); + } + } private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, @@ -304,7 +313,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, byteData[i] * 1000); + vector.putLong(i, (long) byteData[i] * 1000); } } else if (vectorDataType == DataTypes.BOOLEAN) { vector.putBytes(0, pageSize, byteData, 0); @@ -330,7 +339,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, shortData[i] * 1000); + vector.putLong(i, (long) shortData[i] * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; @@ -356,7 +365,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3); - vector.putLong(i, shortInt * 1000); + vector.putLong(i, (long) shortInt * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; @@ -378,7 +387,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, intData[i] * 1000); + vector.putLong(i, (long) intData[i] * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java ---------------------------------------------------------------------- diff 
--git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java index 4d1e6e7..1d065cf 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java @@ -39,6 +39,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ColumnarVectorWrapperDirectFactory; +import org.apache.carbondata.core.scan.result.vector.impl.directread.ConvertableVector; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.format.Encoding; @@ -208,12 +210,18 @@ public class DirectCompressCodec implements ColumnPageCodec { DataType pageDataType = columnPage.getDataType(); int pageSize = columnPage.getPageSize(); BitSet deletedRows = vectorInfo.deletedRows; + vector = ColumnarVectorWrapperDirectFactory + .getDirectVectorWrapperFactory(vector, vectorInfo.invertedIndex, nullBits, deletedRows, + true); fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo); if (deletedRows == null || deletedRows.isEmpty()) { for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) { vector.putNull(i); } } + if (vector instanceof ConvertableVector) { + ((ConvertableVector) vector).convert(); + } } private void fillVector(ColumnPage columnPage, CarbonColumnVector vector, @@ -234,7 +242,7 @@ public class DirectCompressCodec implements ColumnPageCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; 
i++) { - vector.putLong(i, byteData[i] * 1000); + vector.putLong(i, (long) byteData[i] * 1000); } } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) { vector.putBytes(0, pageSize, byteData, 0); @@ -260,7 +268,7 @@ public class DirectCompressCodec implements ColumnPageCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, shortData[i] * 1000); + vector.putLong(i, (long) shortData[i] * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; @@ -286,7 +294,7 @@ public class DirectCompressCodec implements ColumnPageCodec { } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3); - vector.putLong(i, shortInt * 1000); + vector.putLong(i, (long) shortInt * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; @@ -308,7 +316,7 @@ public class DirectCompressCodec implements ColumnPageCodec { } } else if (vectorDataType == DataTypes.TIMESTAMP) { for (int i = 0; i < pageSize; i++) { - vector.putLong(i, intData[i] * 1000); + vector.putLong(i, (long) intData[i] * 1000); } } else if (DataTypes.isDecimal(vectorDataType)) { DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter; http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java index 89a3168..5231cb9 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java @@ -106,13 +106,18 @@ public final class DecimalConverterFactory { // inefficient. CarbonColumnVector vector = info.vector; int precision = info.measure.getMeasure().getPrecision(); + int newMeasureScale = info.measure.getMeasure().getScale(); if (valuesToBeConverted instanceof byte[]) { byte[] data = (byte[]) valuesToBeConverted; for (int i = 0; i < size; i++) { if (nullBitset.get(i)) { vector.putNull(i); } else { - vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision); + BigDecimal value = BigDecimal.valueOf(data[i], scale); + if (value.scale() < newMeasureScale) { + value = value.setScale(newMeasureScale); + } + vector.putDecimal(i, value, precision); } } } else if (valuesToBeConverted instanceof short[]) { @@ -121,7 +126,11 @@ public final class DecimalConverterFactory { if (nullBitset.get(i)) { vector.putNull(i); } else { - vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision); + BigDecimal value = BigDecimal.valueOf(data[i], scale); + if (value.scale() < newMeasureScale) { + value = value.setScale(newMeasureScale); + } + vector.putDecimal(i, value, precision); } } } else if (valuesToBeConverted instanceof int[]) { @@ -130,7 +139,11 @@ public final class DecimalConverterFactory { if (nullBitset.get(i)) { vector.putNull(i); } else { - vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision); + BigDecimal value = BigDecimal.valueOf(data[i], scale); + if (value.scale() < newMeasureScale) { + value = value.setScale(newMeasureScale); + } + vector.putDecimal(i, value, precision); } } } else if (valuesToBeConverted instanceof long[]) { @@ -139,7 +152,11 @@ public final class DecimalConverterFactory { if (nullBitset.get(i)) { vector.putNull(i); } else { - vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision); + BigDecimal 
value = BigDecimal.valueOf(data[i], scale); + if (value.scale() < newMeasureScale) { + value = value.setScale(newMeasureScale); + } + vector.putDecimal(i, value, precision); } } } @@ -225,6 +242,10 @@ public final class DecimalConverterFactory { BitSet nullBitset) { CarbonColumnVector vector = info.vector; int precision = info.measure.getMeasure().getPrecision(); + int newMeasureScale = info.measure.getMeasure().getScale(); + if (scale < newMeasureScale) { + scale = newMeasureScale; + } if (valuesToBeConverted instanceof byte[][]) { byte[][] data = (byte[][]) valuesToBeConverted; for (int i = 0; i < size; i++) { @@ -232,7 +253,11 @@ public final class DecimalConverterFactory { vector.putNull(i); } else { BigInteger bigInteger = new BigInteger(data[i]); - vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision); + BigDecimal value = new BigDecimal(bigInteger, scale); + if (value.scale() < newMeasureScale) { + value = value.setScale(newMeasureScale); + } + vector.putDecimal(i, value, precision); } } } @@ -263,13 +288,18 @@ public final class DecimalConverterFactory { BitSet nullBitset) { CarbonColumnVector vector = info.vector; int precision = info.measure.getMeasure().getPrecision(); + int newMeasureScale = info.measure.getMeasure().getScale(); if (valuesToBeConverted instanceof byte[][]) { byte[][] data = (byte[][]) valuesToBeConverted; for (int i = 0; i < size; i++) { if (nullBitset.get(i)) { vector.putNull(i); } else { - vector.putDecimal(i, DataTypeUtil.byteToBigDecimal(data[i]), precision); + BigDecimal value = DataTypeUtil.byteToBigDecimal(data[i]); + if (value.scale() < newMeasureScale) { + value = value.setScale(newMeasureScale); + } + vector.putDecimal(i, value, precision); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java index 68f8ae6..f102a48 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/ResultCollectorFactory.java @@ -17,16 +17,7 @@ package org.apache.carbondata.core.scan.collector; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.scan.collector.impl.AbstractScannedResultCollector; -import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedResultCollector; -import org.apache.carbondata.core.scan.collector.impl.DictionaryBasedVectorResultCollector; -import org.apache.carbondata.core.scan.collector.impl.RawBasedResultCollector; -import org.apache.carbondata.core.scan.collector.impl.RestructureBasedDictionaryResultCollector; -import org.apache.carbondata.core.scan.collector.impl.RestructureBasedRawResultCollector; -import org.apache.carbondata.core.scan.collector.impl.RestructureBasedVectorResultCollector; -import org.apache.carbondata.core.scan.collector.impl.RowIdBasedResultCollector; -import org.apache.carbondata.core.scan.collector.impl.RowIdRawBasedResultCollector; -import org.apache.carbondata.core.scan.collector.impl.RowIdRestructureBasedRawResultCollector; +import org.apache.carbondata.core.scan.collector.impl.*; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java index a25394f..9d44462 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RestructureEvaluatorImpl.java @@ -111,7 +111,7 @@ public abstract class RestructureEvaluatorImpl implements FilterExecuter { @Override public BitSet prunePages(RawBlockletColumnChunks rawBlockletColumnChunks) throws FilterUnsupportedException, IOException { - return new BitSet(); + throw new FilterUnsupportedException("Unsupported RestructureEvaluatorImpl on pune pages"); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java index e4c507d..02c587e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java @@ -319,18 +319,20 @@ public class RowLevelRangeGrtrThanEquaToFilterExecuterImpl extends RowLevelFilte } } - private boolean isScanRequired(DimensionRawColumnChunk rawColumnChunk, int i) { + private boolean isScanRequired(DimensionRawColumnChunk rawColumnChunk, int columnIndex) { boolean scanRequired; DataType dataType = dimColEvaluatorInfoList.get(0).getDimension().getDataType(); // for no dictionary measure column comparison can be done // on the original data as like measure column - if 
(DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0) - .getDimension().hasEncoding(Encoding.DICTIONARY)) { + if (DataTypeUtil.isPrimitiveColumn(dataType) && !dimColEvaluatorInfoList.get(0).getDimension() + .hasEncoding(Encoding.DICTIONARY)) { scanRequired = - isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues, dataType); + isScanRequired(rawColumnChunk.getMaxValues()[columnIndex], this.filterRangeValues, + dataType); } else { - scanRequired = isScanRequired(rawColumnChunk.getMaxValues()[i], this.filterRangeValues, - rawColumnChunk.getMinMaxFlagArray()[i]); + scanRequired = + isScanRequired(rawColumnChunk.getMaxValues()[columnIndex], this.filterRangeValues, + rawColumnChunk.getMinMaxFlagArray()[columnIndex]); } return scanRequired; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java index d127728..6a9b3b3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java @@ -35,6 +35,7 @@ public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> { public DirectDictionaryGenerator directDictionaryGenerator; public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller; public GenericQueryType genericQueryType; + public int[] invertedIndex; public BitSet deletedRows; public DecimalConverterFactory.DecimalConverter decimalConverter; http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java new file mode 100644 index 0000000..437eee4 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/AbstractCarbonColumnarVector.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.core.scan.result.vector.impl.directread; + +import java.math.BigDecimal; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.CarbonDictionary; + +public abstract class AbstractCarbonColumnarVector + implements CarbonColumnVector, ConvertableVector { + + @Override + public void putShorts(int rowId, int count, short value) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putInts(int rowId, int count, int value) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putLongs(int rowId, int count, long value) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putDecimals(int rowId, int count, BigDecimal value, int precision) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putDoubles(int rowId, int count, double value) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putBytes(int rowId, int count, byte[] value) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putNulls(int rowId, int count) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putNotNull(int rowId) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putNotNull(int rowId, int count) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public boolean isNull(int rowId) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void putObject(int rowId, Object obj) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + 
public Object getData(int rowId) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void reset() { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public DataType getType() { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public DataType getBlockDataType() { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void setBlockDataType(DataType blockDataType) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void setFilteredRowsExist(boolean filteredRowsExist) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void setDictionary(CarbonDictionary dictionary) { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public boolean hasDictionary() { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public CarbonColumnVector getDictionaryVector() { + throw new UnsupportedOperationException("Not allowed from here"); + } + + @Override + public void convert() { + // Do nothing + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java new file mode 100644 index 0000000..4884b4d --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectFactory.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.result.vector.impl.directread; + +import java.util.BitSet; + +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; + +/** + * Factory to create ColumnarVectors for inverted index and delete delta queries. + */ +public final class ColumnarVectorWrapperDirectFactory { + + /** + * Gets carbon vector wrapper to fill the underlying vector based on inverted index and delete + * delta. + * + * @param columnVector Actual vector to be filled. + * @param invertedIndex Inverted index of column page + * @param nullBitset row locations of nulls in bitset + * @param deletedRows deleted rows locations in bitset. + * @param isnullBitsExists whether nullbitset present on this page, usually for dimension columns + * there is no null bitset. 
+ * @return wrapped CarbonColumnVector + */ + public static CarbonColumnVector getDirectVectorWrapperFactory(CarbonColumnVector columnVector, + int[] invertedIndex, BitSet nullBitset, BitSet deletedRows, boolean isnullBitsExists) { + if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows == null || deletedRows + .isEmpty())) { + return new ColumnarVectorWrapperDirectWithInvertedIndex(columnVector, invertedIndex, + isnullBitsExists); + } else if ((invertedIndex == null || invertedIndex.length == 0) && (deletedRows != null + && !deletedRows.isEmpty())) { + return new ColumnarVectorWrapperDirectWithDeleteDelta(columnVector, deletedRows, nullBitset); + } else if ((invertedIndex != null && invertedIndex.length > 0) && (deletedRows != null + && !deletedRows.isEmpty())) { + return new ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex(columnVector, + deletedRows, invertedIndex, nullBitset, isnullBitsExists); + } else { + return columnVector; + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java new file mode 100644 index 0000000..ccde63e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDelta.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.result.vector.impl.directread; + +import java.math.BigDecimal; +import java.util.BitSet; + +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; + +/** + * Column vector for column pages which has delete delta, so it uses delta biset to filter out + * data before filling to actual vector. + */ +class ColumnarVectorWrapperDirectWithDeleteDelta extends AbstractCarbonColumnarVector { + + private BitSet deletedRows; + + private BitSet nullBits; + + private int counter; + + private CarbonColumnVector columnVector; + + public ColumnarVectorWrapperDirectWithDeleteDelta(CarbonColumnVector vectorWrapper, + BitSet deletedRows, BitSet nullBits) { + this.deletedRows = deletedRows; + this.nullBits = nullBits; + this.columnVector = vectorWrapper; + } + + @Override + public void putBoolean(int rowId, boolean value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putBoolean(counter++, value); + } + } + } + + @Override + public void putFloat(int rowId, float value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putFloat(counter++, value); + } + } + } + + @Override + public void putShort(int rowId, short value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + 
columnVector.putNull(counter++); + } else { + columnVector.putShort(counter++, value); + } + } + } + + @Override + public void putInt(int rowId, int value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putInt(counter++, value); + } + } + } + + @Override + public void putLong(int rowId, long value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putLong(counter++, value); + } + } + } + + @Override + public void putDecimal(int rowId, BigDecimal value, int precision) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putDecimal(counter++, value, precision); + } + } + } + + @Override + public void putDouble(int rowId, double value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putDouble(counter++, value); + } + } + } + + @Override + public void putByteArray(int rowId, byte[] value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putByteArray(counter++, value); + } + } + } + + @Override + public void putByteArray(int rowId, int offset, int length, byte[] value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putByteArray(counter++, offset, length, value); + } + } + } + + @Override + public void putByte(int rowId, byte value) { + if (!deletedRows.get(rowId)) { + if (nullBits.get(rowId)) { + columnVector.putNull(counter++); + } else { + columnVector.putByte(counter++, value); + } + } + } + + @Override + public void putNull(int rowId) { + if (!deletedRows.get(rowId)) { + columnVector.putNull(counter++); + } + } + + @Override + public void putFloats(int rowId, int count, float[] src, int srcIndex) { + for (int i = 
srcIndex; i < count; i++) { + if (!deletedRows.get(rowId++)) { + columnVector.putFloat(counter++, src[i]); + } + } + } + + @Override + public void putShorts(int rowId, int count, short[] src, int srcIndex) { + for (int i = srcIndex; i < count; i++) { + if (!deletedRows.get(rowId++)) { + columnVector.putShort(counter++, src[i]); + } + } + } + + @Override + public void putInts(int rowId, int count, int[] src, int srcIndex) { + for (int i = srcIndex; i < count; i++) { + if (!deletedRows.get(rowId++)) { + columnVector.putInt(counter++, src[i]); + } + } + } + + @Override + public void putLongs(int rowId, int count, long[] src, int srcIndex) { + for (int i = srcIndex; i < count; i++) { + if (!deletedRows.get(rowId++)) { + columnVector.putLong(counter++, src[i]); + } + } + } + + @Override + public void putDoubles(int rowId, int count, double[] src, int srcIndex) { + for (int i = srcIndex; i < count; i++) { + if (!deletedRows.get(rowId++)) { + columnVector.putDouble(counter++, src[i]); + } + } + } + + @Override + public void putBytes(int rowId, int count, byte[] src, int srcIndex) { + for (int i = srcIndex; i < count; i++) { + if (!deletedRows.get(rowId++)) { + columnVector.putByte(counter++, src[i]); + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java new file mode 100644 index 0000000..e4507cb --- /dev/null +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.result.vector.impl.directread; + +import java.math.BigDecimal; +import java.util.BitSet; + +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.DecimalType; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; +import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl; + +/** + * Column vector for column pages which has delete delta and inverted index, so it uses delta biset + * to filter out data and use inverted index before filling to actual vector + */ +class ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex + extends ColumnarVectorWrapperDirectWithInvertedIndex { + + private BitSet deletedRows; + + private CarbonColumnVector carbonColumnVector; + + private int precision; + + private BitSet nullBits; + + /** + * Constructor + * @param vectorWrapper vector to be filled + * @param 
deletedRows deleted rows from delete delta. + * @param invertedIndex Inverted index of the column + * @param nullBits Null row ordinals in the bitset + * @param isnullBitsExists whether to consider inverted index while setting null bitset or not. + * we are having nullbitset even for dimensions also. + * But some dimension columns still don't have nullbitset. + * So if null bitset does not exist then + * it should not inverted index while setting the null + */ + public ColumnarVectorWrapperDirectWithDeleteDeltaAndInvertedIndex( + CarbonColumnVector vectorWrapper, BitSet deletedRows, int[] invertedIndex, BitSet nullBits, + boolean isnullBitsExists) { + super(new CarbonColumnVectorImpl(invertedIndex.length, vectorWrapper.getType()), invertedIndex, + isnullBitsExists); + this.deletedRows = deletedRows; + this.carbonColumnVector = vectorWrapper; + this.nullBits = nullBits; + } + + @Override + public void putDecimal(int rowId, BigDecimal value, int precision) { + this.precision = precision; + carbonColumnVector.putDecimal(invertedIndex[rowId], value, precision); + } + + @Override + public void putNull(int rowId) { + if (isnullBitsExists) { + nullBits.set(rowId); + } else { + nullBits.set(invertedIndex[rowId]); + } + } + + @Override + public void convert() { + if (columnVector instanceof CarbonColumnVectorImpl) { + CarbonColumnVectorImpl localVector = (CarbonColumnVectorImpl) columnVector; + DataType dataType = carbonColumnVector.getType(); + int length = invertedIndex.length; + int counter = 0; + if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) { + byte[] dataArray = (byte[]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if (!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putByte(counter++, dataArray[i]); + } + } + } + } else if (dataType == DataTypes.SHORT) { + short[] dataArray = (short[]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if 
(!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putShort(counter++, dataArray[i]); + } + } + } + } else if (dataType == DataTypes.INT) { + int[] dataArray = (int[]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if (!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putInt(counter++, dataArray[i]); + } + } + } + } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) { + long[] dataArray = (long[]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if (!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putLong(counter++, dataArray[i]); + } + } + } + } else if (dataType == DataTypes.FLOAT) { + float[] dataArray = (float[]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if (!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putFloat(counter++, dataArray[i]); + } + } + } + } else if (dataType == DataTypes.DOUBLE) { + double[] dataArray = (double[]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if (!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putDouble(counter++, dataArray[i]); + } + } + } + } else if (dataType instanceof DecimalType) { + BigDecimal[] dataArray = (BigDecimal[]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if (!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putDecimal(counter++, dataArray[i], precision); + } + } + } + } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) { + byte[][] dataArray = (byte[][]) localVector.getDataArray(); + for (int i = 0; i < length; i++) { + if 
(!deletedRows.get(i)) { + if (nullBits.get(i)) { + carbonColumnVector.putNull(counter++); + } else { + carbonColumnVector.putByteArray(counter++, dataArray[i]); + } + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java new file mode 100644 index 0000000..d95267f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ColumnarVectorWrapperDirectWithInvertedIndex.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.core.scan.result.vector.impl.directread;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+/**
+ * Column vector for column pages which have an inverted index. Every put method translates the
+ * incoming row id through the inverted index before writing to the wrapped vector.
+ */
+class ColumnarVectorWrapperDirectWithInvertedIndex extends AbstractCarbonColumnarVector {
+
+  /** Maps the row id passed to a put method to the row id used on the wrapped vector. */
+  protected int[] invertedIndex;
+
+  /** Wrapped vector that finally receives the values. */
+  protected CarbonColumnVector columnVector;
+
+  /** When true, {@link #putNull(int)} forwards the row id without translating it. */
+  protected boolean isnullBitsExists;
+
+  /**
+   * @param columnVector vector that receives the values
+   * @param invertedIndex lookup table applied to every row id before writing
+   * @param isnullBitsExists whether {@link #putNull(int)} should bypass the inverted index
+   */
+  public ColumnarVectorWrapperDirectWithInvertedIndex(CarbonColumnVector columnVector,
+      int[] invertedIndex, boolean isnullBitsExists) {
+    this.invertedIndex = invertedIndex;
+    this.columnVector = columnVector;
+    this.isnullBitsExists = isnullBitsExists;
+  }
+
+  @Override
+  public void putBoolean(int rowId, boolean value) {
+    columnVector.putBoolean(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putFloat(int rowId, float value) {
+    columnVector.putFloat(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putShort(int rowId, short value) {
+    columnVector.putShort(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putInt(int rowId, int value) {
+    columnVector.putInt(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putLong(int rowId, long value) {
+    columnVector.putLong(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putDecimal(int rowId, BigDecimal value, int precision) {
+    columnVector.putDecimal(invertedIndex[rowId], value, precision);
+  }
+
+  @Override
+  public void putDouble(int rowId, double value) {
+    columnVector.putDouble(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putByteArray(int rowId, byte[] value) {
+    columnVector.putByteArray(invertedIndex[rowId], value);
+  }
+
+  @Override
+  public void putByteArray(int rowId, int offset, int length, byte[] value) {
+    columnVector.putByteArray(invertedIndex[rowId], offset, length, value);
+  }
+
+  @Override
+  public void putByte(int rowId, byte value) {
+    columnVector.putByte(invertedIndex[rowId], value);
+  }
+
+  /**
+   * Marks a row as null. The row id is forwarded unchanged when {@code isnullBitsExists} is set
+   * and is translated through the inverted index otherwise.
+   * NOTE(review): presumably the caller already passes target positions when null bits exist;
+   * confirm against the caller.
+   */
+  @Override
+  public void putNull(int rowId) {
+    if (isnullBitsExists) {
+      columnVector.putNull(rowId);
+    } else {
+      columnVector.putNull(invertedIndex[rowId]);
+    }
+  }
+
+  /**
+   * Writes {@code count} values read from {@code src[srcIndex]} up to (excluding)
+   * {@code src[srcIndex + count]} to consecutive row ids starting at {@code rowId}, each
+   * translated through the inverted index. The other bulk put methods below follow the same
+   * contract.
+   */
+  @Override
+  public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    // count is a number of elements, not an end index, so the bound is srcIndex + count.
+    final int end = srcIndex + count;
+    for (int i = srcIndex; i < end; i++) {
+      columnVector.putFloat(invertedIndex[rowId++], src[i]);
+    }
+  }
+
+  @Override
+  public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    final int end = srcIndex + count;
+    for (int i = srcIndex; i < end; i++) {
+      columnVector.putShort(invertedIndex[rowId++], src[i]);
+    }
+  }
+
+  @Override
+  public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    final int end = srcIndex + count;
+    for (int i = srcIndex; i < end; i++) {
+      columnVector.putInt(invertedIndex[rowId++], src[i]);
+    }
+  }
+
+  @Override
+  public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    final int end = srcIndex + count;
+    for (int i = srcIndex; i < end; i++) {
+      columnVector.putLong(invertedIndex[rowId++], src[i]);
+    }
+  }
+
+  @Override
+  public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    final int end = srcIndex + count;
+    for (int i = srcIndex; i < end; i++) {
+      columnVector.putDouble(invertedIndex[rowId++], src[i]);
+    }
+  }
+
+  @Override
+  public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    final int end = srcIndex + count;
+    for (int i = srcIndex; i < end; i++) {
+      columnVector.putByte(invertedIndex[rowId++], src[i]);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java new file mode 100644 index 0000000..7020c66 --- /dev/null +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/directread/ConvertableVector.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.scan.result.vector.impl.directread; + +/** + * This interface provides a method to convert the values by using the inverted index and + * delete delta and to fill them into the underlying vector. + */ +public interface ConvertableVector { + + /** + * Converts the values and fills them into the underlying vector. 
+ */ + void convert(); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java index fc32862..0434480 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFilterScanner.java @@ -383,15 +383,13 @@ public class BlockletFilterScanner extends BlockletFullScanner { scannedPages.getCount() + pages.cardinality()); // get the row indexes from bit set for each page int[] pageFilteredPages = new int[pages.cardinality()]; + int[] numberOfRows = new int[pages.cardinality()]; int index = 0; for (int i = pages.nextSetBit(0); i >= 0; i = pages.nextSetBit(i + 1)) { - pageFilteredPages[index++] = i; + pageFilteredPages[index] = i; + numberOfRows[index++] = rawBlockletColumnChunks.getDataBlock().getPageRowCount(i); } // count(*) case there would not be any dimensions are measures selected. 
- int[] numberOfRows = new int[pages.cardinality()]; - for (int i = 0; i < numberOfRows.length; i++) { - numberOfRows[i] = rawBlockletColumnChunks.getDataBlock().getPageRowCount(i); - } long dimensionReadTime = System.currentTimeMillis(); dimensionReadTime = System.currentTimeMillis() - dimensionReadTime; FileReader fileReader = rawBlockletColumnChunks.getFileReader(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala index 24524b8..a989230 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/CastColumnTestCase.scala @@ -224,7 +224,7 @@ class CastColumnTestCase extends QueryTest with BeforeAndAfterAll { test("Dictionary INT In to implicit Int") { checkAnswer( - sql("select empno,empname,workgroupcategory from DICTIONARY_CARBON_1 where workgroupcategory in ('1', '2')"), + sql("select empno,empname,workgroupcategory from DICTIONARY_CARBON_1 where workgroupcategory in (1, 2)"), sql("select empno,empname,workgroupcategory from DICTIONARY_HIVE_1 where workgroupcategory in ('1', '2')") ) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/71d61795/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala index 88b7ff9..719fa34 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala @@ -408,6 +408,7 @@ class SparkCarbonFileFormat extends FileFormat model.setFreeUnsafeMemory(!isAdded) } val carbonReader = if (readVector) { + model.setDirectVectorFill(true); val vectorizedReader = new VectorizedCarbonRecordReader(model, null, supportBatchValue.toString)