[CARBONDATA-2618][32K] Split to multiple pages if varchar column page exceeds 2GB/snappy limits

Dynamic column page size decided by long string column

A varchar column page uses SafeVarLengthColumnPage/UnsafeVarLengthColumnPage to
store data and is encoded with HighCardDictDimensionIndexCodec, which calls
getByteArrayPage() on the column page and flattens it into a byte[] for
compression. Because Java array indices are ints, a page can hold at most
Integer.MAX_VALUE bytes.
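
As an illustration (not the actual codec code; flattenForCompression is a
hypothetical helper), the per-row values of one varchar column page end up
concatenated into a single byte[] before compression, and Java's int-indexed
arrays cap that buffer at Integer.MAX_VALUE bytes:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;

    public class FlattenSketch {
      // Hypothetical stand-in for getByteArrayPage() + flattening: concatenate
      // the rows of one varchar column page into a single byte[] for the
      // compressor. A Java array is int-indexed, so the flattened page can
      // never exceed Integer.MAX_VALUE bytes, however much memory is available.
      static byte[] flattenForCompression(List<byte[]> rows) throws IOException {
        long total = 0;
        for (byte[] row : rows) {
          total += row.length;
        }
        if (total > Integer.MAX_VALUE) {
          throw new IllegalStateException("page exceeds 2GB, cannot flatten into a byte[]");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream((int) total);
        for (byte[] row : rows) {
          out.write(row);
        }
        return out.toByteArray();
      }
    }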

Another limitation comes from the Compressor. Snappy is currently the default
compressor, and it calls maxCompressedLength to estimate the output size before
allocating the output buffer. To be safe, the estimate overshoots:
32 + source_len + source_len/6. Since that estimate must itself fit in an
int-indexed byte[], the maximum number of bytes snappy can compress is
(2GB - 32) * 6/7 ≈ 1.71GB.
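
A minimal sketch of the arithmetic behind the 1.71GB figure (it mirrors the
MAX_BYTE_TO_COMPRESS constant added below): solving 32 + n + n/6 <= Integer.MAX_VALUE
for n gives n <= (Integer.MAX_VALUE - 32) * 6/7.

    public class SnappyLimitSketch {
      public static void main(String[] args) {
        // snappy's worst-case output estimate is 32 + n + n/6, and that output
        // must also fit in an int-indexed byte[], which bounds the input size n.
        int maxByteToCompress = (int) ((Integer.MAX_VALUE - 32) / 7.0 * 6);
        System.out.println(maxByteToCompress);                          // 1840700241
        System.out.println(maxByteToCompress / (1024.0 * 1024 * 1024)); // ~1.71 (GB)
      }
    }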

The size of a single row does not exceed 2MB, since UnsafeSortDataRows uses a
2MB byte[] as its rowBuffer. We can therefore stop adding rows to the page as
soon as any long string column comes within 2MB of the limit.
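
As a rough worked example: with about 1.71GB of headroom and a worst-case
contribution of 2MB per row from a single long string column, a page would be
cut after roughly 1.71GB / 2MB ≈ 880 rows; with values averaging 1MB it would be
cut after about 1750 rows, both well below the default 32000 rows per page.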

This closes #2464


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1c3b8b81
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1c3b8b81
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1c3b8b81

Branch: refs/heads/branch-1.4
Commit: 1c3b8b81d4c0439a5856f5839e9db07c2470415a
Parents: c18f612
Author: Manhua <kevin...@qq.com>
Authored: Mon Jul 9 16:42:08 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../datastore/compression/SnappyCompressor.java |  3 ++
 .../store/CarbonFactDataHandlerColumnar.java    | 53 ++++++++++++++++++--
 .../store/CarbonFactDataHandlerModel.java       | 44 ++++++++++++++++
 3 files changed, 97 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c3b8b81/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
index 65244d2..bd740b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/SnappyCompressor.java
@@ -31,6 +31,9 @@ public class SnappyCompressor implements Compressor {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SnappyCompressor.class.getName());
 
+  // snappy estimates the max compressed length as 32 + source_len + source_len/6
+  public static final int MAX_BYTE_TO_COMPRESS = (int)((Integer.MAX_VALUE - 32) / 7.0 * 6);
+
   private final SnappyNative snappyNative;
 
   public SnappyCompressor() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c3b8b81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 97737d0..94511cf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -34,8 +34,10 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.compression.SnappyCompressor;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -84,6 +86,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private ExecutorService consumerExecutorService;
   private List<Future<Void>> consumerExecutorServiceTaskList;
   private List<CarbonRow> dataRows;
+  private int[] varcharColumnSizeInByte;
   /**
    * semaphore which will used for managing node holder objects
    */
@@ -191,7 +194,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.entryCount++;
     // if entry count reaches to leaf node size then we are ready to write
     // this to leaf node file and update the intermediate files
-    if (this.entryCount == this.pageSize) {
+    if (this.entryCount == this.pageSize || isVarcharColumnFull(row)) {
       try {
         semaphore.acquire();
 
@@ -214,6 +217,43 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
+   * Check whether the column page can accept more rows after adding this row.
+   *
+   * A varchar column page uses SafeVarLengthColumnPage/UnsafeVarLengthColumnPage to store data
+   * and is encoded with HighCardDictDimensionIndexCodec, which calls getByteArrayPage() on the
+   * column page and flattens it into a byte[] for compression.
+   * Because Java array indices are ints, a page can hold at most Integer.MAX_VALUE bytes.
+   *
+   * Another limitation comes from the Compressor. Snappy is currently the default compressor,
+   * and it calls maxCompressedLength to estimate the output size before allocating the output.
+   * To be safe, the estimate overshoots: `32 + source_len + source_len/6`.
+   * So the maximum number of bytes snappy can compress is (2GB - 32) * 6/7 ≈ 1.71GB.
+   *
+   * The size of a row does not exceed 2MB, since UnsafeSortDataRows uses a 2MB byte[] as rowBuffer.
+   * Therefore we can stop adding rows here once any long string column comes within 2MB of the limit.
+   *
+   * If the unsafe column page is used, please ensure enough memory is configured.
+   * @param row the row being added to the page
+   * @return true if any varchar column page cannot hold one more value (up to 2MB)
+   */
+  private boolean isVarcharColumnFull(CarbonRow row) {
+    if (model.getVarcharDimIdxInNoDict().size() > 0) {
+      byte[][] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+      for (int i = 0; i < model.getVarcharDimIdxInNoDict().size(); i++) {
+        varcharColumnSizeInByte[i] += nonDictArray[model.getVarcharDimIdxInNoDict().get(i)].length;
+        if (SnappyCompressor.MAX_BYTE_TO_COMPRESS -
+                (varcharColumnSizeInByte[i] + dataRows.size() * 4) < (2 << 20)) {
+          LOGGER.info("Limited by varchar column, page size is " + dataRows.size());
+          // re-init for next page
+          varcharColumnSizeInByte = new int[model.getVarcharDimIdxInNoDict().size()];
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
    * generate the EncodedTablePage from the input rows (one page in case of V3 format)
    */
   private TablePage processDataRows(List<CarbonRow> dataRows)
@@ -342,15 +382,22 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
     // support less than 32000 rows in one page, because we support super long string,
-    // if it is long enough, a clomun page with 32000 rows will exceed 2GB
+    // if it is long enough, a column page with 32000 rows will exceed 2GB
     if (version == ColumnarFormatVersion.V3) {
       this.pageSize =
           pageSize < CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT ?
               pageSize :
               CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
     }
-    LOGGER.info("Number of rows per column blocklet " + pageSize);
+    LOGGER.info("Number of rows per column page is configured as pageSize = " 
+ pageSize);
     dataRows = new ArrayList<>(this.pageSize);
+
+    if (model.getVarcharDimIdxInNoDict().size() > 0) {
+      LOGGER.info("Number of rows per column blocklet is constrained by 
pageSize and actual size " +
+              "of long string column(s)");
+      varcharColumnSizeInByte = new 
int[model.getVarcharDimIdxInNoDict().size()];
+    }
+
     int dimSet =
         Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
     // if at least one dimension is present then initialize column splitter otherwise null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1c3b8b81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 3e70fc1..a0483fe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -34,7 +34,11 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -43,6 +47,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datamap.DataMapWriterListener;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
@@ -172,6 +177,8 @@ public class CarbonFactDataHandlerModel {
 
   private int numberOfCores;
 
+  private List<Integer> varcharDimIdxInNoDict;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
@@ -214,6 +221,19 @@ public class CarbonFactDataHandlerModel {
     for (int i = 0; i < simpleDimsCount; i++) {
       simpleDimsLen[i] = dimLens[i];
     }
+
+    // for dynamic page size in write step if varchar columns exist
+    List<Integer> varcharDimIdxInNoDict = new ArrayList<>();
+    int dictDimCount = configuration.getDimensionCount() - configuration.getNoDictionaryCount();
+    for (DataField dataField : configuration.getDataFields()) {
+      CarbonColumn column = dataField.getColumn();
+      if (!column.isComplex() && !dataField.hasDictionaryEncoding() &&
+              column.getDataType() == DataTypes.VARCHAR) {
+        // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables()
+        varcharDimIdxInNoDict.add(column.getOrdinal() - dictDimCount);
+      }
+    }
+
     //To Set MDKey Index of each primitive type in complex type
     int surrIndex = simpleDimsCount;
     Iterator<Map.Entry<String, GenericDataType>> complexMap =
@@ -280,6 +300,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.dataMapWriterlistener = listener;
     carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
     setNumberOfCores(carbonFactDataHandlerModel);
+    carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict);
     return carbonFactDataHandlerModel;
   }
 
@@ -292,6 +313,19 @@ public class CarbonFactDataHandlerModel {
   public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel,
       CarbonTable carbonTable, SegmentProperties segmentProperties, String tableName,
       String[] tempStoreLocation, String carbonDataDirectoryPath) {
+
+    // for dynamic page size in write step if varchar columns exist
+    List<Integer> varcharDimIdxInNoDict = new ArrayList<>();
+    List<CarbonDimension> allDimensions = carbonTable.getDimensions();
+    int dictDimCount = allDimensions.size() - segmentProperties.getNumberOfNoDictionaryDimension();
+    for (CarbonDimension dim : allDimensions) {
+      if (!dim.isComplex() && !dim.hasEncoding(Encoding.DICTIONARY) &&
+          dim.getDataType() == DataTypes.VARCHAR) {
+        // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables()
+        varcharDimIdxInNoDict.add(dim.getOrdinal() - dictDimCount);
+      }
+    }
+
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
     carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName());
@@ -344,6 +378,7 @@ public class CarbonFactDataHandlerModel {
     setNumberOfCores(carbonFactDataHandlerModel);
     carbonFactDataHandlerModel
         .setColumnLocalDictGenMap(CarbonUtil.getLocalDictionaryModel(carbonTable));
+    carbonFactDataHandlerModel.setVarcharDimIdxInNoDict(varcharDimIdxInNoDict);
     return carbonFactDataHandlerModel;
   }
 
@@ -653,5 +688,14 @@ public class CarbonFactDataHandlerModel {
   public int getNumberOfCores() {
     return numberOfCores;
   }
+
+  public void setVarcharDimIdxInNoDict(List<Integer> varcharDimIdxInNoDict) {
+    this.varcharDimIdxInNoDict = varcharDimIdxInNoDict;
+  }
+
+  public List<Integer> getVarcharDimIdxInNoDict() {
+    return varcharDimIdxInNoDict;
+  }
+
 }
 
