Added support for offheap storage in query

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8d9babe3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8d9babe3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8d9babe3

Branch: refs/heads/master
Commit: 8d9babe3cb783b21791c577d1b22cc331e5ce967
Parents: 49727a2
Author: kumarvishal <kumarvishal.1...@gmail.com>
Authored: Wed Dec 14 11:33:41 2016 +0800
Committer: kumarvishal <kumarvishal.1...@gmail.com>
Committed: Sun Jan 8 21:45:26 2017 +0800

----------------------------------------------------------------------
 .../chunk/DimensionChunkAttributes.java         | 102 -----
 .../chunk/DimensionColumnDataChunk.java         |  50 ++-
 .../datastore/chunk/MeasureColumnDataChunk.java |   3 +
 .../chunk/impl/AbstractDimensionDataChunk.java  |  90 +++++
 .../impl/ColumnGroupDimensionDataChunk.java     | 186 ++++-----
 .../impl/FixedLengthDimensionDataChunk.java     | 149 +++----
 .../impl/VariableLengthDimensionDataChunk.java  | 113 +++---
 .../reader/dimension/AbstractChunkReader.java   |  43 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java |  28 +-
 ...mpressedDimensionChunkFileBasedReaderV2.java |  98 ++---
 ...CompressedMeasureChunkFileBasedReaderV1.java |  15 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |  74 ++--
 .../chunk/store/DimensionChunkStoreFactory.java |  88 ++++
 .../chunk/store/DimensionDataChunkStore.java    |  95 +++++
 .../chunk/store/MeasureChunkStoreFactory.java   |  97 +++++
 .../chunk/store/MeasureDataChunkStore.java      |  86 ++++
 .../SafeAbsractDimensionDataChunkStore.java     | 126 ++++++
 .../safe/SafeAbstractMeasureDataChunkStore.java | 114 ++++++
 .../impl/safe/SafeByteMeasureChunkStore.java    |  55 +++
 .../impl/safe/SafeDoubleMeasureChunkStore.java  |  54 +++
 .../SafeFixedLengthDimensionDataChunkStore.java | 114 ++++++
 .../impl/safe/SafeIntMeasureChunkStore.java     |  54 +++
 .../impl/safe/SafeLongMeasureChunkStore.java    |  55 +++
 .../impl/safe/SafeShortMeasureChunkStore.java   |  56 +++
 ...feVariableLengthDimensionDataChunkStore.java | 139 +++++++
 .../UnsafeAbstractDimensionDataChunkStore.java  | 173 ++++++++
 .../UnsafeAbstractMeasureDataChunkStore.java    | 128 ++++++
 .../unsafe/UnsafeByteMeasureChunkStore.java     |  58 +++
 .../unsafe/UnsafeDoubleMeasureChunkStore.java   |  60 +++
 ...nsafeFixedLengthDimensionDataChunkStore.java | 147 +++++++
 .../impl/unsafe/UnsafeIntMeasureChunkStore.java |  60 +++
 .../unsafe/UnsafeLongMeasureChunkStore.java     |  59 +++
 .../unsafe/UnsafeShortMeasureChunkStore.java    |  59 +++
 ...afeVariableLengthDimesionDataChunkStore.java | 212 ++++++++++
 .../core/constants/CarbonCommonConstants.java   |  21 +
 .../datastorage/store/NodeMeasureDataStore.java |   1 -
 .../store/compression/Compressor.java           |  12 +
 .../store/compression/CompressorFactory.java    |  30 +-
 .../store/compression/SnappyCompressor.java     | 117 ++++--
 .../compression/ValueCompressonHolder.java      |  39 +-
 .../decimal/UnCompressByteArray.java            | 137 -------
 .../decimal/UnCompressMaxMinByte.java           |  78 ++--
 .../decimal/UnCompressMaxMinDefault.java        |  53 ++-
 .../decimal/UnCompressMaxMinFloat.java          | 102 -----
 .../decimal/UnCompressMaxMinInt.java            |  66 +--
 .../decimal/UnCompressMaxMinLong.java           |  63 +--
 .../decimal/UnCompressMaxMinShort.java          |  65 +--
 .../nondecimal/UnCompressNonDecimalByte.java    |  47 ++-
 .../nondecimal/UnCompressNonDecimalDefault.java |  43 +-
 .../nondecimal/UnCompressNonDecimalFloat.java   | 100 -----
 .../nondecimal/UnCompressNonDecimalInt.java     |  45 ++-
 .../nondecimal/UnCompressNonDecimalLong.java    |  42 +-
 .../UnCompressNonDecimalMaxMinByte.java         |  66 +--
 .../UnCompressNonDecimalMaxMinDefault.java      |  56 ++-
 .../UnCompressNonDecimalMaxMinFloat.java        | 110 -----
 .../UnCompressNonDecimalMaxMinInt.java          |  58 ++-
 .../UnCompressNonDecimalMaxMinLong.java         |  56 ++-
 .../UnCompressNonDecimalMaxMinShort.java        |  59 ++-
 .../nondecimal/UnCompressNonDecimalShort.java   |  43 +-
 .../compression/none/UnCompressNoneByte.java    |  53 ++-
 .../compression/none/UnCompressNoneDefault.java |  37 +-
 .../compression/none/UnCompressNoneFloat.java   | 105 -----
 .../compression/none/UnCompressNoneInt.java     |  67 ++--
 .../compression/none/UnCompressNoneLong.java    |  51 ++-
 .../compression/none/UnCompressNoneShort.java   |  52 ++-
 .../compression/type/UnCompressBigDecimal.java  |  57 +--
 .../type/UnCompressBigDecimalByte.java          | 115 +++---
 .../store/dataholder/CarbonReadDataHolder.java  |  62 +--
 ...yCompressedDoubleArrayDataInMemoryStore.java |   1 -
 .../keygenerator/mdkey/NumberCompressor.java    |  10 +-
 .../core/memory/MemoryAllocatorFactory.java     |  45 +++
 .../carbondata/core/unsafe/CarbonUnsafe.java    |  12 +
 .../core/util/CarbonMetadataUtil.java           |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java | 136 ++++---
 .../core/util/ValueCompressionUtil.java         |  15 -
 .../impl/AbstractScannedResultCollector.java    |   8 +-
 .../scan/complextypes/ComplexQueryType.java     |  14 +-
 .../scan/complextypes/PrimitiveQueryType.java   |   2 +-
 .../executer/ExcludeFilterExecuterImpl.java     |  64 +--
 .../executer/IncludeFilterExecuterImpl.java     |  58 +--
 .../executer/RowLevelFilterExecuterImpl.java    |  46 +--
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |   9 +-
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  10 +-
 ...velRangeLessThanEqualFilterExecuterImpl.java |   9 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |   9 +-
 .../scan/result/AbstractScannedResult.java      |  10 +-
 .../scan/scanner/impl/FilterScanner.java        |   7 +-
 .../impl/CompressedDataMeasureWrapperTest.java  |   2 +-
 .../impl/ColumnGroupDimensionDataChunkTest.java | 106 +++--
 .../impl/FixedLengthDimensionDataChunkTest.java |  17 +-
 ...ressedDimensionChunkFileBasedReaderTest.java | 239 +++++------
 ...mpressedMeasureChunkFileBasedReaderTest.java |   5 +-
 .../DriverQueryStatisticsRecorderImplTest.java  |   1 -
 .../mdkey/NumberCompressorUnitTest.java         |  11 +-
 .../carbondata/core/util/CarbonUtilTest.java    | 162 +++-----
 .../core/util/ValueCompressionUtilTest.java     |  57 +--
 .../DictionaryBasedResultCollectorTest.java     | 304 ++++++--------
 .../impl/RawBasedResultCollectorTest.java       | 216 +++++-----
 .../impl/FilterQueryScannedResultTest.java      | 401 +++++++++----------
 .../impl/NonFilterQueryScannedResultTest.java   | 397 +++++++++---------
 .../scanner/impl/FilterScannerTest.java         | 284 +++++++------
 .../sortdata/CompressedTempSortFileWriter.java  |   3 +-
 .../store/writer/AbstractFactDataWriter.java    |  10 +-
 103 files changed, 4712 insertions(+), 3118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionChunkAttributes.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionChunkAttributes.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionChunkAttributes.java
