[CARBONDATA-2618][32K] Split to multiple pages if varchar column page exceeds 2GB/snappy limits
Dynamic column page size decided by long string column A varchar column page uses SafeVarLengthColumnPage/UnsafeVarLengthColumnPage to store data and encoded using HighCardDictDimensionIndexCodec which will call getByteArrayPage() from column page and flatten into byte[] for compression. Limited by the index of array, we can only put number of Integer.MAX_VALUE bytes in a page. Another limitation is from Compressor. Currently we use snappy as default compressor, and it will call MaxCompressedLength method to estimate the result size for preparing output. For safety, the estimate result is oversize: 32 + source_len + source_len/6. So the maximum bytes to compress by snappy is (2GB-32)*6/7≈1.71GB. Size of a row does not exceed 2MB since UnsafeSortDataRows uses 2MB byte[] as rowBuffer. Such that we can stop adding more row here if any long string column reach this limit. This closes #2464 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1c3b8b81 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1c3b8b81 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1c3b8b81 Branch: refs/heads/branch-1.4 Commit: 1c3b8b81d4c0439a5856f5839e9db07c2470415a Parents: c18f612 Author: Manhua <kevin...@qq.com> Authored: Mon Jul 9 16:42:08 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jul 31 00:11:26 2018 +0530 ---------------------------------------------------------------------- .../datastore/compression/SnappyCompressor.java | 3 ++ .../store/CarbonFactDataHandlerColumnar.java | 53 ++++++++++++++++++-- .../store/CarbonFactDataHandlerModel.java | 44 ++++++++++++++++ 3 files changed, 97 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c3b8b81/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java index 65244d2..bd740b2 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java @@ -31,6 +31,9 @@ public class SnappyCompressor implements Compressor { private static final LogService LOGGER = LogServiceFactory.getLogService(SnappyCompressor.class.getName()); + // snappy estimate max compressed length as 32 + source_len + source_len/6 + public static final int MAX_BYTE_TO_COMPRESS = (int)((Integer.MAX_VALUE - 32) / 7.0 * 6); + private final SnappyNative snappyNative; public SnappyCompressor() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c3b8b81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 97737d0..94511cf 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -34,8 +34,10 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.compression.SnappyCompressor; import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator; import org.apache.carbondata.core.memory.MemoryException; @@ -84,6 +86,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private ExecutorService consumerExecutorService; private List<Future<Void>> consumerExecutorServiceTaskList; private List<CarbonRow> dataRows; + private int[] varcharColumnSizeInByte; /** * semaphore which will used for managing node holder objects */ @@ -191,7 +194,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { this.entryCount++; // if entry count reaches to leaf node size then we are ready to write // this to leaf node file and update the intermediate files - if (this.entryCount == this.pageSize) { + if (this.entryCount == this.pageSize || isVarcharColumnFull(row)) { try { semaphore.acquire(); @@ -214,6 +217,43 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } /** + * Check if column page can be added more rows after adding this row to page. + * + * A varchar column page uses SafeVarLengthColumnPage/UnsafeVarLengthColumnPage to store data + * and encoded using HighCardDictDimensionIndexCodec which will call getByteArrayPage() from + * column page and flatten into byte[] for compression. + * Limited by the index of array, we can only put number of Integer.MAX_VALUE bytes in a page. + * + * Another limitation is from Compressor. Currently we use snappy as default compressor, + * and it will call MaxCompressedLength method to estimate the result size for preparing output. + * For safety, the estimate result is oversize: `32 + source_len + source_len/6`. 
+ * So the maximum bytes to compress by snappy is (2GB-32)*6/7≈1.71GB. + * + * Size of a row does not exceed 2MB since UnsafeSortDataRows uses 2MB byte[] as rowBuffer. + * Such that we can stop adding more row here if any long string column reach this limit. + * + * If use unsafe column page, please ensure the memory configured is enough. + * @param row + * @return false if any varchar column page cannot add one more value(2MB) + */ + private boolean isVarcharColumnFull(CarbonRow row) { + if (model.getVarcharDimIdxInNoDict().size() > 0) { + byte[][] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row); + for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) { + varcharColumnSizeInByte[i] += nonDictArray[model.getVarcharDimIdxInNoDict().get(i)].length; + if (SnappyCompressor.MAX_BYTE_TO_COMPRESS - + (varcharColumnSizeInByte[i] + dataRows.size() * 4) < (2 << 20)) { + LOGGER.info("Limited by varchar column, page size is " + dataRows.size()); + // re-init for next page + varcharColumnSizeInByte = new int[model.getVarcharDimIdxInNoDict().size()]; + return true; + } + } + } + return false; + } + + /** * generate the EncodedTablePage from the input rows (one page in case of V3 format) */ private TablePage processDataRows(List<CarbonRow> dataRows) @@ -342,15 +382,22 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)); // support less than 32000 rows in one page, because we support super long string, - // if it is long enough, a clomun page with 32000 rows will exceed 2GB + // if it is long enough, a column page with 32000 rows will exceed 2GB if (version == ColumnarFormatVersion.V3) { this.pageSize = pageSize < CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT ? 
pageSize : CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; } - LOGGER.info("Number of rows per column blocklet " + pageSize); + LOGGER.info("Number of rows per column page is configured as pageSize = " + pageSize); dataRows = new ArrayList<>(this.pageSize); + + if (model.getVarcharDimIdxInNoDict().size() > 0) { + LOGGER.info("Number of rows per column blocklet is constrained by pageSize and actual size " + + "of long string column(s)"); + varcharColumnSizeInByte = new int[model.getVarcharDimIdxInNoDict().size()]; + } + int dimSet = Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE); // if at least one dimension is present then initialize column splitter otherwise null http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c3b8b81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 3e70fc1..a0483fe 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -34,7 +34,11 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonProperties; @@ -43,6 +47,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.loading.DataField; import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.loading.sort.SortScopeOptions; @@ -172,6 +177,8 @@ public class CarbonFactDataHandlerModel { private int numberOfCores; + private List<Integer> varcharDimIdxInNoDict; + /** * Create the model using @{@link CarbonDataLoadConfiguration} */ @@ -214,6 +221,19 @@ public class CarbonFactDataHandlerModel { for (int i = 0; i < simpleDimsCount; i++) { simpleDimsLen[i] = dimLens[i]; } + + // for dynamic page size in write step if varchar columns exist + List<Integer> varcharDimIdxInNoDict = new ArrayList<>(); + int dictDimCount = configuration.getDimensionCount() - configuration.getNoDictionaryCount(); + for (DataField dataField : configuration.getDataFields()) { + CarbonColumn column = dataField.getColumn(); + if (!column.isComplex() && !dataField.hasDictionaryEncoding() && + column.getDataType() == DataTypes.VARCHAR) { + // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables() + varcharDimIdxInNoDict.add(column.getOrdinal() - dictDimCount); + } + } + //To Set MDKey Index of each primitive type in complex type int surrIndex = simpleDimsCount; Iterator<Map.Entry<String, GenericDataType>> complexMap = @@ -280,6 +300,7 @@ public class 
CarbonFactDataHandlerModel { carbonFactDataHandlerModel.dataMapWriterlistener = listener; carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount(); setNumberOfCores(carbonFactDataHandlerModel); + carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict); return carbonFactDataHandlerModel; } @@ -292,6 +313,19 @@ public class CarbonFactDataHandlerModel { public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel, CarbonTable carbonTable, SegmentProperties segmentProperties, String tableName, String[] tempStoreLocation, String carbonDataDirectoryPath) { + + // for dynamic page size in write step if varchar columns exist + List<Integer> varcharDimIdxInNoDict = new ArrayList<>(); + List<CarbonDimension> allDimensions = carbonTable.getDimensions(); + int dictDimCount = allDimensions.size() - segmentProperties.getNumberOfNoDictionaryDimension(); + for (CarbonDimension dim : allDimensions) { + if (!dim.isComplex() && !dim.hasEncoding(Encoding.DICTIONARY) && + dim.getDataType() == DataTypes.VARCHAR) { + // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables() + varcharDimIdxInNoDict.add(dim.getOrdinal() - dictDimCount); + } + } + CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel(); carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime()); carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName()); @@ -344,6 +378,7 @@ public class CarbonFactDataHandlerModel { setNumberOfCores(carbonFactDataHandlerModel); carbonFactDataHandlerModel .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable)); + carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict); return carbonFactDataHandlerModel; } @@ -653,5 +688,14 @@ public class CarbonFactDataHandlerModel { public int getNumberOfCores() { return numberOfCores; } + + public void setVarcharDimIdxInNoDict(List<Integer> 
varcharDimIdxInNoDict) { + this.varcharDimIdxInNoDict = varcharDimIdxInNoDict; + } + + public List<Integer> getVarcharDimIdxInNoDict() { + return varcharDimIdxInNoDict; + } + }