http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 0c2e8ab..1e8207c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.impl.StandardLogService; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.datastore.BlockIndexStore; import org.apache.carbondata.core.datastore.IndexKey; import org.apache.carbondata.core.datastore.block.AbstractIndex; @@ -60,6 +61,7 @@ import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; @@ -336,6 +338,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension, segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions, queryProperties.complexFilterDimension, 
allProjectionListDimensionIdexes); + int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO, + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE)); + if (dimensionsBlockIndexes.length > 0) { numberOfElementToConsider = dimensionsBlockIndexes[dimensionsBlockIndexes.length - 1] == segmentProperties.getBlockTodimensionOrdinalMapping().size() - 1 ? @@ -343,7 +349,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { dimensionsBlockIndexes.length; blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(CarbonUtil .getRangeIndex(dimensionsBlockIndexes, numberOfElementToConsider, - CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO)); + numberOfColumnToBeReadInOneIO)); } else { blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]); } @@ -362,7 +368,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // setting all the measure chunk indexes to be read from file blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(CarbonUtil .getRangeIndex(measureBlockIndexes, numberOfElementToConsider, - CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO)); + numberOfColumnToBeReadInOneIO)); } else { blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]); }
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java index d2523a1..c64f498 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.core.scan.filter.executer; +import java.io.IOException; import java.util.ArrayList; import java.util.BitSet; import java.util.List; @@ -24,11 +25,14 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; +import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo; import org.apache.carbondata.core.scan.executor.util.QueryUtil; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; +import org.apache.carbondata.core.scan.processor.BlocksChunkHolder; +import org.apache.carbondata.core.util.BitSetGroup; import org.apache.carbondata.core.util.ByteUtil; /** @@ -80,6 +84,26 @@ public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl return bitSet; } + @Override public BitSetGroup applyFilter(BlocksChunkHolder 
blockChunkHolder) throws IOException { + int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping() + .get(dimColumnEvaluatorInfo.getColumnIndex()); + if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) { + blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock() + .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex); + } + DimensionRawColumnChunk dimensionRawColumnChunk = + blockChunkHolder.getDimensionRawDataChunk()[blockIndex]; + BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount()); + for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) { + if (dimensionRawColumnChunk.getMaxValues() != null) { + BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i), + dimensionRawColumnChunk.getRowCount()[i]); + bitSetGroup.setBitSet(bitSet, i); + } + } + return bitSetGroup; + } + /** * It is required for extracting column data from columngroup chunk * http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java index 2bdce8d..c7e2acc 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java @@ -96,17 +96,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) { if (rawColumnChunk.getMinValues() != null) { if 
(isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) { - int compare = ByteUtil.UnsafeComparer.INSTANCE - .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]); - if (compare >= 0) { - BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]); - bitSet.flip(0, rawColumnChunk.getRowCount()[i]); - bitSetGroup.setBitSet(bitSet, i); - } else { - BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i), - rawColumnChunk.getRowCount()[i]); - bitSetGroup.setBitSet(bitSet, i); - } + BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i), + rawColumnChunk.getRowCount()[i]); + bitSetGroup.setBitSet(bitSet, i); } } else { BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i), http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java index ae9ba8a..d9795eb 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java @@ -96,17 +96,9 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) { if (rawColumnChunk.getMinValues() != null) { if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) { - int compare = ByteUtil.UnsafeComparer.INSTANCE - .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]); - if (compare > 0) { - BitSet bitSet = new 
BitSet(rawColumnChunk.getRowCount()[i]); - bitSet.flip(0, rawColumnChunk.getRowCount()[i]); - bitSetGroup.setBitSet(bitSet, i); - } else { - BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i), - rawColumnChunk.getRowCount()[i]); - bitSetGroup.setBitSet(bitSet, i); - } + BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i), + rawColumnChunk.getRowCount()[i]); + bitSetGroup.setBitSet(bitSet, i); } } else { BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i), http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java index 07e3487..323042a 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java @@ -52,9 +52,11 @@ public class BitSetGroup { public void and(BitSetGroup group) { int i = 0; for (BitSet bitSet : bitSets) { - BitSet otherSet = group.getBitSet(i); + BitSet otherSet = group.getBitSet(i); if (bitSet != null && otherSet != null) { bitSet.and(otherSet); + } else { + bitSets[i] = null; } i++; } @@ -63,7 +65,7 @@ public class BitSetGroup { public void or(BitSetGroup group) { int i = 0; for (BitSet bitSet : bitSets) { - BitSet otherSet = group.getBitSet(i); + BitSet otherSet = group.getBitSet(i); if (bitSet != null && otherSet != null) { bitSet.or(otherSet); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java index 25e7cfe..55c0302 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -35,18 +36,22 @@ import org.apache.carbondata.core.datastore.compression.WriterCompressModel; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.ValueEncoderMeta; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.index.BlockIndexInfo; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.BlockletBTreeIndex; import org.apache.carbondata.format.BlockletIndex; import org.apache.carbondata.format.BlockletInfo; import org.apache.carbondata.format.BlockletInfo2; +import org.apache.carbondata.format.BlockletInfo3; import org.apache.carbondata.format.BlockletMinMaxIndex; import org.apache.carbondata.format.ChunkCompressionMeta; import org.apache.carbondata.format.ColumnSchema; import org.apache.carbondata.format.CompressionCodec; import org.apache.carbondata.format.DataChunk; import org.apache.carbondata.format.DataChunk2; +import org.apache.carbondata.format.DataChunk3; import org.apache.carbondata.format.Encoding; import org.apache.carbondata.format.FileFooter; import org.apache.carbondata.format.IndexHeader; @@ -109,6 +114,50 @@ public class CarbonMetadataUtil { } /** + * It converts list of BlockletInfoColumnar to FileFooter thrift objects + * + * @param infoList 
+ * @param numCols + * @param cardinalities + * @return FileFooter + */ + public static FileFooter convertFileFooter3(List<BlockletInfo3> infoList, + List<BlockletIndex> blockletIndexs, int[] cardinalities, List<ColumnSchema> columnSchemaList, + SegmentProperties segmentProperties) throws IOException { + FileFooter footer = getFileFooter3(infoList, blockletIndexs, cardinalities, columnSchemaList); + for (BlockletInfo3 info : infoList) { + footer.addToBlocklet_info_list3(info); + } + return footer; + } + + /** + * Below method will be used to get the file footer object + * + * @param infoList blocklet info + * @param cardinalities cardinlaity of dimension columns + * @param columnSchemaList column schema list + * @return file footer + */ + private static FileFooter getFileFooter3(List<BlockletInfo3> infoList, + List<BlockletIndex> blockletIndexs, int[] cardinalities, + List<ColumnSchema> columnSchemaList) { + SegmentInfo segmentInfo = new SegmentInfo(); + segmentInfo.setNum_cols(columnSchemaList.size()); + segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities)); + ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion(); + FileFooter footer = new FileFooter(); + footer.setVersion(version.number()); + footer.setNum_rows(getNumberOfRowForFooter(infoList)); + footer.setSegment_info(segmentInfo); + footer.setTable_columns(columnSchemaList); + for (BlockletIndex info : blockletIndexs) { + footer.addToBlocklet_index_list(info); + } + return footer; + } + + /** * Below method will be used to get the file footer object for * * @param infoList blocklet info @@ -162,6 +211,20 @@ public class CarbonMetadataUtil { return numberOfRows; } + /** + * Get total number of rows for the file. 
+ * + * @param infoList + * @return + */ + private static long getNumberOfRowForFooter(List<BlockletInfo3> infoList) { + long numberOfRows = 0; + for (BlockletInfo3 info : infoList) { + numberOfRows += info.num_rows; + } + return numberOfRows; + } + private static BlockletIndex getBlockletIndex(BlockletInfoColumnar info) { BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex(); @@ -181,9 +244,52 @@ public class CarbonMetadataUtil { return blockletIndex; } + public static BlockletIndex getBlockletIndex(List<NodeHolder> nodeHolderList, + List<CarbonMeasure> carbonMeasureList) { + BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex(); + for (byte[] max : nodeHolderList.get(nodeHolderList.size() - 1).getColumnMaxData()) { + blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max)); + } + for (byte[] min : nodeHolderList.get(0).getColumnMinData()) { + blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min)); + } + byte[][] measureMaxValue = nodeHolderList.get(0).getMeasureColumnMaxData().clone(); + byte[][] measureMinValue = nodeHolderList.get(0).getMeasureColumnMinData().clone(); + byte[] minVal = null; + byte[] maxVal = null; + for (int i = 1; i < nodeHolderList.size(); i++) { + for (int j = 0; j < measureMinValue.length; j++) { + minVal = nodeHolderList.get(i).getMeasureColumnMinData()[j]; + maxVal = nodeHolderList.get(i).getMeasureColumnMaxData()[j]; + if (compareMeasureData(measureMaxValue[j], maxVal, carbonMeasureList.get(j).getDataType()) + < 0) { + measureMaxValue[j] = maxVal.clone(); + } + if (compareMeasureData(measureMinValue[j], minVal, carbonMeasureList.get(j).getDataType()) + > 0) { + measureMinValue[j] = minVal.clone(); + } + } + } + + for (byte[] max : measureMaxValue) { + blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max)); + } + for (byte[] min : measureMinValue) { + blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min)); + } + BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex(); + 
blockletBTreeIndex.setStart_key(nodeHolderList.get(0).getStartKey()); + blockletBTreeIndex.setEnd_key(nodeHolderList.get(nodeHolderList.size() - 1).getEndKey()); + BlockletIndex blockletIndex = new BlockletIndex(); + blockletIndex.setMin_max_index(blockletMinMaxIndex); + blockletIndex.setB_tree_index(blockletBTreeIndex); + return blockletIndex; + } + /** - * Below method will be used to get the blocklet info object for - * data version 2 file + * Below method will be used to get the blocklet info object for data version + * 2 file * * @param blockletInfoColumnar blocklet info * @param dataChunkOffsets data chunks offsets @@ -222,7 +328,8 @@ public class CarbonMetadataUtil { encodings.add(Encoding.DIRECT_DICTIONARY); } dataChunk.setRowMajor(colGrpblock[i]); - //TODO : Once schema PR is merged and information needs to be passed here. + // TODO : Once schema PR is merged and information needs to be passed + // here. dataChunk.setColumn_ids(new ArrayList<Integer>()); dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]); dataChunk.setData_page_offset(blockletInfoColumnar.getKeyOffSets()[i]); @@ -242,7 +349,8 @@ public class CarbonMetadataUtil { j++; } - //TODO : Right now the encodings are happening at runtime. change as per this encoders. + // TODO : Right now the encodings are happening at runtime. change as per + // this encoders. dataChunk.setEncoders(encodings); colDataChunks.add(dataChunk); @@ -252,24 +360,26 @@ public class CarbonMetadataUtil { DataChunk dataChunk = new DataChunk(); dataChunk.setChunk_meta(getChunkCompressionMeta()); dataChunk.setRowMajor(false); - //TODO : Once schema PR is merged and information needs to be passed here. + // TODO : Once schema PR is merged and information needs to be passed + // here. 
dataChunk.setColumn_ids(new ArrayList<Integer>()); dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]); dataChunk.setData_page_offset(blockletInfoColumnar.getMeasureOffset()[i]); - //TODO : Right now the encodings are happening at runtime. change as per this encoders. + // TODO : Right now the encodings are happening at runtime. change as per + // this encoders. List<Encoding> encodings = new ArrayList<Encoding>(); encodings.add(Encoding.DELTA); dataChunk.setEncoders(encodings); - //TODO writing dummy presence meta need to set actual presence - //meta + // TODO writing dummy presence meta need to set actual presence + // meta PresenceMeta presenceMeta = new PresenceMeta(); presenceMeta.setPresent_bit_streamIsSet(true); presenceMeta .setPresent_bit_stream(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()); dataChunk.setPresence(presenceMeta); - //TODO : PresenceMeta needs to be implemented and set here + // TODO : PresenceMeta needs to be implemented and set here // dataChunk.setPresence(new PresenceMeta()); - //TODO : Need to write ValueCompression meta here. + // TODO : Need to write ValueCompression meta here. 
List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>(); encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta( createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i)))); @@ -291,7 +401,7 @@ public class CarbonMetadataUtil { private static boolean containsEncoding(int blockIndex, Encoding encoding, List<ColumnSchema> columnSchemas, SegmentProperties segmentProperties) { Set<Integer> dimOrdinals = segmentProperties.getDimensionOrdinalForBlock(blockIndex); - //column groups will always have dictionary encoding + // column groups will always have dictionary encoding if (dimOrdinals.size() > 1 && Encoding.DICTIONARY == encoding) { return true; } @@ -336,7 +446,8 @@ public class CarbonMetadataUtil { } /** - * It converts FileFooter thrift object to list of BlockletInfoColumnar objects + * It converts FileFooter thrift object to list of BlockletInfoColumnar + * objects * * @param footer * @return @@ -486,8 +597,8 @@ public class CarbonMetadataUtil { } /** - * Below method will be used to get the block index info thrift object for each block - * present in the segment + * Below method will be used to get the block index info thrift object for + * each block present in the segment * * @param blockIndexInfoList block index info list * @return list of block index @@ -508,8 +619,7 @@ public class CarbonMetadataUtil { } /** - * Below method will be used to get the data chunk object for all the - * columns + * Below method will be used to get the data chunk object for all the columns * * @param blockletInfoColumnar blocklet info * @param columnSchenma list of columns @@ -536,7 +646,8 @@ public class CarbonMetadataUtil { encodings.add(Encoding.DIRECT_DICTIONARY); } dataChunk.setRowMajor(colGrpblock[i]); - //TODO : Once schema PR is merged and information needs to be passed here. + // TODO : Once schema PR is merged and information needs to be passed + // here. 
dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]); if (aggKeyBlock[i]) { dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]); @@ -552,7 +663,8 @@ public class CarbonMetadataUtil { rowIdIndex++; } - //TODO : Right now the encodings are happening at runtime. change as per this encoders. + // TODO : Right now the encodings are happening at runtime. change as per + // this encoders. dataChunk.setEncoders(encodings); colDataChunks.add(dataChunk); @@ -562,22 +674,24 @@ public class CarbonMetadataUtil { DataChunk2 dataChunk = new DataChunk2(); dataChunk.setChunk_meta(getChunkCompressionMeta()); dataChunk.setRowMajor(false); - //TODO : Once schema PR is merged and information needs to be passed here. + // TODO : Once schema PR is merged and information needs to be passed + // here. dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]); - //TODO : Right now the encodings are happening at runtime. change as per this encoders. + // TODO : Right now the encodings are happening at runtime. change as per + // this encoders. List<Encoding> encodings = new ArrayList<Encoding>(); encodings.add(Encoding.DELTA); dataChunk.setEncoders(encodings); - //TODO writing dummy presence meta need to set actual presence - //meta + // TODO writing dummy presence meta need to set actual presence + // meta PresenceMeta presenceMeta = new PresenceMeta(); presenceMeta.setPresent_bit_streamIsSet(true); presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor() .compressByte(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray())); dataChunk.setPresence(presenceMeta); - //TODO : PresenceMeta needs to be implemented and set here + // TODO : PresenceMeta needs to be implemented and set here // dataChunk.setPresence(new PresenceMeta()); - //TODO : Need to write ValueCompression meta here. + // TODO : Need to write ValueCompression meta here. 
List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>(); encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta( createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i)))); @@ -586,4 +700,189 @@ public class CarbonMetadataUtil { } return colDataChunks; } + + /** + * Below method will be used to get the data chunk object for all the columns + * + * @param blockletInfoColumnar blocklet info + * @param columnSchenma list of columns + * @param segmentProperties segment properties + * @return list of data chunks + * @throws IOException + */ + private static List<DataChunk2> getDatachunk2(List<NodeHolder> nodeHolderList, + List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index, + boolean isDimensionColumn) throws IOException { + List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>(); + DataChunk2 dataChunk = null; + NodeHolder nodeHolder = null; + for (int i = 0; i < nodeHolderList.size(); i++) { + nodeHolder = nodeHolderList.get(i); + dataChunk = new DataChunk2(); + dataChunk.min_max = new BlockletMinMaxIndex(); + dataChunk.setChunk_meta(getChunkCompressionMeta()); + dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount()); + List<Encoding> encodings = new ArrayList<Encoding>(); + if (isDimensionColumn) { + dataChunk.setData_page_length(nodeHolder.getKeyLengths()[index]); + if (containsEncoding(index, Encoding.DICTIONARY, columnSchenma, segmentProperties)) { + encodings.add(Encoding.DICTIONARY); + } + if (containsEncoding(index, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) { + encodings.add(Encoding.DIRECT_DICTIONARY); + } + dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[index]); + // TODO : Once schema PR is merged and information needs to be passed + // here. + if (nodeHolder.getAggBlocks()[index]) { + dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[index]); + encodings.add(Encoding.RLE); + } + dataChunk.setSort_state(nodeHolder.getIsSortedKeyBlock()[index] ? 
+ SortState.SORT_EXPLICIT : + SortState.SORT_NATIVE); + + if (!nodeHolder.getIsSortedKeyBlock()[index]) { + dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[index]); + encodings.add(Encoding.INVERTED_INDEX); + } + dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getColumnMaxData()[index])); + dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getColumnMinData()[index])); + } else { + dataChunk.setData_page_length(nodeHolder.getDataArray()[index].length); + // TODO : Right now the encodings are happening at runtime. change as + // per this encoders. + dataChunk.setEncoders(encodings); + + dataChunk.setRowMajor(false); + // TODO : Right now the encodings are happening at runtime. change as + // per this encoders. + encodings.add(Encoding.DELTA); + dataChunk.setEncoders(encodings); + // TODO writing dummy presence meta need to set actual presence + // meta + PresenceMeta presenceMeta = new PresenceMeta(); + presenceMeta.setPresent_bit_streamIsSet(true); + presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor() + .compressByte(nodeHolder.getMeasureNullValueIndex()[index].toByteArray())); + dataChunk.setPresence(presenceMeta); + List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>(); + encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer( + createValueEncoderMeta(nodeHolder.getCompressionModel(), index)))); + dataChunk.setEncoder_meta(encoderMetaList); + dataChunk.min_max + .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index])); + dataChunk.min_max + .addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[index])); + } + dataChunk.setEncoders(encodings); + colDataChunks.add(dataChunk); + } + return colDataChunks; + } + + public static DataChunk3 getDataChunk3(List<NodeHolder> nodeHolderList, + List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index, + boolean isDimensionColumn) throws IOException { + List<DataChunk2> 
dataChunksList = + getDatachunk2(nodeHolderList, columnSchenma, segmentProperties, index, isDimensionColumn); + int offset = 0; + DataChunk3 dataChunk = new DataChunk3(); + List<Integer> pageOffsets = new ArrayList<>(); + List<Integer> pageLengths = new ArrayList<>(); + int length = 0; + for (int i = 0; i < dataChunksList.size(); i++) { + pageOffsets.add(offset); + length = + dataChunksList.get(i).getData_page_length() + dataChunksList.get(i).getRle_page_length() + + dataChunksList.get(i).getRowid_page_length(); + pageLengths.add(length); + offset += length; + } + dataChunk.setData_chunk_list(dataChunksList); + dataChunk.setPage_length(pageLengths); + dataChunk.setPage_offset(pageOffsets); + return dataChunk; + } + + public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) { + ByteBuffer buffer = null; + if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { + buffer = ByteBuffer.allocate( + (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE + + 3); + buffer.putChar(valueEncoderMeta.getType()); + buffer.putDouble((Double) valueEncoderMeta.getMaxValue()); + buffer.putDouble((Double) valueEncoderMeta.getMinValue()); + buffer.putDouble((Double) valueEncoderMeta.getUniqueValue()); + } else if (valueEncoderMeta.getType() == CarbonCommonConstants.BIG_INT_MEASURE) { + buffer = ByteBuffer.allocate( + (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE + + 3); + buffer.putChar(valueEncoderMeta.getType()); + buffer.putLong((Long) valueEncoderMeta.getMaxValue()); + buffer.putLong((Long) valueEncoderMeta.getMinValue()); + buffer.putLong((Long) valueEncoderMeta.getUniqueValue()); + } else { + buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3); + buffer.putChar(valueEncoderMeta.getType()); + } + buffer.putInt(valueEncoderMeta.getDecimal()); + buffer.put(valueEncoderMeta.getDataTypeSelected()); + buffer.flip(); + return 
buffer.array(); + } + + public static byte[] getByteValueForMeasure(Object data, DataType dataType) { + ByteBuffer b = null; + switch (dataType) { + case DOUBLE: + b = ByteBuffer.allocate(8); + b.putDouble((Double) data); + b.flip(); + return b.array(); + case LONG: + case INT: + case SHORT: + b = ByteBuffer.allocate(8); + b.putLong((Long) data); + b.flip(); + return b.array(); + case DECIMAL: + return DataTypeUtil.bigDecimalToByte((BigDecimal)data); + default: + throw new IllegalArgumentException("Invalid data type"); + } + } + + public static int compareMeasureData(byte[] first, byte[] second, DataType dataType) { + ByteBuffer firstBuffer = null; + ByteBuffer secondBuffer = null; + switch (dataType) { + case DOUBLE: + firstBuffer = ByteBuffer.allocate(8); + firstBuffer.put(first); + secondBuffer = ByteBuffer.allocate(8); + secondBuffer.put(second); + firstBuffer.flip(); + secondBuffer.flip(); + return Double.compare(firstBuffer.getDouble(), secondBuffer.getDouble()); + case LONG: + case INT: + case SHORT: + firstBuffer = ByteBuffer.allocate(8); + firstBuffer.put(first); + secondBuffer = ByteBuffer.allocate(8); + secondBuffer.put(second); + firstBuffer.flip(); + secondBuffer.flip(); + return Long.compare(firstBuffer.getLong(), secondBuffer.getLong()); + case DECIMAL: + return DataTypeUtil.byteToBigDecimal(first) + .compareTo(DataTypeUtil.byteToBigDecimal(second)); + default: + throw new IllegalArgumentException("Invalid data type"); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 962d352..39c36ea 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -26,6 +26,7 @@ import java.util.Properties; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; public final class CarbonProperties { @@ -85,6 +86,9 @@ public final class CarbonProperties { validateCarbonDataFileVersion(); validateExecutorStartUpTime(); validatePrefetchBufferSize(); + validateNumberOfPagesPerBlocklet(); + validateNumberOfColumnPerIORead(); + validateNumberOfRowsPerBlockletColumnPage(); } private void validatePrefetchBufferSize() { @@ -107,6 +111,93 @@ public final class CarbonProperties { } } + /** + * This method validates the number of pages per blocklet column + */ + private void validateNumberOfPagesPerBlocklet() { + String numberOfPagePerBlockletColumnString = carbonProperties + .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN, + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE); + try { + short numberOfPagePerBlockletColumn = Short.parseShort(numberOfPagePerBlockletColumnString); + if (numberOfPagePerBlockletColumn + < CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN + || numberOfPagePerBlockletColumn + > CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX) { + LOGGER.info( + "The Number Of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString + + "\" is invalid. 
Using the default value \"" + + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE); + carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN, + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE); + } + } catch (NumberFormatException e) { + LOGGER.info( + "The Number Of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString + + "\" is invalid. Using the default value \"" + + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE); + carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN, + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE); + } + } + + /** + * This method validates the number of column read in one IO + */ + private void validateNumberOfColumnPerIORead() { + String numberofColumnPerIOString = carbonProperties + .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO, + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE); + try { + short numberofColumnPerIO = Short.parseShort(numberofColumnPerIOString); + if (numberofColumnPerIO < CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN + || numberofColumnPerIO > CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX) { + LOGGER.info("The Number Of pages per blocklet column value \"" + numberofColumnPerIOString + + "\" is invalid. Using the default value \"" + + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE); + carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO, + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE); + } + } catch (NumberFormatException e) { + LOGGER.info("The Number Of pages per blocklet column value \"" + numberofColumnPerIOString + + "\" is invalid. 
Using the default value \"" + + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE); + carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO, + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE); + } + } + + /** + * This method validates the number of column read in one IO + */ + private void validateNumberOfRowsPerBlockletColumnPage() { + String numberOfRowsPerBlockletColumnPageString = carbonProperties + .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE, + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); + try { + short numberOfRowsPerBlockletColumnPage = + Short.parseShort(numberOfRowsPerBlockletColumnPageString); + if (numberOfRowsPerBlockletColumnPage + < CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN + || numberOfRowsPerBlockletColumnPage + > CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX) { + LOGGER.info("The Number Of rows per blocklet column pages value \"" + + numberOfRowsPerBlockletColumnPageString + "\" is invalid. Using the default value \"" + + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); + carbonProperties + .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE, + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); + } + } catch (NumberFormatException e) { + LOGGER.info("The Number Of rows per blocklet column pages value \"" + + numberOfRowsPerBlockletColumnPageString + "\" is invalid. 
Using the default value \"" + + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); + carbonProperties + .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE, + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT); + } + } + private void validateBadRecordsLocation() { String badRecordsLocation = carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); @@ -288,17 +379,16 @@ public final class CarbonProperties { carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION); if (carbondataFileVersionString == null) { // use default property if user does not specify version property - carbonProperties - .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, - CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION); + carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, + CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION); } else { try { ColumnarFormatVersion.valueOf(carbondataFileVersionString); } catch (IllegalArgumentException e) { // use default property if user specifies an invalid version property - LOGGER.warn("Specified file version property is invalid: " + - carbondataFileVersionString + ". Using " + - CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as default file version"); + LOGGER.warn("Specified file version property is invalid: " + carbondataFileVersionString + + ". 
Using " + CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + + " as default file version"); carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION); } @@ -569,6 +659,7 @@ public final class CarbonProperties { /** * Returns configured update deleta files value for IUD compaction + * * @return numberOfDeltaFilesThreshold */ public int getNoUpdateDeltaFilesThresholdForIUDCompaction() { @@ -588,8 +679,7 @@ public final class CarbonProperties { } } catch (NumberFormatException e) { LOGGER.error("The specified value for property " - + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION - + "is incorrect." + + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect." + " Correct value should be in range of 0 -10000. Taking the default value."); numberOfDeltaFilesThreshold = Integer .parseInt(CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION); @@ -599,6 +689,7 @@ public final class CarbonProperties { /** * Returns configured delete deleta files value for IUD compaction + * * @return numberOfDeltaFilesThreshold */ public int getNoDeleteDeltaFilesThresholdForIUDCompaction() { @@ -618,8 +709,7 @@ public final class CarbonProperties { } } catch (NumberFormatException e) { LOGGER.error("The specified value for property " - + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION - + "is incorrect." + + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect." + " Correct value should be in range of 0 -10000. 
Taking the default value."); numberOfDeltaFilesThreshold = Integer .parseInt(CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index b9a96d2..5a656e0 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -73,6 +73,7 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.DataChunk2; +import org.apache.carbondata.format.DataChunk3; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; @@ -471,6 +472,28 @@ public final class CarbonUtil { numberCompressor.unCompress(indexMap, 0, indexMap.length)); } + public static int[] getUnCompressColumnIndex(int totalLength, ByteBuffer buffer, int offset) { + buffer.position(offset); + int indexDataLength = buffer.getInt(); + int indexMapLength = totalLength - indexDataLength - CarbonCommonConstants.INT_SIZE_IN_BYTE; + int[] indexData = getIntArray(buffer, buffer.position(), indexDataLength); + int[] indexMap = getIntArray(buffer, buffer.position(), indexMapLength); + return UnBlockIndexer.uncompressIndex(indexData, indexMap); + } + + public static int[] getIntArray(ByteBuffer data, int offset, int length) { + if (length == 0) { + return new int[0]; + } + data.position(offset); + int[] intArray = new int[length / 2]; + int index = 0; + while (index < intArray.length) { + intArray[index++] = data.getShort(); + } + return intArray; + } + 
/** * Convert int array to Integer list * @@ -1233,6 +1256,18 @@ public final class CarbonUtil { }, offset, length); } + public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length) + throws IOException { + byte[] data = new byte[length]; + dataChunkBuffer.position(offset); + dataChunkBuffer.get(data); + return (DataChunk3) read(data, new ThriftReader.TBaseCreator() { + @Override public TBase create() { + return new DataChunk3(); + } + }, 0, length); + } + public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length) throws IOException { byte[] data = new byte[length]; @@ -1293,6 +1328,35 @@ public final class CarbonUtil { return meta; } + public static ValueEncoderMeta deserializeEncoderMetaNew(byte[] encodeMeta) { + ByteBuffer buffer = ByteBuffer.wrap(encodeMeta); + char measureType = buffer.getChar(); + ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta(); + valueEncoderMeta.setType(measureType); + switch (measureType) { + case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE: + valueEncoderMeta.setMaxValue(buffer.getDouble()); + valueEncoderMeta.setMinValue(buffer.getDouble()); + valueEncoderMeta.setUniqueValue(buffer.getDouble()); + break; + case CarbonCommonConstants.BIG_DECIMAL_MEASURE: + valueEncoderMeta.setMaxValue(0.0); + valueEncoderMeta.setMinValue(0.0); + valueEncoderMeta.setUniqueValue(0.0); + break; + case CarbonCommonConstants.BIG_INT_MEASURE: + valueEncoderMeta.setMaxValue(buffer.getLong()); + valueEncoderMeta.setMinValue(buffer.getLong()); + valueEncoderMeta.setUniqueValue(buffer.getLong()); + break; + default: + throw new IllegalArgumentException("invalid measure type"); + } + valueEncoderMeta.setDecimal(buffer.getInt()); + valueEncoderMeta.setDataTypeSelected(buffer.get()); + return valueEncoderMeta; + } + /** * Below method will be used to convert indexes in range * Indexes=[0,1,2,3,4,5,6,7,8,9] @@ -1454,5 +1518,51 @@ public final class CarbonUtil { return null; } } + + /** + * 
Below method will be used to convert byte data to surrogate key based + * column value size + * + * @param data data + * @param startOffsetOfData start offset of data + * @param eachColumnValueSize size of each column value + * @return surrogate key + */ + public static int getSurrogateInternal(byte[] data, int startOffsetOfData, + int eachColumnValueSize) { + int surrogate = 0; + switch (eachColumnValueSize) { + case 1: + surrogate <<= 8; + surrogate ^= data[startOffsetOfData] & 0xFF; + return surrogate; + case 2: + surrogate <<= 8; + surrogate ^= data[startOffsetOfData] & 0xFF; + surrogate <<= 8; + surrogate ^= data[startOffsetOfData + 1] & 0xFF; + return surrogate; + case 3: + surrogate <<= 8; + surrogate ^= data[startOffsetOfData] & 0xFF; + surrogate <<= 8; + surrogate ^= data[startOffsetOfData + 1] & 0xFF; + surrogate <<= 8; + surrogate ^= data[startOffsetOfData + 2] & 0xFF; + return surrogate; + case 4: + surrogate <<= 8; + surrogate ^= data[startOffsetOfData] & 0xFF; + surrogate <<= 8; + surrogate ^= data[startOffsetOfData + 1] & 0xFF; + surrogate <<= 8; + surrogate ^= data[startOffsetOfData + 2] & 0xFF; + surrogate <<= 8; + surrogate ^= data[startOffsetOfData + 3] & 0xFF; + return surrogate; + default: + throw new IllegalArgumentException("Int cannot me more than 4 bytes"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java index 08bfd6d..153fcb9 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java @@ -56,8 +56,11 @@ public class 
DataFileFooterConverterFactory { switch (version) { case V1: return new DataFileFooterConverter(); - default: + case V2: return new DataFileFooterConverter2(); + case V3: + default: + return new DataFileFooterConverterV3(); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java new file mode 100644 index 0000000..1ab3133 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.core.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonFooterReader; +import org.apache.carbondata.format.FileFooter; + +public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter { + + /** + * Below method will be used to convert thrift file meta to wrapper file meta + * This method will read the footer from footer offset present in the data file + * 1. It will set the stream offset + * 2. It will read the footer data from file + * 3. parse the footer to thrift object + * 4. 
convert to wrapper object + * + * @param tableBlockInfo + * table block info + * @return data file footer + */ + @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo) + throws IOException { + DataFileFooter dataFileFooter = new DataFileFooter(); + CarbonFooterReader reader = + new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset()); + FileFooter footer = reader.readFooter(); + dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion())); + dataFileFooter.setNumberOfRows(footer.getNum_rows()); + dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info())); + List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); + List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns(); + for (int i = 0; i < table_columns.size(); i++) { + columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i))); + } + dataFileFooter.setColumnInTable(columnSchemaList); + + List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift = + footer.getBlocklet_index_list(); + List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>(); + for (int i = 0; i < leaf_node_indices_Thrift.size(); i++) { + BlockletIndex blockletIndex = getBlockletIndex(leaf_node_indices_Thrift.get(i)); + blockletIndexList.add(blockletIndex); + } + List<org.apache.carbondata.format.BlockletInfo3> leaf_node_infos_Thrift = + footer.getBlocklet_info_list3(); + List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>(); + for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) { + BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i), + getNumberOfDimensionColumns(columnSchemaList)); + blockletInfo.setBlockletIndex(blockletIndexList.get(i)); + blockletInfoList.add(blockletInfo); + } + dataFileFooter.setBlockletList(blockletInfoList); + 
dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList)); + return dataFileFooter; + } + + /** + * Below method is to convert the blocklet info of the thrift to wrapper + * blocklet info + * + * @param blockletInfoThrift blocklet info of the thrift + * @return blocklet info wrapper + */ + private BlockletInfo getBlockletInfo( + org.apache.carbondata.format.BlockletInfo3 blockletInfoThrift, int numberOfDimensionColumns) { + BlockletInfo blockletInfo = new BlockletInfo(); + List<Long> dimensionColumnChunkOffsets = + blockletInfoThrift.getColumn_data_chunks_offsets().subList(0, numberOfDimensionColumns); + List<Long> measureColumnChunksOffsets = blockletInfoThrift.getColumn_data_chunks_offsets() + .subList(numberOfDimensionColumns, + blockletInfoThrift.getColumn_data_chunks_offsets().size()); + List<Integer> dimensionColumnChunkLength = + blockletInfoThrift.getColumn_data_chunks_length().subList(0, numberOfDimensionColumns); + List<Integer> measureColumnChunksLength = blockletInfoThrift.getColumn_data_chunks_length() + .subList(numberOfDimensionColumns, + blockletInfoThrift.getColumn_data_chunks_offsets().size()); + blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets); + blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets); + blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength); + blockletInfo.setMeasureChunksLength(measureColumnChunksLength); + blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows()); + blockletInfo.setDimensionOffset(blockletInfoThrift.getDimension_offsets()); + blockletInfo.setMeasureOffsets(blockletInfoThrift.getMeasure_offsets()); + return blockletInfo; + } + + /** + * Below method will be used to get the number of dimension column + * in carbon column schema + * + * @param columnSchemaList column schema list + * @return number of dimension column + */ + private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) { + int numberOfDimensionColumns = 0; + int 
previousColumnGroupId = -1; + ColumnSchema columnSchema = null; + for (int i = 0; i < columnSchemaList.size(); i++) { + columnSchema = columnSchemaList.get(i); + if (columnSchema.isDimensionColumn() && columnSchema.isColumnar()) { + numberOfDimensionColumns++; + } else if (columnSchema.isDimensionColumn()) { + if (previousColumnGroupId != columnSchema.getColumnGroupId()) { + previousColumnGroupId = columnSchema.getColumnGroupId(); + numberOfDimensionColumns++; + } + } else { + break; + } + } + return numberOfDimensionColumns; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java new file mode 100644 index 0000000..d46b806 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.carbondata.core.util;

import java.util.BitSet;

import org.apache.carbondata.core.datastore.compression.WriterCompressModel;

/**
 * Value holder passed from the fact-data handler to the data writers: carries one
 * blocklet's dimension key and measure byte arrays, their lengths, min/max metadata,
 * compressed inverted-index data, compression model and null-value bitmaps.
 *
 * Cleanup vs. original: private fields with no accessor that were never read
 * (dataAfterCompression, indexMap, allMaxValue, allMinValue, dataIndexMapOffsets)
 * are removed. The public accessor names getMeasureLenght/setMeasureLenght retain
 * the original misspelling because external callers depend on them; only the
 * private field spelling is corrected.
 */
public class NodeHolder {
  /** serialized dimension key data, one entry per dimension column block */
  private byte[][] keyArray;

  /** serialized measure data, one entry per measure column */
  private byte[][] dataArray;

  /** byte length of each measure column's data */
  private int[] measureLength;

  /** start key of this blocklet */
  private byte[] startKey;

  /** end key of this blocklet */
  private byte[] endKey;

  /** number of entries (rows) in this blocklet */
  private int entryCount;

  /** byte length of each dimension key block */
  private int[] keyLengths;

  /** byte length of each key block's inverted index */
  private int[] keyBlockIndexLength;

  /** per key block: whether the block is stored sorted */
  private boolean[] isSortedKeyBlock;

  /** compressed inverted index per key block */
  private byte[][] compressedIndex;

  /** compressed inverted index map per key block */
  private byte[][] compressedIndexMap;

  /** byte length of each data index map */
  private int[] dataIndexMapLength;

  /** compressed data index per key block */
  private byte[][] compressedDataIndex;

  /** per dimension column: max value bytes */
  private byte[][] columnMaxData;

  /** per dimension column: min value bytes */
  private byte[][] columnMinData;

  /** per measure column: max value bytes */
  private byte[][] measureColumnMaxData;

  /** per measure column: min value bytes */
  private byte[][] measureColumnMinData;

  /** compression model for the measure data blocks */
  private WriterCompressModel compressionModel;

  /** per block: whether RLE/aggregation encoding was applied */
  private boolean[] aggBlocks;

  /** per block: whether the block is a column-group block */
  private boolean[] colGrpBlock;

  /** per measure: bit set of row positions holding null */
  private BitSet[] measureNullValueIndex;

  /** total byte length of all dimension values */
  private int totalDimensionArrayLength;

  /** total byte length of all measure values */
  private int totalMeasureArrayLength;

  public byte[][] getKeyArray() {
    return keyArray;
  }

  public void setKeyArray(byte[][] keyArray) {
    this.keyArray = keyArray;
  }

  public byte[][] getDataArray() {
    return dataArray;
  }

  public void setDataArray(byte[][] dataArray) {
    this.dataArray = dataArray;
  }

  /** Accessor name kept misspelled for caller compatibility. */
  public int[] getMeasureLenght() {
    return measureLength;
  }

  /** Accessor name kept misspelled for caller compatibility. */
  public void setMeasureLenght(int[] measureLenght) {
    this.measureLength = measureLenght;
  }

  public byte[] getStartKey() {
    return startKey;
  }

  public void setStartKey(byte[] startKey) {
    this.startKey = startKey;
  }

  public byte[] getEndKey() {
    return endKey;
  }

  public void setEndKey(byte[] endKey) {
    this.endKey = endKey;
  }

  public int getEntryCount() {
    return entryCount;
  }

  public void setEntryCount(int entryCount) {
    this.entryCount = entryCount;
  }

  public int[] getKeyLengths() {
    return keyLengths;
  }

  public void setKeyLengths(int[] keyLengths) {
    this.keyLengths = keyLengths;
  }

  public int[] getKeyBlockIndexLength() {
    return keyBlockIndexLength;
  }

  public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
    this.keyBlockIndexLength = keyBlockIndexLength;
  }

  public boolean[] getIsSortedKeyBlock() {
    return isSortedKeyBlock;
  }

  public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
    this.isSortedKeyBlock = isSortedKeyBlock;
  }

  public byte[][] getCompressedIndex() {
    return compressedIndex;
  }

  public void setCompressedIndex(byte[][] compressedIndex) {
    this.compressedIndex = compressedIndex;
  }

  public byte[][] getCompressedIndexMap() {
    return compressedIndexMap;
  }

  public void setCompressedIndexMap(byte[][] compressedIndexMap) {
    this.compressedIndexMap = compressedIndexMap;
  }

  public byte[][] getCompressedDataIndex() {
    return compressedDataIndex;
  }

  public void setCompressedDataIndex(byte[][] compressedDataIndex) {
    this.compressedDataIndex = compressedDataIndex;
  }

  public int[] getDataIndexMapLength() {
    return dataIndexMapLength;
  }

  public void setDataIndexMapLength(int[] dataIndexMapLength) {
    this.dataIndexMapLength = dataIndexMapLength;
  }

  public byte[][] getColumnMaxData() {
    return this.columnMaxData;
  }

  public void setColumnMaxData(byte[][] columnMaxData) {
    this.columnMaxData = columnMaxData;
  }

  public byte[][] getColumnMinData() {
    return this.columnMinData;
  }

  public void setColumnMinData(byte[][] columnMinData) {
    this.columnMinData = columnMinData;
  }

  public WriterCompressModel getCompressionModel() {
    return compressionModel;
  }

  public void setCompressionModel(WriterCompressModel compressionModel) {
    this.compressionModel = compressionModel;
  }

  public boolean[] getAggBlocks() {
    return aggBlocks;
  }

  public void setAggBlocks(boolean[] aggBlocks) {
    this.aggBlocks = aggBlocks;
  }

  public boolean[] getColGrpBlocks() {
    return this.colGrpBlock;
  }

  public void setColGrpBlocks(boolean[] colGrpBlock) {
    this.colGrpBlock = colGrpBlock;
  }

  public BitSet[] getMeasureNullValueIndex() {
    return measureNullValueIndex;
  }

  public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
    this.measureNullValueIndex = measureNullValueIndex;
  }

  public int getTotalDimensionArrayLength() {
    return totalDimensionArrayLength;
  }

  public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
    this.totalDimensionArrayLength = totalDimensionArrayLength;
  }

  public int getTotalMeasureArrayLength() {
    return totalMeasureArrayLength;
  }

  public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
    this.totalMeasureArrayLength = totalMeasureArrayLength;
  }

  public byte[][] getMeasureColumnMaxData() {
    return measureColumnMaxData;
  }

  public void setMeasureColumnMaxData(byte[][] measureColumnMaxData) {
    this.measureColumnMaxData = measureColumnMaxData;
  }

  public byte[][] getMeasureColumnMinData() {
    return measureColumnMinData;
  }

  public void setMeasureColumnMinData(byte[][] measureColumnMinData) {
    this.measureColumnMinData = measureColumnMinData;
  }
}
overriden at node level 9: optional list<binary> encoder_meta; // extra information required by encoders -} + 10: optional BlockletMinMaxIndex min_max; + 11: optional i32 numberOfRowsInpage; + } /** +* Represents a chunk of data. The chunk can be a single column stored in Column Major format or a group of columns stored in Row Major Format. +**/ +struct DataChunk3{ + 1: required list<DataChunk2> data_chunk_list; // list of data chunk + 2: optional list<i32> page_offset; // offset of each chunk + 3: optional list<i32> page_length; // length of each chunk + + } +/** * Information about a blocklet */ struct BlockletInfo{ @@ -146,7 +157,16 @@ struct BlockletInfo2{ 2: required list<i64> column_data_chunks_offsets; // Information about offsets all column chunks in this blocklet 3: required list<i16> column_data_chunks_length; // Information about length all column chunks in this blocklet } - +/** +* Information about a blocklet +*/ +struct BlockletInfo3{ + 1: required i32 num_rows; // Number of rows in this blocklet + 2: required list<i64> column_data_chunks_offsets; // Information about offsets all column chunks in this blocklet + 3: required list<i32> column_data_chunks_length; // Information about length all column chunks in this blocklet + 4: required i64 dimension_offsets; + 5: required i64 measure_offsets; + } /** * Footer for indexed carbon file */ @@ -158,7 +178,8 @@ struct FileFooter{ 5: required list<BlockletIndex> blocklet_index_list; // blocklet index of all blocklets in this file 6: optional list<BlockletInfo> blocklet_info_list; // Information about blocklets of all columns in this file 7: optional list<BlockletInfo2> blocklet_info_list2; // Information about blocklets of all columns in this file - 8: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary + 8: optional list<BlockletInfo3> blocklet_info_list3; // Information about blocklets of all columns in this file + 9: optional dictionary.ColumnDictionaryChunk dictionary; // 
blocklet local dictionary } /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java index 828ece8..3f75cd1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java @@ -22,6 +22,7 @@ import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter; import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1; import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2; +import org.apache.carbondata.processing.store.writer.v3.CarbonFactDataWriterImplV3; /** * Factory class to get the writer instance @@ -62,8 +63,12 @@ public class CarbonDataWriterFactory { switch (version) { case V1: return new CarbonFactDataWriterImplV1(carbonDataWriterVo); - default: + case V2: return new CarbonFactDataWriterImplV2(carbonDataWriterVo); + case V3: + return new CarbonFactDataWriterImplV3(carbonDataWriterVo); + default: + return new CarbonFactDataWriterImplV3(carbonDataWriterVo); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java 
index bf66700..0699167 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -40,9 +40,11 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt; import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex; +import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort; import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastore.columnar.IndexStorage; import org.apache.carbondata.core.datastore.compression.WriterCompressModel; @@ -59,6 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.NodeHolder; import org.apache.carbondata.core.util.ValueCompressionUtil; import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.mdkeygen.file.FileManager; @@ -70,7 +73,6 @@ import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder; import org.apache.carbondata.processing.store.colgroup.DataHolder; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter; -import org.apache.carbondata.processing.store.writer.NodeHolder; import 
org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; import org.apache.carbondata.processing.util.RemoveDictionaryUtil; @@ -257,6 +259,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private int bucketNumber; /** + * current data format version + */ + private ColumnarFormatVersion version; + + /** * CarbonFactDataHandler constructor */ public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHandlerModel) { @@ -326,6 +333,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } aggKeyBlock = arrangeUniqueBlockType(aggKeyBlock); } + version = CarbonProperties.getInstance().getFormatVersion(); } private void initParameters(CarbonFactDataHandlerModel carbonFactDataHandlerModel) { @@ -476,7 +484,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) { max[i] = -Double.MAX_VALUE; } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { - max[i] = new BigDecimal(0.0); + max[i] = new BigDecimal(-Double.MAX_VALUE); } else { max[i] = 0.0; } @@ -748,9 +756,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { // TODO remove after kettle flow is removed private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal, - WriterCompressModel compressionModel, byte[][] noDictionaryData, - byte[] noDictionaryStartKey, byte[] noDictionaryEndKey) - throws CarbonDataWriterException { + WriterCompressModel compressionModel, byte[][] noDictionaryData, byte[] noDictionaryStartKey, + byte[] noDictionaryEndKey) throws CarbonDataWriterException { byte[][][] noDictionaryColumnsData = null; List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>(); int complexColCount = getComplexColsCount(); @@ -836,9 +843,9 @@ public class CarbonFactDataHandlerColumnar 
implements CarbonFactHandler { if (dimensionType[i]) { dictionaryColumnCount++; if (colGrpModel.isColumnar(dictionaryColumnCount)) { - submit.add(executorService - .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), - true, isUseInvertedIndex[i]))); + submit.add(executorService.submit( + new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true, + isUseInvertedIndex[i]))); } else { submit.add( executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount]))); @@ -876,8 +883,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal, byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal, WriterCompressModel compressionModel, byte[][][] noDictionaryData, - byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey) - throws CarbonDataWriterException { + byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey) throws CarbonDataWriterException { byte[][][] noDictionaryColumnsData = null; List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>(); int complexColCount = getComplexColsCount(); @@ -907,7 +913,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { int keyLength = splitKey[j].length; byte[] newKey = new byte[keyLength + 2]; ByteBuffer buffer = ByteBuffer.wrap(newKey); - buffer.putShort((short)keyLength); + buffer.putShort((short) keyLength); System.arraycopy(splitKey[j], 0, newKey, 2, keyLength); noDictionaryColumnsData[j][i] = newKey; } @@ -963,9 +969,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { if (dimensionType[i]) { dictionaryColumnCount++; if (colGrpModel.isColumnar(dictionaryColumnCount)) { - submit.add(executorService - .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), - true, isUseInvertedIndex[i]))); + submit.add(executorService.submit( + new 
BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true, + isUseInvertedIndex[i]))); } else { submit.add( executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount]))); @@ -1008,7 +1014,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey); } - /** * DataHolder will have all row mdkey data * @@ -1150,6 +1155,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } return decimalPlaces; } + /** * This method will be used to update the max value for each measure */ @@ -1220,7 +1226,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { this.blockletSize = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)); - LOGGER.info("Blocklet Size: " + blockletSize); + if (version == ColumnarFormatVersion.V3) { + this.blockletSize = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE, + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT)); + } + LOGGER.info("Number of rows per column blocklet " + blockletSize); dataRows = new ArrayList<>(this.blockletSize); int dimSet = Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE); @@ -1280,8 +1291,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { .getBlockKeySize()); System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore, blockKeySize.length - noOfColStore); - this.dataWriter = - getFactDataWriter(keyBlockSize); + this.dataWriter = getFactDataWriter(keyBlockSize); this.dataWriter.setIsNoDictionary(isNoDictionary); // initialize the channel; this.dataWriter.initializeWriter(); @@ -1377,7 +1387,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * 
@return data writer instance */ private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) { - ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion(); return CarbonDataWriterFactory.getInstance() .getFactDataWriter(version, getDataWriterVo(keyBlockSize)); } @@ -1620,8 +1629,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { @Override public IndexStorage call() throws Exception { if (isUseInvertedIndex) { - return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary, - isSortRequired); + if (version == ColumnarFormatVersion.V3) { + return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary, + isSortRequired); + } else { + return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary, + isSortRequired); + } } else { return new BlockIndexerStorageForNoInvertedIndex(this.data, isCompressionReq, isNoDictionary);