deleted file mode 100644
index 4dcf083..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionChunkAttributes.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.core.carbon.datastore.chunk;
-
-/**
- * Dimension chunk attributes which holds all the
- * property about the dimension chunk data
- */
-public class DimensionChunkAttributes {
-
-  /**
-   * inverted index of the data
-   */
-  private int[] invertedIndexes;
-
-  /**
-   * reverse index of the data
-   */
-  private int[] invertedIndexesReverse;
-
-  /**
-   * each row size
-   */
-  private int columnValueSize;
-
-  /**
-   * is no dictionary
-   */
-  private boolean isNoDictionary;
-
-  /**
-   * @return the invertedIndexes
-   */
-  public int[] getInvertedIndexes() {
-    return invertedIndexes;
-  }
-
-  /**
-   * @param invertedIndexes the invertedIndexes to set
-   */
-  public void setInvertedIndexes(int[] invertedIndexes) {
-    this.invertedIndexes = invertedIndexes;
-  }
-
-  /**
-   * @return the invertedIndexesReverse
-   */
-  public int[] getInvertedIndexesReverse() {
-    return invertedIndexesReverse;
-  }
-
-  /**
-   * @param invertedIndexesReverse the invertedIndexesReverse to set
-   */
-  public void setInvertedIndexesReverse(int[] invertedIndexesReverse) {
-    this.invertedIndexesReverse = invertedIndexesReverse;
-  }
-
-  /**
-   * @return the eachRowSize
-   */
-  public int getColumnValueSize() {
-    return columnValueSize;
-  }
-
-  /**
-   * @param eachRowSize the eachRowSize to set
-   */
-  public void setEachRowSize(int eachRowSize) {
-    this.columnValueSize = eachRowSize;
-  }
-
-  /**
-   * @return the isNoDictionary
-   */
-  public boolean isNoDictionary() {
-    return isNoDictionary;
-  }
-
-  /**
-   * @param isNoDictionary the isNoDictionary to set
-   */
-  public void setNoDictionary(boolean isNoDictionary) {
-    this.isNoDictionary = isNoDictionary;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
index 5bf200d..efaa48b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
@@ -24,7 +24,7 @@ import 
org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 /**
  * Interface for dimension column chunk.
  */
-public interface DimensionColumnDataChunk<T> {
+public interface DimensionColumnDataChunk {
 
   /**
    * Below method will be used to fill the data based on offset and row id
@@ -37,22 +37,32 @@ public interface DimensionColumnDataChunk<T> {
 
   /**
    * It uses to convert column data to dictionary integer value
+   *
    * @param rowId
    * @param columnIndex
    * @param row
-   * @param restructuringInfo  @return
+   * @param restructuringInfo @return
    */
   int fillConvertedChunkData(int rowId, int columnIndex, int[] row,
       KeyStructureInfo restructuringInfo);
 
   /**
    * Fill the data to vector
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
    */
   int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
       KeyStructureInfo restructuringInfo);
 
   /**
    * Fill the data to vector
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
    */
   int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] vectorInfo, 
int column,
       KeyStructureInfo restructuringInfo);
@@ -65,17 +75,37 @@ public interface DimensionColumnDataChunk<T> {
   byte[] getChunkData(int columnIndex);
 
   /**
-   * Below method will be used get the chunk attributes
-   *
-   * @return chunk attributes
+   * @return inverted index
+   */
+  int getInvertedIndex(int index);
+
+  /**
+   * @return whether this column is a no-dictionary column
+   */
+  boolean isNoDicitionaryColumn();
+
+  /**
+   * @return length of each column
    */
-  DimensionChunkAttributes getAttributes();
+  int getColumnValueSize();
 
   /**
-   * Below method will be used to return the complete data chunk
-   * This will be required during filter query
+   * @return whether columns were explicitly sorted or not
+   */
+  boolean isExplicitSorted();
+
+  /**
+   * to compare the data
    *
-   * @return complete chunk
+   * @param index        row index to be compared
+   * @param compareValue value to compare
+   * @return compare result
    */
-  T getCompleteDataChunk();
+  int compareTo(int index, byte[] compareValue);
+
+  /**
+   * below method will be used to free the allocated memory
+   */
+  void freeMemory();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/MeasureColumnDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/MeasureColumnDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/MeasureColumnDataChunk.java
index fbe6e95..e4e0d6b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/MeasureColumnDataChunk.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/MeasureColumnDataChunk.java
@@ -68,4 +68,7 @@ public class MeasureColumnDataChunk {
     this.nullValueIndexHolder = nullValueIndexHolder;
   }
 
+  public void freeMemory() {
+    this.measureDataHolder.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/AbstractDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/AbstractDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/AbstractDimensionDataChunk.java
new file mode 100644
index 0000000..8f6c284
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/AbstractDimensionDataChunk.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.impl;
+
+import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.DimensionDataChunkStore;
+
+/**
+ * Class responsibility is to give access to dimension column data chunk store
+ */
+public abstract class AbstractDimensionDataChunk implements 
DimensionColumnDataChunk {
+
+  /**
+   * data chunks
+   */
+  protected DimensionDataChunkStore dataChunkStore;
+
+  /**
+   * @return whether columns were explicitly sorted or not
+   */
+  @Override public boolean isExplicitSorted() {
+    return dataChunkStore.isExplicitSorted();
+  }
+
+  /**
+   * Below method to get the data based in row id
+   *
+   * @param index row id of the data
+   * @return chunk
+   */
+  @Override public byte[] getChunkData(int index) {
+    return dataChunkStore.getRow(index);
+  }
+
+  /**
+   * @return inverted index
+   */
+  @Override public int getInvertedIndex(int index) {
+    return dataChunkStore.getInvertedIndex(index);
+  }
+
+  /**
+   * @return length of each column
+   */
+  @Override public int getColumnValueSize() {
+    return dataChunkStore.getColumnValueSize();
+  }
+
+  /**
+   * To compare the data
+   *
+   * @param index        row index to be compared
+   * @param compareValue value to compare
+   * @return compare result
+   */
+  @Override public int compareTo(int index, byte[] compareValue) {
+    // TODO Auto-generated method stub
+    return dataChunkStore.compareTo(index, compareValue);
+  }
+
+  /**
+   * below method will be used to free the allocated memory
+   */
+  @Override public void freeMemory() {
+    dataChunkStore.freeMemory();
+  }
+
+  /**
+   * @return whether this column is a no-dictionary column
+   */
+  @Override public boolean isNoDicitionaryColumn() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
index ab467f2..7d3f5cf 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
@@ -18,26 +18,15 @@
  */
 package org.apache.carbondata.core.carbon.datastore.chunk.impl;
 
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.DimensionChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
- * This class is holder of the dimension column chunk data of the fixed length
- * key size
+ * This class gives access to the column group dimension data chunk store
  */
-public class ColumnGroupDimensionDataChunk implements 
DimensionColumnDataChunk<byte[]> {
-
-  /**
-   * dimension chunk attributes
-   */
-  private DimensionChunkAttributes chunkAttributes;
-
-  /**
-   * data chunks
-   */
-  private byte[] dataChunk;
+public class ColumnGroupDimensionDataChunk extends AbstractDimensionDataChunk {
 
   /**
    * Constructor for this class
@@ -45,56 +34,110 @@ public class ColumnGroupDimensionDataChunk implements 
DimensionColumnDataChunk<b
    * @param dataChunk       data chunk
    * @param chunkAttributes chunk attributes
    */
-  public ColumnGroupDimensionDataChunk(byte[] dataChunk, 
DimensionChunkAttributes chunkAttributes) {
-    this.chunkAttributes = chunkAttributes;
-    this.dataChunk = dataChunk;
+  public ColumnGroupDimensionDataChunk(byte[] dataChunk, int columnValueSize, 
int numberOfRows) {
+    this.dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(columnValueSize, false, numberOfRows, 
dataChunk.length,
+        DimensionStoreType.FIXEDLENGTH);
+    this.dataChunkStore.putArray(null, null, dataChunk);
   }
 
   /**
    * Below method will be used to fill the data based on offset and row id
    *
-   * @param data             data to filed
-   * @param offset           offset from which data need to be filed
-   * @param rowId            row id of the chunk
+   * @param data              data to be filled
+   * @param offset            offset from which data need to be filled
+   * @param rowId             row id of the chunk
    * @param restructuringInfo define the structure of the key
    * @return how many bytes was copied
    */
-  @Override
-  public int fillChunkData(byte[] data, int offset, int rowId,
+  @Override public int fillChunkData(byte[] data, int offset, int rowId,
       KeyStructureInfo restructuringInfo) {
-    byte[] maskedKey =
-        getMaskedKey(dataChunk, rowId * chunkAttributes.getColumnValueSize(), 
restructuringInfo);
+    byte[] row = dataChunkStore.getRow(rowId);
+    byte[] maskedKey = getMaskedKey(row, restructuringInfo);
     System.arraycopy(maskedKey, 0, data, offset, maskedKey.length);
     return maskedKey.length;
   }
 
   /**
    * Converts to column dictionary integer value
+   *
+   * @param rowId
+   * @param columnIndex
+   * @param row
+   * @param restructuringInfo @return
    */
-  @Override
-  public int fillConvertedChunkData(int rowId, int columnIndex, int[] row,
+  @Override public int fillConvertedChunkData(int rowId, int columnIndex, 
int[] row,
       KeyStructureInfo info) {
-    int start = rowId * chunkAttributes.getColumnValueSize();
-    long[] keyArray = info.getKeyGenerator().getKeyArray(dataChunk, start);
+    byte[] data = dataChunkStore.getRow(rowId);
+    long[] keyArray = info.getKeyGenerator().getKeyArray(data);
     int[] ordinal = info.getMdkeyQueryDimensionOrdinal();
     for (int i = 0; i < ordinal.length; i++) {
-      row[columnIndex++] = (int)keyArray[ordinal[i]];
+      row[columnIndex++] = (int) keyArray[ordinal[i]];
     }
     return columnIndex;
   }
 
-  @Override
-  public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
-      KeyStructureInfo info) {
+  /**
+   * Below method will be used to get the masked key
+   *
+   * @param data   data
+   * @param offset offset of
+   * @param info
+   * @return
+   */
+  private byte[] getMaskedKey(byte[] data, KeyStructureInfo info) {
+    byte[] maskedKey = new byte[info.getMaskByteRanges().length];
+    int counter = 0;
+    int byteRange = 0;
+    for (int i = 0; i < info.getMaskByteRanges().length; i++) {
+      byteRange = info.getMaskByteRanges()[i];
+      maskedKey[counter++] = (byte) (data[byteRange] & 
info.getMaxKey()[byteRange]);
+    }
+    return maskedKey;
+  }
+
+  /**
+   * @return inverted index
+   */
+  @Override public int getInvertedIndex(int index) {
+    throw new UnsupportedOperationException("Operation not supported in case 
of cloumn group");
+  }
+
+  /**
+   * @return whether columns were explicitly sorted or not
+   */
+  @Override public boolean isExplicitSorted() {
+    return false;
+  }
+
+  /**
+   * to compare the data
+   *
+   * @param index        row index to be compared
+   * @param compareValue value to compare
+   * @return compare result
+   */
+  @Override public int compareTo(int index, byte[] compareValue) {
+    throw new UnsupportedOperationException("Operation not supported in case 
of cloumn group");
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, 
int column,
+      KeyStructureInfo restructuringInfo) {
     ColumnVectorInfo columnVectorInfo = vectorInfo[column];
     int offset = columnVectorInfo.offset;
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = offset + columnVectorInfo.size;
-    int columnValueSize = chunkAttributes.getColumnValueSize();
-    int[] ordinal = info.getMdkeyQueryDimensionOrdinal();
+    int[] ordinal = restructuringInfo.getMdkeyQueryDimensionOrdinal();
     for (int k = offset; k < len; k++) {
-      int start = k * columnValueSize;
-      long[] keyArray = info.getKeyGenerator().getKeyArray(dataChunk, start);
+      long[] keyArray = 
restructuringInfo.getKeyGenerator().getKeyArray(dataChunkStore.getRow(k));
       int index = 0;
       for (int i = column; i < column + ordinal.length; i++) {
         if (vectorInfo[i].directDictionaryGenerator == null) {
@@ -109,18 +152,24 @@ public class ColumnGroupDimensionDataChunk implements 
DimensionColumnDataChunk<b
     return column + ordinal.length;
   }
 
-  @Override
-  public int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] 
vectorInfo, int column,
-      KeyStructureInfo info) {
+  /**
+   * Fill the data to vector
+   *
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(int[] rowMapping, 
ColumnVectorInfo[] vectorInfo,
+      int column, KeyStructureInfo restructuringInfo) {
     ColumnVectorInfo columnVectorInfo = vectorInfo[column];
     int offset = columnVectorInfo.offset;
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = offset + columnVectorInfo.size;
-    int columnValueSize = chunkAttributes.getColumnValueSize();
-    int[] ordinal = info.getMdkeyQueryDimensionOrdinal();
+    int[] ordinal = restructuringInfo.getMdkeyQueryDimensionOrdinal();
     for (int k = offset; k < len; k++) {
-      int start = rowMapping[k] * columnValueSize;
-      long[] keyArray = info.getKeyGenerator().getKeyArray(dataChunk, start);
+      long[] keyArray = 
restructuringInfo.getKeyGenerator().getKeyArray(dataChunkStore.getRow(k));
       int index = 0;
       for (int i = column; i < column + ordinal.length; i++) {
         if (vectorInfo[i].directDictionaryGenerator == null) {
@@ -134,53 +183,4 @@ public class ColumnGroupDimensionDataChunk implements 
DimensionColumnDataChunk<b
     }
     return column + ordinal.length;
   }
-
-  /**
-   * Below method masks key
-   *
-   */
-  public byte[] getMaskedKey(byte[] data, int offset, KeyStructureInfo info) {
-    byte[] maskedKey = new byte[info.getMaskByteRanges().length];
-    int counter = 0;
-    int byteRange = 0;
-    for (int i = 0; i < info.getMaskByteRanges().length; i++) {
-      byteRange = info.getMaskByteRanges()[i];
-      maskedKey[counter++] = (byte) (data[byteRange + offset] & 
info.getMaxKey()[byteRange]);
-    }
-    return maskedKey;
-  }
-
-  /**
-   * Below method to get the data based in row id
-   *
-   * @param rowId row id of the data
-   * @return chunk
-   */
-  @Override
-  public byte[] getChunkData(int rowId) {
-    byte[] data = new byte[chunkAttributes.getColumnValueSize()];
-    System.arraycopy(dataChunk, rowId * data.length, data, 0, data.length);
-    return data;
-  }
-
-  /**
-   * Below method will be used get the chunk attributes
-   *
-   * @return chunk attributes
-   */
-  @Override
-  public DimensionChunkAttributes getAttributes() {
-    return chunkAttributes;
-  }
-
-  /**
-   * Below method will be used to return the complete data chunk
-   * This will be required during filter query
-   *
-   * @return complete chunk
-   */
-  @Override
-  public byte[] getCompleteDataChunk() {
-    return dataChunk;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
index 8e06d16..328bb53 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
@@ -18,37 +18,36 @@
  */
 package org.apache.carbondata.core.carbon.datastore.chunk.impl;
 
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.DimensionChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
- * This class is holder of the dimension column chunk data of the fixed length
- * key size
+ * This class gives access to the fixed length dimension data chunk store
  */
-public class FixedLengthDimensionDataChunk implements 
DimensionColumnDataChunk<byte[]> {
+public class FixedLengthDimensionDataChunk extends AbstractDimensionDataChunk {
 
   /**
-   * dimension chunk attributes
-   */
-  private DimensionChunkAttributes chunkAttributes;
-
-  /**
-   * data chunks
-   */
-  private byte[] dataChunk;
-
-  /**
-   * Constructor for this class
+   * Constructor
    *
-   * @param dataChunk       data chunk
-   * @param chunkAttributes chunk attributes
+   * @param dataChunk            data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param columnValueSize      size of each column value
    */
-  public FixedLengthDimensionDataChunk(byte[] dataChunk, 
DimensionChunkAttributes chunkAttributes) {
-    this.chunkAttributes = chunkAttributes;
-    this.dataChunk = dataChunk;
+  public FixedLengthDimensionDataChunk(byte[] dataChunk, int[] invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows, int columnValueSize) {
+    long totalSize = null != invertedIndex ?
+        dataChunk.length + (2 * numberOfRows * 
CarbonCommonConstants.INT_SIZE_IN_BYTE) :
+        dataChunk.length;
+    dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(columnValueSize, null != invertedIndex, 
numberOfRows, totalSize,
+            DimensionStoreType.FIXEDLENGTH);
+    dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
   }
 
   /**
@@ -62,41 +61,42 @@ public class FixedLengthDimensionDataChunk implements 
DimensionColumnDataChunk<b
    */
   @Override public int fillChunkData(byte[] data, int offset, int index,
       KeyStructureInfo keyStructureInfo) {
-    if (chunkAttributes.getInvertedIndexes() != null) {
-      index = chunkAttributes.getInvertedIndexesReverse()[index];
-    }
-    System.arraycopy(dataChunk, index * chunkAttributes.getColumnValueSize(), 
data, offset,
-        chunkAttributes.getColumnValueSize());
-    return chunkAttributes.getColumnValueSize();
+    dataChunkStore.fillRow(index, data, offset);
+    return dataChunkStore.getColumnValueSize();
   }
 
   /**
    * Converts to column dictionary integer value
+   *
+   * @param rowId
+   * @param columnIndex
+   * @param row
+   * @param restructuringInfo
+   * @return
    */
   @Override public int fillConvertedChunkData(int rowId, int columnIndex, 
int[] row,
       KeyStructureInfo restructuringInfo) {
-    if (chunkAttributes.getInvertedIndexes() != null) {
-      rowId = chunkAttributes.getInvertedIndexesReverse()[rowId];
-    }
-    int start = rowId * chunkAttributes.getColumnValueSize();
-    int dict = getInt(chunkAttributes.getColumnValueSize(), start);
-    row[columnIndex] = dict;
+    row[columnIndex] = dataChunkStore.getSurrogate(rowId);
     return columnIndex + 1;
   }
 
+  /**
+   * Fill the data to vector
+   *
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
   @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, 
int column,
       KeyStructureInfo restructuringInfo) {
     ColumnVectorInfo columnVectorInfo = vectorInfo[column];
     int offset = columnVectorInfo.offset;
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = columnVectorInfo.size + offset;
-    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
-    int columnValueSize = chunkAttributes.getColumnValueSize();
     CarbonColumnVector vector = columnVectorInfo.vector;
     for (int j = offset; j < len; j++) {
-      int start =
-          indexesReverse == null ? j * columnValueSize : indexesReverse[j] * 
columnValueSize;
-      int dict = getInt(columnValueSize, start);
+      int dict = dataChunkStore.getSurrogate(j);
       if (columnVectorInfo.directDictionaryGenerator == null) {
         vector.putInt(vectorOffset++, dict);
       } else {
@@ -119,23 +119,24 @@ public class FixedLengthDimensionDataChunk implements 
DimensionColumnDataChunk<b
     return column + 1;
   }
 
-  @Override
-  public int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] 
vectorInfo, int column,
-      KeyStructureInfo restructuringInfo) {
+  /**
+   * Fill the data to vector
+   *
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(int[] rowMapping, 
ColumnVectorInfo[] vectorInfo,
+      int column, KeyStructureInfo restructuringInfo) {
     ColumnVectorInfo columnVectorInfo = vectorInfo[column];
     int offset = columnVectorInfo.offset;
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = columnVectorInfo.size + offset;
-    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
-    int columnValueSize = chunkAttributes.getColumnValueSize();
     CarbonColumnVector vector = columnVectorInfo.vector;
     for (int j = offset; j < len; j++) {
-      // calculate the start index to take the dictionary data. Here 
dictionary data size is fixed
-      // so calculate using fixed size of it.
-      int start = indexesReverse == null ?
-          rowMapping[j] * columnValueSize :
-          indexesReverse[rowMapping[j]] * columnValueSize;
-      int dict = getInt(columnValueSize, start);
+      int dict = dataChunkStore.getSurrogate(rowMapping[j]);
       if (columnVectorInfo.directDictionaryGenerator == null) {
         vector.putInt(vectorOffset++, dict);
       } else {
@@ -157,54 +158,4 @@ public class FixedLengthDimensionDataChunk implements 
DimensionColumnDataChunk<b
     }
     return column + 1;
   }
-
-  /**
-   * Create the integer from fixed column size and start index
-   * @param columnValueSize
-   * @param start
-   * @return
-   */
-  private int getInt(int columnValueSize, int start) {
-    int dict = 0;
-    for (int i = start; i < start + columnValueSize; i++) {
-      dict <<= 8;
-      dict ^= dataChunk[i] & 0xFF;
-    }
-    return dict;
-  }
-
-  /**
-   * Below method to get the data based in row id
-   *
-   * @param index row id of the data
-   * @return chunk
-   */
-  @Override public byte[] getChunkData(int index) {
-    byte[] data = new byte[chunkAttributes.getColumnValueSize()];
-    if (chunkAttributes.getInvertedIndexes() != null) {
-      index = chunkAttributes.getInvertedIndexesReverse()[index];
-    }
-    System.arraycopy(dataChunk, index * chunkAttributes.getColumnValueSize(), 
data, 0,
-        chunkAttributes.getColumnValueSize());
-    return data;
-  }
-
-  /**
-   * Below method will be used get the chunk attributes
-   *
-   * @return chunk attributes
-   */
-  @Override public DimensionChunkAttributes getAttributes() {
-    return chunkAttributes;
-  }
-
-  /**
-   * Below method will be used to return the complete data chunk
-   * This will be required during filter query
-   *
-   * @return complete chunk
-   */
-  @Override public byte[] getCompleteDataChunk() {
-    return dataChunk;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
index 590e481..e93a86c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
@@ -19,49 +19,43 @@
 package org.apache.carbondata.core.carbon.datastore.chunk.impl;
 
 import java.util.Arrays;
-import java.util.List;
 
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.DimensionChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.DimensionChunkStoreFactory.DimensionStoreType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
- * This class is holder of the dimension column chunk data of the variable
- * length key size
+ * This class gives access to variable length dimension data chunk store
  */
-public class VariableLengthDimensionDataChunk implements 
DimensionColumnDataChunk<List<byte[]>> {
-
-  /**
-   * dimension chunk attributes
-   */
-  private DimensionChunkAttributes chunkAttributes;
-
-  /**
-   * data chunk
-   */
-  private List<byte[]> dataChunk;
+public class VariableLengthDimensionDataChunk extends 
AbstractDimensionDataChunk {
 
   /**
    * Constructor for this class
    *
-   * @param dataChunk       data chunk
+   * @param dataChunks      data chunk
    * @param chunkAttributes chunk attributes
    */
-  public VariableLengthDimensionDataChunk(List<byte[]> dataChunk,
-      DimensionChunkAttributes chunkAttributes) {
-    this.chunkAttributes = chunkAttributes;
-    this.dataChunk = dataChunk;
+  public VariableLengthDimensionDataChunk(byte[] dataChunks, int[] 
invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows) {
+    long totalSize = null != invertedIndex ?
+        (dataChunks.length + (2 * numberOfRows * 
CarbonCommonConstants.INT_SIZE_IN_BYTE) + (
+            numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE)) :
+        (dataChunks.length + (numberOfRows * 
CarbonCommonConstants.INT_SIZE_IN_BYTE));
+    dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(0, null != invertedIndex, numberOfRows, 
totalSize,
+            DimensionStoreType.VARIABLELENGTH);
+    dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
   }
 
   /**
    * Below method will be used to fill the data based on offset and row id
    *
-   * @param data             data to filed
-   * @param offset           offset from which data need to be filed
-   * @param index            row id of the chunk
+   * @param data              data to be filled
+   * @param offset            offset from which data need to be filled
+   * @param index             row id of the chunk
    * @param restructuringInfo define the structure of the key
    * @return how many bytes was copied
    */
@@ -74,10 +68,12 @@ public class VariableLengthDimensionDataChunk implements 
DimensionColumnDataChun
 
   /**
    * Converts to column dictionary integer value
+   *
    * @param rowId
    * @param columnIndex
    * @param row
-   * @param restructuringInfo  @return
+   * @param restructuringInfo
+   * @return
    */
   @Override public int fillConvertedChunkData(int rowId, int columnIndex, 
int[] row,
       KeyStructureInfo restructuringInfo) {
@@ -85,28 +81,36 @@ public class VariableLengthDimensionDataChunk implements 
DimensionColumnDataChun
   }
 
   /**
-   * Below method to get the data based in row id
-   *
-   * @param index row id of the data
-   * @return chunk
+   * @return whether the column is a no-dictionary column or not
    */
-  @Override public byte[] getChunkData(int index) {
-    if (null != chunkAttributes.getInvertedIndexes()) {
-      index = chunkAttributes.getInvertedIndexesReverse()[index];
-    }
-    return dataChunk.get(index);
+  @Override public boolean isNoDicitionaryColumn() {
+    return true;
   }
 
+  /**
+   * @return length of each column value; -1 since values are variable length
+   */
+  @Override public int getColumnValueSize() {
+    return -1;
+  }
+
+  /**
+   * Fill the data to vector
+   *
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
   @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, 
int column,
       KeyStructureInfo restructuringInfo) {
-    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
     ColumnVectorInfo columnVectorInfo = vectorInfo[column];
     CarbonColumnVector vector = columnVectorInfo.vector;
     int offset = columnVectorInfo.offset;
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = offset + columnVectorInfo.size;
     for (int i = offset; i < len; i++) {
-      byte[] value = dataChunk.get(indexesReverse == null ? i : 
indexesReverse[i]);
+      byte[] value = dataChunkStore.getRow(i);
       // Considering only String case now as we support only
       // string in no dictionary case at present.
       if (value == null || 
Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
@@ -118,18 +122,24 @@ public class VariableLengthDimensionDataChunk implements 
DimensionColumnDataChun
     return column + 1;
   }
 
-  @Override
-  public int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] 
vectorInfo, int column,
-      KeyStructureInfo restructuringInfo) {
-    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
+  /**
+   * Fill the data to vector
+   *
+   * @param rowMapping
+   * @param vectorInfo
+   * @param column
+   * @param restructuringInfo
+   * @return next column index
+   */
+  @Override public int fillConvertedChunkData(int[] rowMapping, 
ColumnVectorInfo[] vectorInfo,
+      int column, KeyStructureInfo restructuringInfo) {
     ColumnVectorInfo columnVectorInfo = vectorInfo[column];
     CarbonColumnVector vector = columnVectorInfo.vector;
     int offset = columnVectorInfo.offset;
     int vectorOffset = columnVectorInfo.vectorOffset;
     int len = offset + columnVectorInfo.size;
     for (int i = offset; i < len; i++) {
-      byte[] value =
-          dataChunk.get(indexesReverse == null ? rowMapping[i] : 
indexesReverse[rowMapping[i]]);
+      byte[] value = dataChunkStore.getRow(rowMapping[i]);
       // Considering only String case now as we support only
       // string in no dictionary case at present.
       if (value == null || 
Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
@@ -140,23 +150,4 @@ public class VariableLengthDimensionDataChunk implements 
DimensionColumnDataChun
     }
     return column + 1;
   }
-
-  /**
-   * Below method will be used get the chunk attributes
-   *
-   * @return chunk attributes
-   */
-  @Override public DimensionChunkAttributes getAttributes() {
-    return chunkAttributes;
-  }
-
-  /**
-   * Below method will be used to return the complete data chunk
-   * This will be required during filter query
-   *
-   * @return complete chunk
-   */
-  @Override public List<byte[]> getCompleteDataChunk() {
-    return dataChunk;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
index 6638c96..df34f8a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -18,10 +18,6 @@
  */
 package org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
 import 
org.apache.carbondata.core.carbon.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
@@ -38,7 +34,7 @@ public abstract class AbstractChunkReader implements 
DimensionColumnChunkReader
   /**
    * compressor will be used to uncompress the data
    */
-  protected static final Compressor compressor = 
CompressorFactory.getInstance();
+  protected static final Compressor COMPRESSOR = 
CompressorFactory.getInstance().getCompressor();
 
   /**
    * size of the each column value
@@ -61,7 +57,7 @@ public abstract class AbstractChunkReader implements 
DimensionColumnChunkReader
   /**
    * number of element in each chunk
    */
-  private int numberOfElement;
+  protected int numberOfRows;
 
   /**
    * Constructor to get minimum parameter to create
@@ -70,8 +66,8 @@ public abstract class AbstractChunkReader implements 
DimensionColumnChunkReader
    * @param eachColumnValueSize  size of the each column value
    * @param filePath             file from which data will be read
    */
-  public AbstractChunkReader(final int[] eachColumnValueSize,
-      final String filePath) {
+  public AbstractChunkReader(final int[] eachColumnValueSize, final String 
filePath,
+      int numberOfRows) {
     this.eachColumnValueSize = eachColumnValueSize;
     this.filePath = filePath;
     int numberOfElement = 0;
@@ -83,6 +79,7 @@ public abstract class AbstractChunkReader implements 
DimensionColumnChunkReader
       numberOfElement = 
Integer.parseInt(CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL);
     }
     this.numberComressor = new NumberCompressor(numberOfElement);
+    this.numberOfRows = numberOfRows;
   }
 
   /**
@@ -100,34 +97,4 @@ public abstract class AbstractChunkReader implements 
DimensionColumnChunkReader
     }
     return columnIndexTemp;
   }
-
-  /**
-   * In case of no dictionary column size of the each column value
-   * will not be same, so in case of filter query we can not take
-   * advantage of binary search as length with each value will be also
-   * store with the data, so converting this data to two dimension
-   * array format filter query processing will be faster
-   *
-   * @param dataChunkWithLength no dictionary column chunk
-   *                            <Lenght><Data><Lenght><data>
-   *                            Length will store in 2 bytes
-   * @return list of data chuck, one value in list will represent one column 
value
-   */
-  protected List<byte[]> getNoDictionaryDataChunk(byte[] dataChunkWithLength) {
-    List<byte[]> dataChunk = new ArrayList<byte[]>(numberOfElement);
-    // wrapping the chunk to byte buffer
-    ByteBuffer buffer = ByteBuffer.wrap(dataChunkWithLength);
-    buffer.rewind();
-    byte[] data = null;
-    // iterating till all the elements are read
-    while (buffer.hasRemaining()) {
-      // as all the data is stored with length(2 bytes)
-      // first reading the size and then based on size
-      // we need to read the actual value
-      data = new byte[buffer.getShort()];
-      buffer.get(data);
-      dataChunk.add(data);
-    }
-    return dataChunk;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
index abacbd4..a521e1e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -21,7 +21,6 @@ package 
org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v1;
 import java.io.IOException;
 import java.util.List;
 
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
@@ -54,7 +53,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 
extends AbstractChunkRead
    */
   public CompressedDimensionChunkFileBasedReaderV1(final BlockletInfo 
blockletInfo,
       final int[] eachColumnValueSize, final String filePath) {
-    super(eachColumnValueSize, filePath);
+    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
     this.dimensionColumnChunk = blockletInfo.getDimensionColumnChunk();
   }
 
@@ -93,7 +92,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 
extends AbstractChunkRead
     int[] rlePage = null;
 
     // first read the data and uncompressed it
-    dataPage = compressor.unCompressByte(fileReader
+    dataPage = COMPRESSOR.unCompressByte(fileReader
         .readByteArray(filePath, 
dimensionColumnChunk.get(blockIndex).getDataPageOffset(),
             dimensionColumnChunk.get(blockIndex).getDataPageLength()));
     // if row id block is present then read the row id chunk and uncompress it
@@ -103,7 +102,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 
extends AbstractChunkRead
           
.getUnCompressColumnIndex(dimensionColumnChunk.get(blockIndex).getRowIdPageLength(),
               fileReader.readByteArray(filePath,
                   dimensionColumnChunk.get(blockIndex).getRowIdPageOffset(),
-                  dimensionColumnChunk.get(blockIndex).getRowIdPageLength()), 
numberComressor);
+                  dimensionColumnChunk.get(blockIndex).getRowIdPageLength()), 
numberComressor, 0);
       // get the reverse index
       invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
     }
@@ -113,33 +112,32 @@ public class CompressedDimensionChunkFileBasedReaderV1 
extends AbstractChunkRead
         .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), 
Encoding.RLE)) {
       // read and uncompress the rle block
       rlePage = numberComressor.unCompress(fileReader
-          .readByteArray(filePath, 
dimensionColumnChunk.get(blockIndex).getRlePageOffset(),
-              dimensionColumnChunk.get(blockIndex).getRlePageLength()));
+              .readByteArray(filePath, 
dimensionColumnChunk.get(blockIndex).getRlePageOffset(),
+                  dimensionColumnChunk.get(blockIndex).getRlePageLength()), 0,
+          dimensionColumnChunk.get(blockIndex).getRlePageLength());
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, 
eachColumnValueSize[blockIndex]);
       rlePage = null;
     }
     // fill chunk attributes
-    DimensionChunkAttributes chunkAttributes = new DimensionChunkAttributes();
-    chunkAttributes.setEachRowSize(eachColumnValueSize[blockIndex]);
-    chunkAttributes.setInvertedIndexes(invertedIndexes);
-    chunkAttributes.setInvertedIndexesReverse(invertedIndexesReverse);
     DimensionColumnDataChunk columnDataChunk = null;
-
     if (dimensionColumnChunk.get(blockIndex).isRowMajor()) {
       // to store fixed length column chunk values
-      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, 
chunkAttributes);
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, 
eachColumnValueSize[blockIndex],
+          numberOfRows);
     }
     // if no dictionary column then first create a no dictionary column chunk
     // and set to data chunk instance
     else if (!CarbonUtil
         .hasEncoding(dimensionColumnChunk.get(blockIndex).getEncodingList(), 
Encoding.DICTIONARY)) {
       columnDataChunk =
-          new 
VariableLengthDimensionDataChunk(getNoDictionaryDataChunk(dataPage), 
chunkAttributes);
-      chunkAttributes.setNoDictionary(true);
+          new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+              numberOfRows);
     } else {
       // to store fixed length column chunk values
-      columnDataChunk = new FixedLengthDimensionDataChunk(dataPage, 
chunkAttributes);
+      columnDataChunk =
+          new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+              numberOfRows, eachColumnValueSize[blockIndex]);
     }
     return columnDataChunk;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 7843d05..c4bc01b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -21,7 +21,6 @@ package 
org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v2;
 import java.io.IOException;
 import java.util.List;
 
-import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
 import 
org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
@@ -30,7 +29,6 @@ import 
org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.Abstra
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer;
-import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
@@ -59,7 +57,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 
extends AbstractChunkRead
    */
   public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo 
blockletInfo,
       final int[] eachColumnValueSize, final String filePath) {
-    super(eachColumnValueSize, filePath);
+    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
     this.dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
     this.dimensionChunksLength = blockletInfo.getDimensionChunksLength();
 
@@ -135,7 +133,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 
extends AbstractChunkRead
     if (dimensionChunksOffset.size() - 1 == blockIndex) {
       dimensionChunk = fileReader.readByteArray(filePath, 
dimensionChunksOffset.get(blockIndex),
           dimensionChunksLength.get(blockIndex));
-      dimensionColumnChunk = CarbonUtil.readDataChunk(dimensionChunk);
+      dimensionColumnChunk = CarbonUtil
+          .readDataChunk(dimensionChunk, copySourcePoint, 
dimensionChunksLength.get(blockIndex));
       int totalDimensionDataLength =
           dimensionColumnChunk.data_page_length + 
dimensionColumnChunk.rle_page_length
               + dimensionColumnChunk.rowid_page_length;
@@ -146,63 +145,52 @@ public class CompressedDimensionChunkFileBasedReaderV2 
extends AbstractChunkRead
       long currentDimensionOffset = dimensionChunksOffset.get(blockIndex);
       data = fileReader.readByteArray(filePath, currentDimensionOffset,
           (int) (dimensionChunksOffset.get(blockIndex + 1) - 
currentDimensionOffset));
-      dimensionChunk = new byte[dimensionChunksLength.get(blockIndex)];
-      System.arraycopy(data, copySourcePoint, dimensionChunk, 0,
-          dimensionChunksLength.get(blockIndex));
-      dimensionColumnChunk = CarbonUtil.readDataChunk(dimensionChunk);
+      dimensionColumnChunk =
+          CarbonUtil.readDataChunk(data, copySourcePoint, 
dimensionChunksLength.get(blockIndex));
       copySourcePoint += dimensionChunksLength.get(blockIndex);
     }
 
-    byte[] compressedDataPage = new 
byte[dimensionColumnChunk.data_page_length];
-    System.arraycopy(data, copySourcePoint, compressedDataPage, 0,
-        dimensionColumnChunk.data_page_length);
-    copySourcePoint += dimensionColumnChunk.data_page_length;
     // first read the data and uncompressed it
-    dataPage = 
CompressorFactory.getInstance().unCompressByte(compressedDataPage);
+    dataPage =
+        COMPRESSOR.unCompressByte(data, copySourcePoint, 
dimensionColumnChunk.data_page_length);
+    copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
-      byte[] compressedIndexPage = new 
byte[dimensionColumnChunk.rowid_page_length];
-      System.arraycopy(data, copySourcePoint, compressedIndexPage, 0,
-          dimensionColumnChunk.rowid_page_length);
-      copySourcePoint += dimensionColumnChunk.rowid_page_length;
       invertedIndexes = CarbonUtil
-          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, 
compressedIndexPage,
-              numberComressor);
+          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, 
data, numberComressor,
+              copySourcePoint);
+      copySourcePoint += dimensionColumnChunk.rowid_page_length;
       // get the reverse index
       invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
     }
     // if rle is applied then read the rle block chunk and then uncompress
     //then actual data based on rle block
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
-      // read and uncompress the rle block
-      byte[] compressedRLEPage = new 
byte[dimensionColumnChunk.rle_page_length];
-      System.arraycopy(data, copySourcePoint, compressedRLEPage, 0,
-          dimensionColumnChunk.rle_page_length);
-      rlePage = numberComressor.unCompress(compressedRLEPage);
+      rlePage =
+          numberComressor.unCompress(data, copySourcePoint, 
dimensionColumnChunk.rle_page_length);
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, 
eachColumnValueSize[blockIndex]);
       rlePage = null;
     }
     // fill chunk attributes
-    DimensionChunkAttributes chunkAttributes = new DimensionChunkAttributes();
-    chunkAttributes.setEachRowSize(eachColumnValueSize[blockIndex]);
-    chunkAttributes.setInvertedIndexes(invertedIndexes);
-    chunkAttributes.setInvertedIndexesReverse(invertedIndexesReverse);
     DimensionColumnDataChunk columnDataChunk = null;
 
     if (dimensionColumnChunk.isRowMajor()) {
       // to store fixed length column chunk values
-      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, 
chunkAttributes);
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, 
eachColumnValueSize[blockIndex],
+          numberOfRows);
     }
     // if no dictionary column then first create a no dictionary column chunk
     // and set to data chunk instance
     else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) 
{
       columnDataChunk =
-          new 
VariableLengthDimensionDataChunk(getNoDictionaryDataChunk(dataPage), 
chunkAttributes);
-      chunkAttributes.setNoDictionary(true);
+          new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+              numberOfRows);
     } else {
       // to store fixed length column chunk values
-      columnDataChunk = new FixedLengthDimensionDataChunk(dataPage, 
chunkAttributes);
+      columnDataChunk =
+          new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+              numberOfRows, eachColumnValueSize[blockIndex]);
     }
     return columnDataChunk;
   }
@@ -230,31 +218,24 @@ public class CompressedDimensionChunkFileBasedReaderV2 
extends AbstractChunkRead
     int[] invertedIndexes = null;
     int[] invertedIndexesReverse = null;
     int[] rlePage = null;
-    byte[] dimensionChunk = null;
     DataChunk2 dimensionColumnChunk = null;
     int index = 0;
     for (int i = startBlockIndex; i <= endBlockIndex; i++) {
       invertedIndexes = null;
       invertedIndexesReverse = null;
-      dimensionChunk = new byte[dimensionChunksLength.get(i)];
-      System.arraycopy(data, copySourcePoint, dimensionChunk, 0, 
dimensionChunksLength.get(i));
-      dimensionColumnChunk = CarbonUtil.readDataChunk(dimensionChunk);
+      dimensionColumnChunk =
+          CarbonUtil.readDataChunk(data, copySourcePoint, 
dimensionChunksLength.get(i));
       copySourcePoint += dimensionChunksLength.get(i);
-      byte[] compressedDataPage = new 
byte[dimensionColumnChunk.data_page_length];
-      System.arraycopy(data, copySourcePoint, compressedDataPage, 0,
-          dimensionColumnChunk.data_page_length);
-      copySourcePoint += dimensionColumnChunk.data_page_length;
       // first read the data and uncompressed it
-      dataPage = 
CompressorFactory.getInstance().unCompressByte(compressedDataPage);
+      dataPage =
+          COMPRESSOR.unCompressByte(data, copySourcePoint, 
dimensionColumnChunk.data_page_length);
+      copySourcePoint += dimensionColumnChunk.data_page_length;
       // if row id block is present then read the row id chunk and uncompress 
it
       if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) 
{
-        byte[] compressedIndexPage = new 
byte[dimensionColumnChunk.rowid_page_length];
-        System.arraycopy(data, copySourcePoint, compressedIndexPage, 0,
-            dimensionColumnChunk.rowid_page_length);
-        copySourcePoint += dimensionColumnChunk.rowid_page_length;
         invertedIndexes = CarbonUtil
-            .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, 
compressedIndexPage,
-                numberComressor);
+            .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, 
data, numberComressor,
+                copySourcePoint);
+        copySourcePoint += dimensionColumnChunk.rowid_page_length;
         // get the reverse index
         invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
       }
@@ -262,34 +243,31 @@ public class CompressedDimensionChunkFileBasedReaderV2 
extends AbstractChunkRead
       //then actual data based on rle block
       if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
         // read and uncompress the rle block
-        byte[] compressedRLEPage = new 
byte[dimensionColumnChunk.rle_page_length];
-        System.arraycopy(data, copySourcePoint, compressedRLEPage, 0,
-            dimensionColumnChunk.rle_page_length);
+        rlePage =
+            numberComressor.unCompress(data, copySourcePoint, 
dimensionColumnChunk.rle_page_length);
         copySourcePoint += dimensionColumnChunk.rle_page_length;
-        rlePage = numberComressor.unCompress(compressedRLEPage);
         // uncompress the data with rle indexes
         dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, 
eachColumnValueSize[i]);
         rlePage = null;
       }
       // fill chunk attributes
-      DimensionChunkAttributes chunkAttributes = new 
DimensionChunkAttributes();
-      chunkAttributes.setEachRowSize(eachColumnValueSize[i]);
-      chunkAttributes.setInvertedIndexes(invertedIndexes);
-      chunkAttributes.setInvertedIndexesReverse(invertedIndexesReverse);
       DimensionColumnDataChunk columnDataChunk = null;
       if (dimensionColumnChunk.isRowMajor()) {
         // to store fixed length column chunk values
-        columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage, 
chunkAttributes);
+        columnDataChunk =
+            new ColumnGroupDimensionDataChunk(dataPage, 
eachColumnValueSize[i], numberOfRows);
       }
       // if no dictionary column then first create a no dictionary column chunk
       // and set to data chunk instance
       else if (!hasEncoding(dimensionColumnChunk.encoders, 
Encoding.DICTIONARY)) {
-        columnDataChunk = new 
VariableLengthDimensionDataChunk(getNoDictionaryDataChunk(dataPage),
-            chunkAttributes);
-        chunkAttributes.setNoDictionary(true);
+        columnDataChunk =
+            new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+                numberOfRows);
       } else {
         // to store fixed length column chunk values
-        columnDataChunk = new FixedLengthDimensionDataChunk(dataPage, 
chunkAttributes);
+        columnDataChunk =
+            new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, 
invertedIndexesReverse,
+                numberOfRows, eachColumnValueSize[i]);
       }
       dataChunks[index++] = columnDataChunk;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 3dd0d44..00c0ad7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -28,6 +28,7 @@ import 
org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import 
org.apache.carbondata.core.datastorage.store.compression.ReaderCompressModel;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
+
 import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
@@ -84,14 +85,12 @@ public class CompressedMeasureChunkFileBasedReaderV1 
extends AbstractMeasureChun
     ValueEncoderMeta meta = 
measureColumnChunks.get(blockIndex).getValueEncoderMeta().get(0);
     ReaderCompressModel compressModel = 
ValueCompressionUtil.getReaderCompressModel(meta);
     UnCompressValue values = 
compressModel.getUnCompressValues().getNew().getCompressorObject();
-    values.setValue(
-        fileReader.readByteArray(filePath, 
measureColumnChunks.get(blockIndex).getDataPageOffset(),
-            measureColumnChunks.get(blockIndex).getDataPageLength()));
-    // get the data holder after uncompressing
-    CarbonReadDataHolder measureDataHolder =
-        values.uncompress(compressModel.getConvertedDataType())
-            .getValues(compressModel.getMantissa(), 
compressModel.getMaxValue());
-
+    CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values
+        .uncompress(compressModel.getConvertedDataType(), fileReader
+                .readByteArray(filePath, 
measureColumnChunks.get(blockIndex).getDataPageOffset(),
+                    measureColumnChunks.get(blockIndex).getDataPageLength()), 
0,
+            measureColumnChunks.get(blockIndex).getDataPageLength(), 
compressModel.getMantissa(),
+            compressModel.getMaxValue()));
     // create and set the data chunk
     MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
     datChunk.setMeasureDataHolder(measureDataHolder);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 94b685c..60fe4f7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -75,23 +75,22 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
       org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
     PresenceMeta presenceMeta = new PresenceMeta();
     
presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
-    presenceMeta.setBitSet(
-        BitSet.valueOf(CompressorFactory.getInstance().unCompressByte(
-            presentMetadataThrift.getPresent_bit_stream())));
+    
presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+        .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
     return presenceMeta;
   }
 
   /**
    * Below method will be used to read the chunk based on block indexes
-   * Reading logic of below method is:
-   * Except last column all the column chunk can be read in group
-   * if not last column then read data of all the column present in block index
-   * together then process it.
-   * For last column read is separately and process
+   * Reading logic of below method is: Except last column all the column chunk
+   * can be read in group if not last column then read data of all the column
+   * present in block index together then process it. For last column read is
+   * separately and process
    *
    * @param fileReader   file reader to read the blocks from file
    * @param blockIndexes blocks range to be read
    * @return measure column chunks
+   * @throws IOException
    */
   public MeasureColumnDataChunk[] readMeasureChunks(FileHolder fileReader, 
int[][] blockIndexes)
       throws IOException {
@@ -131,6 +130,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
    * @param fileReader file reader to read the blocks
    * @param blockIndex block to be read
    * @return measure data chunk
+   * @throws IOException
    */
   @Override public MeasureColumnDataChunk readMeasureChunk(FileHolder 
fileReader, int blockIndex)
       throws IOException {
@@ -138,27 +138,23 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
     DataChunk2 measureColumnChunk = null;
     byte[] measureDataChunk = null;
     byte[] data = null;
-    byte[] dataPage = null;
+    int copyPoint = 0;
     if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
       measureDataChunk = fileReader
           .readByteArray(filePath, measureColumnChunkOffsets.get(blockIndex),
               measureColumnChunkLength.get(blockIndex));
-      measureColumnChunk = CarbonUtil.readDataChunk(measureDataChunk);
-      assert measureColumnChunk != null;
-      dataPage = fileReader.readByteArray(filePath,
+      measureColumnChunk = CarbonUtil
+          .readDataChunk(measureDataChunk, copyPoint, 
measureColumnChunkLength.get(blockIndex));
+      data = fileReader.readByteArray(filePath,
           measureColumnChunkOffsets.get(blockIndex) + 
measureColumnChunkLength.get(blockIndex),
           measureColumnChunk.data_page_length);
     } else {
       long currentMeasureOffset = measureColumnChunkOffsets.get(blockIndex);
       data = fileReader.readByteArray(filePath, currentMeasureOffset,
           (int) (measureColumnChunkOffsets.get(blockIndex + 1) - 
currentMeasureOffset));
-      measureDataChunk = new byte[measureColumnChunkLength.get(blockIndex)];
-      System.arraycopy(data, 0, measureDataChunk, 0, 
measureColumnChunkLength.get(blockIndex));
-      measureColumnChunk = CarbonUtil.readDataChunk(measureDataChunk);
-      assert measureColumnChunk != null;
-      dataPage = new byte[measureColumnChunk.data_page_length];
-      System.arraycopy(data, measureColumnChunkLength.get(blockIndex), 
dataPage, 0,
-          measureColumnChunk.data_page_length);
+      measureColumnChunk =
+          CarbonUtil.readDataChunk(data, copyPoint, 
measureColumnChunkLength.get(blockIndex));
+      copyPoint += measureColumnChunkLength.get(blockIndex);
     }
     List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
     for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
@@ -168,13 +164,10 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
     WriterCompressModel compressionModel = 
CarbonUtil.getValueCompressionModel(valueEncodeMeta);
     UnCompressValue values =
         
compressionModel.getUnCompressValues()[0].getNew().getCompressorObject();
-    // create a new uncompressor
-    // read data from file and set to uncompressor
-    values.setValue(dataPage);
-    // get the data holder after uncompressing
-    CarbonReadDataHolder measureDataHolder =
-        values.uncompress(compressionModel.getConvertedDataType()[0])
-            .getValues(compressionModel.getMantissa()[0], 
compressionModel.getMaxValue()[0]);
+    CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values
+        .uncompress(compressionModel.getConvertedDataType()[0], data, 
copyPoint,
+            measureColumnChunk.data_page_length, 
compressionModel.getMantissa()[0],
+            compressionModel.getMaxValue()[0]));
     // set the data chunk
     datChunk.setMeasureDataHolder(measureDataHolder);
    // set the enum value indexes
@@ -183,14 +176,15 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
   }
 
   /**
-   * Below method will be used to read the dimension chunks in group.
-   * This is to enhance the IO performance. Will read the data from start index
-   * to end index(including)
+   * Below method will be used to read the dimension chunks in group. This is
+   * to enhance the IO performance. Will read the data from start index to end
+   * index(including)
    *
    * @param fileReader      stream used for reading
    * @param startBlockIndex start block index
    * @param endBlockIndex   end block index
    * @return measure column chunk array
+   * @throws IOException
    */
   private MeasureColumnDataChunk[] readMeasureChunksInGroup(FileHolder 
fileReader,
       int startBlockIndex, int endBlockIndex) throws IOException {
@@ -202,19 +196,12 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
     MeasureColumnDataChunk dataChunk = null;
     int index = 0;
     int copyPoint = 0;
-    byte[] measureDataChunk = null;
-    byte[] dataPage = null;
     DataChunk2 measureColumnChunk = null;
     for (int i = startBlockIndex; i <= endBlockIndex; i++) {
       dataChunk = new MeasureColumnDataChunk();
-      measureDataChunk = new byte[measureColumnChunkLength.get(i)];
-      System.arraycopy(data, copyPoint, measureDataChunk, 0, 
measureColumnChunkLength.get(i));
-      measureColumnChunk = CarbonUtil.readDataChunk(measureDataChunk);
-      assert measureColumnChunk != null;
-      dataPage = new byte[measureColumnChunk.data_page_length];
+      measureColumnChunk =
+          CarbonUtil.readDataChunk(data, copyPoint, 
measureColumnChunkLength.get(i));
       copyPoint += measureColumnChunkLength.get(i);
-      System.arraycopy(data, copyPoint, dataPage, 0, 
measureColumnChunk.data_page_length);
-      copyPoint += measureColumnChunk.data_page_length;
       List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
       for (int j = 0; j < measureColumnChunk.getEncoder_meta().size(); j++) {
         valueEncodeMeta.add(
@@ -223,13 +210,12 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
       WriterCompressModel compressionModel = 
CarbonUtil.getValueCompressionModel(valueEncodeMeta);
       UnCompressValue values =
           
compressionModel.getUnCompressValues()[0].getNew().getCompressorObject();
-      // create a new uncompressor
-      // read data from file and set to uncompressor
-      values.setValue(dataPage);
       // get the data holder after uncompressing
-      CarbonReadDataHolder measureDataHolder =
-          values.uncompress(compressionModel.getConvertedDataType()[0])
-              .getValues(compressionModel.getMantissa()[0], 
compressionModel.getMaxValue()[0]);
+      CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values
+          .uncompress(compressionModel.getConvertedDataType()[0], data, 
copyPoint,
+              measureColumnChunk.data_page_length, 
compressionModel.getMantissa()[0],
+              compressionModel.getMaxValue()[0]));
+      copyPoint += measureColumnChunk.data_page_length;
       // set the data chunk
       dataChunk.setMeasureDataHolder(measureDataHolder);
      // set the enum value indexes

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionChunkStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionChunkStoreFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionChunkStoreFactory.java
new file mode 100644
index 0000000..d1df23c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionChunkStoreFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.store;
+
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.impl.safe.SafeFixedLengthDimensionDataChunkStore;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.impl.safe.SafeVariableLengthDimensionDataChunkStore;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.impl.unsafe.UnsafeFixedLengthDimensionDataChunkStore;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.impl.unsafe.UnsafeVariableLengthDimesionDataChunkStore;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+/**
+ * Below class will be used to get the dimension store type
+ */
+public class DimensionChunkStoreFactory {
+
+  /**
+   * store factory instance
+   */
+  public static final DimensionChunkStoreFactory INSTANCE = new 
DimensionChunkStoreFactory();
+
+  /**
+   * is unsafe
+   */
+  private static final boolean isUnsafe;
+
+  static {
+    isUnsafe = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION,
+            
CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE));
+  }
+
+  private DimensionChunkStoreFactory() {
+
+  }
+
+  /**
+   * Below method will be used to get the dimension store type
+   *
+   * @param columnValueSize column value size
+   * @param isInvertedIndex is inverted index
+   * @param numberOfRows    number of rows
+   * @param totalSize       total size of data
+   * @param storeType       store type
+   * @return dimension store type
+   */
+  public DimensionDataChunkStore getDimensionChunkStore(int columnValueSize,
+      boolean isInvertedIndex, int numberOfRows, long totalSize, 
DimensionStoreType storeType) {
+
+    if (isUnsafe) {
+      if (storeType == DimensionStoreType.FIXEDLENGTH) {
+        return new UnsafeFixedLengthDimensionDataChunkStore(totalSize, 
columnValueSize,
+            isInvertedIndex, numberOfRows);
+      } else {
+        return new UnsafeVariableLengthDimesionDataChunkStore(totalSize, 
isInvertedIndex,
+            numberOfRows);
+      }
+
+    } else {
+      if (storeType == DimensionStoreType.FIXEDLENGTH) {
+        return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, 
columnValueSize);
+      } else {
+        return new SafeVariableLengthDimensionDataChunkStore(isInvertedIndex, 
numberOfRows);
+      }
+    }
+  }
+
+  /**
+   * dimension store type enum
+   */
+  public enum DimensionStoreType {
+    FIXEDLENGTH, VARIABLELENGTH;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionDataChunkStore.java
 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionDataChunkStore.java
new file mode 100644
index 0000000..a2741bb
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/store/DimensionDataChunkStore.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.core.carbon.datastore.chunk.store;
+
+/**
+ * Interface responsibility is to store dimension data in memory.
+ * storage can be on heap or offheap.
+ */
+public interface DimensionDataChunkStore {
+
+  /**
+   * Below method will be used to put the rows and its metadata in offheap
+   *
+   * @param invertedIndex        inverted index to be stored
+   * @param invertedIndexReverse inverted index reverse to be stored
+   * @param data                 data to be stored
+   */
+  void putArray(int[] invertedIndex, int[] invertedIndexReverse, byte[] data);
+
+  /**
+   * Below method will be used to get the row
+   * based on row id passed
+   *
+   * @param index
+   * @return row
+   */
+  byte[] getRow(int rowId);
+
+  /**
+   * Below method will be used to fill the row values to buffer array
+   *
+   * @param rowId  row id of the data to be filled
+   * @param data   buffer in which data will be filled
+   * @param offset off the of the buffer
+   */
+  void fillRow(int rowId, byte[] buffer, int offset);
+
+  /**
+   * Below method will be used to get the inverted index
+   *
+   * @param rowId row id
+   * @return inverted index based on row id passed
+   */
+  int getInvertedIndex(int rowId);
+
+  /**
+   * Below method will be used to get the surrogate key of the
+   * based on the row id passed
+   *
+   * @param rowId row id
+   * @return surrogate key
+   */
+  int getSurrogate(int rowId);
+
+  /**
+   * @return size of each column value
+   */
+  int getColumnValueSize();
+
+  /**
+   * @return whether column was explicitly sorted or not
+   */
+  boolean isExplicitSorted();
+
+  /**
+   * Below method will be used to free the memory occupied by
+   * the column chunk
+   */
+  void freeMemory();
+
+  /**
+   * to compare the two byte array
+   *
+   * @param index        index of first byte array
+   * @param compareValue value of to be compared
+   * @return compare result
+   */
+  int compareTo(int index, byte[] compareValue);
+}

Reply via email to