Reverting big decimal compression as it has below issue
when big decimal scale value is more then 18 then result is not accurate

Fixed big decimal reader issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9e24a3dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9e24a3dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9e24a3dd

Branch: refs/heads/master
Commit: 9e24a3dd726a0aba87e00e1becff4cb8817519eb
Parents: 254af99
Author: ashok.blend <ashok.bl...@gmail.com>
Authored: Fri Jan 20 07:15:16 2017 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Fri Jan 20 21:02:03 2017 +0530

----------------------------------------------------------------------
 .../core/compression/BigDecimalCompressor.java  |  74 --------
 .../measure/AbstractMeasureChunkReader.java     |  10 +-
 ...CompressedMeasureChunkFileBasedReaderV1.java |   4 +-
 ...CompressedMeasureChunkFileBasedReaderV2.java |   6 +-
 .../chunk/store/MeasureChunkStoreFactory.java   |   8 +-
 .../safe/SafeBigDecimalMeasureChunkStore.java   |  98 +++++++++++
 .../UnsafeBigDecimalMeasureChunkStore.java      | 140 +++++++++++++++
 ...afeVariableLengthDimesionDataChunkStore.java |  63 ++++---
 .../compression/ValueCompressionHolder.java     |  55 +++---
 .../compression/decimal/CompressByteArray.java  | 108 ++++++++++++
 .../decimal/CompressionMaxMinByte.java          |  32 ++--
 .../decimal/CompressionMaxMinDefault.java       |  29 ++--
 .../decimal/CompressionMaxMinInt.java           |  28 +--
 .../decimal/CompressionMaxMinLong.java          |  30 ++--
 .../decimal/CompressionMaxMinShort.java         |  27 +--
 .../nondecimal/CompressionNonDecimalByte.java   |  32 ++--
 .../CompressionNonDecimalDefault.java           |  29 ++--
 .../nondecimal/CompressionNonDecimalInt.java    |  30 ++--
 .../nondecimal/CompressionNonDecimalLong.java   |  31 ++--
 .../CompressionNonDecimalMaxMinByte.java        |  34 ++--
 .../CompressionNonDecimalMaxMinDefault.java     |  30 ++--
 .../CompressionNonDecimalMaxMinInt.java         |  26 ++-
 .../CompressionNonDecimalMaxMinLong.java        |  30 ++--
 .../CompressionNonDecimalMaxMinShort.java       |  27 +--
 .../nondecimal/CompressionNonDecimalShort.java  |  30 ++--
 .../compression/none/CompressionNoneByte.java   |  28 +--
 .../none/CompressionNoneDefault.java            |  29 ++--
 .../compression/none/CompressionNoneInt.java    |  28 +--
 .../compression/none/CompressionNoneLong.java   |  26 ++-
 .../compression/none/CompressionNoneShort.java  |  28 +--
 .../compression/type/CompressionBigDecimal.java | 144 ----------------
 .../dataholder/CarbonWriteDataHolder.java       |  50 ------
 ...ractHeavyCompressedDoubleArrayDataStore.java |   3 +-
 .../core/util/BigDecimalCompressionFinder.java  |  92 ----------
 .../carbondata/core/util/DataTypeUtil.java      |  13 ++
 .../core/util/ValueCompressionUtil.java         |  59 +------
 ...mpressedMeasureChunkFileBasedReaderTest.java | 171 ++++++++-----------
 .../store/CarbonFactDataHandlerColumnar.java    |  71 +-------
 38 files changed, 851 insertions(+), 902 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java
deleted file mode 100644
index b3cfacc..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/compression/BigDecimalCompressor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.compression;
-
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.BigDecimalCompressionFinder;
-import org.apache.carbondata.core.util.CompressionFinder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
-
-/**
- * Big decimal data type compressor
- *
- */
-public class BigDecimalCompressor extends BigIntCompressor {
-
-  private boolean readLeft = true;
-
-  @Override
-  public Object getCompressedValues(CompressionFinder compressionFinder,
-      CarbonWriteDataHolder dataHolder, Object maxValue, int decimal) {
-    BigDecimalCompressionFinder bigdCompressionFinder =
-        (BigDecimalCompressionFinder) compressionFinder;
-    Long[] maxValues = (Long[]) maxValue;
-    Object leftCompressedValue = getCompressedValues(
-        bigdCompressionFinder.getLeftCompType(), dataHolder,
-        bigdCompressionFinder.getLeftConvertedDataType(), maxValues[0], 0);
-    readLeft = false;
-    Object rightCompressedValue = getCompressedValues(
-        bigdCompressionFinder.getRightCompType(), dataHolder,
-        bigdCompressionFinder.getRightConvertedDataType(), maxValues[1], 0);
-    return new Object[] { leftCompressedValue, rightCompressedValue };
-  }
-
-  @Override
-  protected Object compressMaxMin(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder, Object max) {
-    long maxValue = (long) max;
-    long[][] writableBigDValues = dataHolder.getWritableBigDecimalValues();
-    long[] value = null;
-    if (readLeft) {
-      value = writableBigDValues[0];
-    } else {
-      value = writableBigDValues[1];
-    }
-    return compressValue(convertedDataType, value, maxValue, true);
-  }
-
-  @Override
-  protected Object compressAdaptive(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder) {
-    long[][] writableBigDValues = dataHolder.getWritableBigDecimalValues();
-    long[] value = null;
-    if (readLeft) {
-      value = writableBigDValues[0];
-    } else {
-      value = writableBigDValues[1];
-    }
-    return compressValue(convertedDataType, value, 0, false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index 656e677..611d01a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -29,11 +29,17 @@ public abstract class AbstractMeasureChunkReader implements 
MeasureColumnChunkRe
   protected String filePath;
 
   /**
+   * number of rows for blocklet
+   */
+  protected int numberOfRows;
+
+  /**
    * Constructor to get minimum parameter to create instance of this class
    *
-   * @param filePath           file from which data will be read
+   * @param filePath file from which data will be read
    */
-  public AbstractMeasureChunkReader(String filePath) {
+  public AbstractMeasureChunkReader(String filePath, int numberOfRows) {
     this.filePath = filePath;
+    this.numberOfRows = numberOfRows;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
index 48e78d3..750da37 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v1/CompressedMeasureChunkFileBasedReaderV1.java
@@ -48,7 +48,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends 
AbstractMeasureChun
    */
   public CompressedMeasureChunkFileBasedReaderV1(final BlockletInfo 
blockletInfo,
       final String filePath) {
-    super(filePath);
+    super(filePath, blockletInfo.getNumberOfRows());
     this.measureColumnChunks = blockletInfo.getMeasureColumnChunk();
   }
 
@@ -90,7 +90,7 @@ public class CompressedMeasureChunkFileBasedReaderV1 extends 
AbstractMeasureChun
     // unCompress data
     values.uncompress(compressModel.getConvertedDataType(), dataPage, 0,
             measureColumnChunks.get(blockIndex).getDataPageLength(), 
compressModel.getMantissa(),
-            compressModel.getMaxValue());
+            compressModel.getMaxValue(), numberOfRows);
 
     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index e8bdafd..d92da61 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -57,7 +57,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends 
AbstractMeasureChun
    */
   public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo 
blockletInfo,
       final String filePath) {
-    super(filePath);
+    super(filePath, blockletInfo.getNumberOfRows());
     this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
     this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
   }
@@ -166,7 +166,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
     // uncompress
     values.uncompress(compressionModel.getConvertedDataType()[0], data,
         copyPoint, measureColumnChunk.data_page_length, 
compressionModel.getMantissa()[0],
-            compressionModel.getMaxValue()[0]);
+            compressionModel.getMaxValue()[0], numberOfRows);
 
     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
 
@@ -217,7 +217,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 
extends AbstractMeasureChun
       // uncompress
       values.uncompress(compressionModel.getConvertedDataType()[0], data, 
copyPoint,
               measureColumnChunk.data_page_length, 
compressionModel.getMantissa()[0],
-              compressionModel.getMaxValue()[0]);
+              compressionModel.getMaxValue()[0], numberOfRows);
 
       CarbonReadDataHolder measureDataHolder = new 
CarbonReadDataHolder(values);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
index d5db61e..2a3a2a7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
@@ -18,11 +18,13 @@
 package org.apache.carbondata.core.datastore.chunk.store;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeBigDecimalMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeByteMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeDoubleMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeIntMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeLongMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeShortMeasureChunkStore;
+import 
org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeBigDecimalMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeByteMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeDoubleMeasureChunkStore;
 import 
org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeIntMeasureChunkStore;
@@ -73,8 +75,9 @@ public class MeasureChunkStoreFactory {
           return new SafeIntMeasureChunkStore(numberOfRows);
         case DATA_LONG:
           return new SafeLongMeasureChunkStore(numberOfRows);
+        case DATA_BIGDECIMAL:
+          return new SafeBigDecimalMeasureChunkStore(numberOfRows);
         case DATA_DOUBLE:
-          return new SafeDoubleMeasureChunkStore(numberOfRows);
         default:
           return new SafeDoubleMeasureChunkStore(numberOfRows);
       }
@@ -88,8 +91,9 @@ public class MeasureChunkStoreFactory {
           return new UnsafeIntMeasureChunkStore(numberOfRows);
         case DATA_LONG:
           return new UnsafeLongMeasureChunkStore(numberOfRows);
+        case DATA_BIGDECIMAL:
+          return new UnsafeBigDecimalMeasureChunkStore(numberOfRows);
         case DATA_DOUBLE:
-          return new UnsafeDoubleMeasureChunkStore(numberOfRows);
         default:
           return new UnsafeDoubleMeasureChunkStore(numberOfRows);
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeBigDecimalMeasureChunkStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeBigDecimalMeasureChunkStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeBigDecimalMeasureChunkStore.java
new file mode 100644
index 0000000..03f59fc
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeBigDecimalMeasureChunkStore.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Responsibility is store the big decimal measure data in memory,
+ */
+public class SafeBigDecimalMeasureChunkStore extends 
SafeAbstractMeasureDataChunkStore<byte[]> {
+
+  /**
+   * data chunk
+   */
+  private byte[] dataChunk;
+
+  /**
+   * offset of actual data
+   */
+  private int[] dataOffsets;
+
+  public SafeBigDecimalMeasureChunkStore(int numberOfRows) {
+    super(numberOfRows);
+    this.dataOffsets = new int[numberOfRows];
+  }
+
+  @Override public void putData(byte[] data) {
+    this.dataChunk = data;
+    // As data is of variable length and data format is
+    // <length in int><data><length in int><data>
+    // we need to store offset of each data so data can be accessed directly
+    // for example:
+    //data = {0,0,0,0,5,1,2,3,4,5,0,0,0,0,6,0,1,2,3,4,5,0,0,0,0,2,8,9}
+    //so value stored in offset will be position of actual data
+    // [5,14,24]
+    // to store this value we need to get the actual data length + 4 bytes 
used for storing the
+    // length
+
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    // as first position will be start from 4 byte as data is stored first in 
the memory block
+    // we need to skip first two bytes this is because first two bytes will be 
length of the data
+    // which we have to skip
+    dataOffsets[0] = CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    // creating a byte buffer which will wrap the length of the row
+    ByteBuffer buffer = 
ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    for (int i = 1; i < numberOfRows; i++) {
+      buffer.put(data, startOffset, CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      buffer.flip();
+      // so current row position will be
+      // previous row length + 4 bytes used for storing previous row data
+      startOffset += buffer.getInt() + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      // as same byte buffer is used to avoid creating many byte buffer for 
each row
+      // we need to clear the byte buffer
+      buffer.clear();
+      dataOffsets[i] = startOffset + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    }
+  }
+
+  /**
+   * to get the byte value
+   *
+   * @param index
+   * @return byte value based on index
+   */
+  @Override public BigDecimal getBigDecimal(int index) {
+    int currentDataOffset = dataOffsets[index];
+    int length = 0;
+    // calculating the length of data
+    if (index < numberOfRows - 1) {
+      length = (int) (dataOffsets[index + 1] - (currentDataOffset
+          + CarbonCommonConstants.INT_SIZE_IN_BYTE));
+    } else {
+      // for last record
+      length = (int) (this.dataChunk.length - currentDataOffset);
+    }
+    return DataTypeUtil.byteToBigDecimal(dataChunk, currentDataOffset, length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeBigDecimalMeasureChunkStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeBigDecimalMeasureChunkStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeBigDecimalMeasureChunkStore.java
new file mode 100644
index 0000000..52bd74b
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeBigDecimalMeasureChunkStore.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.store.impl.unsafe;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Responsible for storing big decimal array data to memory. memory can be on 
heap or
+ * offheap based on the user configuration using unsafe interface
+ */
+public class UnsafeBigDecimalMeasureChunkStore extends 
UnsafeAbstractMeasureDataChunkStore<byte[]> {
+
+  /**
+   * start position of data offsets
+   */
+  private long offsetStartPosition;
+
+  public UnsafeBigDecimalMeasureChunkStore(int numberOfRows) {
+    super(numberOfRows);
+  }
+
+  @Override public void putData(byte[] data) {
+    assert (!this.isMemoryOccupied);
+    this.dataPageMemoryBlock = 
MemoryAllocatorFactory.INSATANCE.getMemoryAllocator()
+        .allocate(data.length + (numberOfRows * 
CarbonCommonConstants.INT_SIZE_IN_BYTE));
+    this.offsetStartPosition = data.length;
+    // copy the data to memory
+    CarbonUnsafe.unsafe
+        .copyMemory(data, CarbonUnsafe.BYTE_ARRAY_OFFSET, 
dataPageMemoryBlock.getBaseObject(),
+            dataPageMemoryBlock.getBaseOffset(), dataPageMemoryBlock.size());
+    // As data is of variable length and data format is
+    // <length in short><data><length in short><data>
+    // we need to store offset of each data so data can be accessed directly
+    // for example:
+    //data = {0,0,0,0,5,1,2,3,4,5,0,0,0,0,6,0,1,2,3,4,5,0,0,0,0,2,8,9}
+    //so value stored in offset will be position of actual data
+    // [5,14,24]
+    // to store this value we need to get the actual data length + 4 bytes 
used for storing the
+    // length
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    // position from where offsets will start
+    long pointerOffsets = this.offsetStartPosition;
+    // as first position will be start from 4 byte as data is stored first in 
the memory block
+    // we need to skip first two bytes this is because first two bytes will be 
length of the data
+    // which we have to skip
+    CarbonUnsafe.unsafe.putInt(dataPageMemoryBlock.getBaseObject(),
+        dataPageMemoryBlock.getBaseOffset() + pointerOffsets,
+        CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    // incrementing the pointers as first value is already filled and as we 
are storing as int
+    // we need to increment the 4 bytes to set the position of the next value 
to set
+    pointerOffsets += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    // creating a byte buffer which will wrap the length of the row
+    // using byte buffer as unsafe will return bytes in little-endian encoding
+    ByteBuffer buffer = 
ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    // store length of data
+    byte[] length = new byte[CarbonCommonConstants.INT_SIZE_IN_BYTE];
+    // as first offset is already stored, we need to start from the 2nd row in 
data array
+    for (int i = 1; i < numberOfRows; i++) {
+      // first copy the length of previous row
+      CarbonUnsafe.unsafe.copyMemory(dataPageMemoryBlock.getBaseObject(),
+          dataPageMemoryBlock.getBaseOffset() + startOffset, length, 
CarbonUnsafe.BYTE_ARRAY_OFFSET,
+          CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      buffer.put(length);
+      buffer.flip();
+      // so current row position will be
+      // previous row length + 4 bytes used for storing previous row data
+      startOffset += CarbonCommonConstants.INT_SIZE_IN_BYTE + buffer.getInt();
+      // as same byte buffer is used to avoid creating many byte buffer for 
each row
+      // we need to clear the byte buffer
+      buffer.clear();
+      // now put the offset of current row, here we need to add 4 more bytes 
as current will
+      // also have length part so we have to skip length
+      CarbonUnsafe.unsafe.putInt(dataPageMemoryBlock.getBaseObject(),
+          dataPageMemoryBlock.getBaseOffset() + pointerOffsets,
+          startOffset + CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      // incrementing the pointers as first value is already filled and as we 
are storing as int
+      // we need to increment the 4 bytes to set the position of the next 
value to set
+      pointerOffsets += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+
+      this.isMemoryOccupied = true;
+    }
+  }
+
+  /**
+   * to get the byte value
+   *
+   * @param index
+   * @return byte value based on index
+   */
+  @Override public BigDecimal getBigDecimal(int index) {
+    // now to get the row from memory block we need to do following thing
+    // 1. first get the current offset
+    // 2. if it's not a last row- get the next row offset
+    // Subtract the current row offset + 4 bytes(to skip the data length) with 
next row offset
+    // else subtract the current row offset
+    // with complete data length get the offset of set of data
+    int currentDataOffset = 
CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
+        dataPageMemoryBlock.getBaseOffset() + this.offsetStartPosition + (index
+            * CarbonCommonConstants.INT_SIZE_IN_BYTE));
+    int length = 0;
+    // calculating the length of data
+    if (index < numberOfRows - 1) {
+      int OffsetOfNextdata = 
CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
+          dataPageMemoryBlock.getBaseOffset() + this.offsetStartPosition + 
((index + 1)
+              * CarbonCommonConstants.INT_SIZE_IN_BYTE));
+      length =
+          (short) (OffsetOfNextdata - (currentDataOffset + 
CarbonCommonConstants.INT_SIZE_IN_BYTE));
+    } else {
+      // for last record we need to subtract with data length
+      length = (short) (this.offsetStartPosition - currentDataOffset);
+    }
+    byte[] row = new byte[length];
+    CarbonUnsafe.unsafe.copyMemory(dataPageMemoryBlock.getBaseObject(),
+        dataPageMemoryBlock.getBaseOffset() + currentDataOffset, row,
+        CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+    return DataTypeUtil.byteToBigDecimal(row);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
index 6d9ad31..10ac5cd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimesionDataChunkStore.java
@@ -122,42 +122,37 @@ public class UnsafeVariableLengthDimesionDataChunkStore
    * @return row
    */
   @Override public byte[] getRow(int rowId) {
-    byte[] data = null;
-    try {
-      // if column was explicitly sorted we need to get the rowid based 
inverted index reverse
-      if (isExplicitSorted) {
-        rowId = CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
-            dataPageMemoryBlock.getBaseOffset() + 
this.invertedIndexReverseOffset + (rowId
-                * CarbonCommonConstants.INT_SIZE_IN_BYTE));
-      }
-      // now to get the row from memory block we need to do following thing
-      // 1. first get the current offset
-      // 2. if it's not a last row- get the next row offset
-      // Subtract the current row offset + 2 bytes(to skip the data length) 
with next row offset
-      // else subtract the current row offset + 2 bytes(to skip the data 
length)
-      // with complete data length
-      int currentDataOffset = 
CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
-          dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + 
(rowId
+    // if column was explicitly sorted we need to get the rowid based inverted 
index reverse
+    if (isExplicitSorted) {
+      rowId = CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
+          dataPageMemoryBlock.getBaseOffset() + 
this.invertedIndexReverseOffset + (rowId
               * CarbonCommonConstants.INT_SIZE_IN_BYTE));
-      short length = 0;
-      // calculating the length of data
-      if (rowId < numberOfRows - 1) {
-        int OffsetOfNextdata = 
CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
-            dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + 
((rowId + 1)
-                * CarbonCommonConstants.INT_SIZE_IN_BYTE));
-        length = (short) (OffsetOfNextdata - (currentDataOffset
-            + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
-      } else {
-        // for last record we need to subtract with data length
-        length = (short) (this.dataLength - currentDataOffset);
-      }
-      data = new byte[length];
-      CarbonUnsafe.unsafe.copyMemory(dataPageMemoryBlock.getBaseObject(),
-          dataPageMemoryBlock.getBaseOffset() + currentDataOffset, data,
-          CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-    } catch (Throwable t) {
-      System.out.println();
     }
+    // now to get the row from memory block we need to do following thing
+    // 1. first get the current offset
+    // 2. if it's not a last row- get the next row offset
+    // Subtract the current row offset + 2 bytes(to skip the data length) with 
next row offset
+    // else subtract the current row offset + 2 bytes(to skip the data length)
+    // with complete data length
+    int currentDataOffset = 
CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
+        dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + (rowId
+            * CarbonCommonConstants.INT_SIZE_IN_BYTE));
+    short length = 0;
+    // calculating the length of data
+    if (rowId < numberOfRows - 1) {
+      int OffsetOfNextdata = 
CarbonUnsafe.unsafe.getInt(dataPageMemoryBlock.getBaseObject(),
+          dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + 
((rowId + 1)
+              * CarbonCommonConstants.INT_SIZE_IN_BYTE));
+      length = (short) (OffsetOfNextdata - (currentDataOffset
+          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+    } else {
+      // for last record we need to subtract with data length
+      length = (short) (this.dataLength - currentDataOffset);
+    }
+    byte[] data = new byte[length];
+    CarbonUnsafe.unsafe.copyMemory(dataPageMemoryBlock.getBaseObject(),
+        dataPageMemoryBlock.getBaseOffset() + currentDataOffset, data,
+        CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
     return data;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
index 26cf236..9b3a18a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
@@ -21,8 +21,6 @@ import java.math.BigDecimal;
 
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
-
-
 /**
  * ValueCompressionHolder is the base class for handling
  * compression / decompression of the measure data chunk
@@ -36,70 +34,80 @@ public abstract class ValueCompressionHolder<T> {
 
   /**
    * @param compressor the compressor used to decompress the data
-   * @param dataType data type of the data
-   * @param data compressed data
+   * @param dataType   data type of the data
+   * @param data       compressed data
    */
-  public void unCompress(Compressor compressor, DataType dataType, byte[] data,
-      int offset, int length) {
+  protected void unCompress(Compressor compressor, DataType dataType, byte[] 
data, int offset,
+      int length, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     switch (dataType) {
       case DATA_BYTE:
-        setValue((T)compressor.unCompressByte(data, offset, length));
+        setValue((T) compressor.unCompressByte(data, offset, length), 
numberOfRows, maxValueObject,
+            decimalPlaces);
         break;
       case DATA_SHORT:
-        setValue((T)compressor.unCompressShort(data, offset, length));
+        setValue((T) compressor.unCompressShort(data, offset, length), 
numberOfRows, maxValueObject,
+            decimalPlaces);
         break;
       case DATA_INT:
-        setValue((T)compressor.unCompressInt(data, offset, length));
+        setValue((T) compressor.unCompressInt(data, offset, length), 
numberOfRows, maxValueObject,
+            decimalPlaces);
         break;
       case DATA_LONG:
       case DATA_BIGINT:
-        setValue((T)compressor.unCompressLong(data, offset, length));
+        setValue((T) compressor.unCompressLong(data, offset, length), 
numberOfRows, maxValueObject,
+            decimalPlaces);
         break;
       case DATA_FLOAT:
-        setValue((T)compressor.unCompressFloat(data, offset, length));
+        setValue((T) compressor.unCompressFloat(data, offset, length), 
numberOfRows, maxValueObject,
+            decimalPlaces);
         break;
       default:
-        setValue((T)compressor.unCompressDouble(data, offset, length));
+        setValue((T) compressor.unCompressDouble(data, offset, length), 
numberOfRows,
+            maxValueObject, decimalPlaces);
         break;
     }
   }
 
   /**
    * @param compressor the compressor used to compress the data
-   * @param dataType data type of the data
-   * @param data original data
+   * @param dataType   data type of the data
+   * @param data       original data
    */
   public byte[] compress(Compressor compressor, DataType dataType, Object 
data) {
     switch (dataType) {
       case DATA_BYTE:
-        return compressor.compressByte((byte[])data);
+        return compressor.compressByte((byte[]) data);
       case DATA_SHORT:
-        return compressor.compressShort((short[])data);
+        return compressor.compressShort((short[]) data);
       case DATA_INT:
-        return compressor.compressInt((int[])data);
+        return compressor.compressInt((int[]) data);
       case DATA_LONG:
       case DATA_BIGINT:
-        return compressor.compressLong((long[])data);
+        return compressor.compressLong((long[]) data);
       case DATA_FLOAT:
-        return compressor.compressFloat((float[])data);
+        return compressor.compressFloat((float[]) data);
       case DATA_DOUBLE:
       default:
-        return compressor.compressDouble((double[])data);
+        return compressor.compressDouble((double[]) data);
     }
   }
 
   public abstract void setValue(T value);
 
+  public abstract void setValue(T data, int numberOfRows, Object 
maxValueObject, int decimalPlaces);
+
   public abstract T getValue();
 
   public abstract void setValueInBytes(byte[] value);
 
   public abstract void compress();
 
-  public abstract void uncompress(DataType dataType, byte[] compressData, int 
offset,
-      int length, int decimal, Object maxValueObject);
+  public abstract void uncompress(DataType dataType, byte[] compressData, int 
offset, int length,
+      int decimal, Object maxValueObject, int numberOfRows);
 
-  public byte[] getCompressedData() { return compressedValue; }
+  public byte[] getCompressedData() {
+    return compressedValue;
+  }
 
   public abstract long getLongValue(int index);
 
@@ -109,5 +117,4 @@ public abstract class ValueCompressionHolder<T> {
 
   public abstract void freeMemory();
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
new file mode 100644
index 0000000..b5e3a9d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.carbondata.core.datastore.compression.decimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressByteArray extends ValueCompressionHolder<byte[]> {
+
+  /**
+   * compressor.
+   */
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  /**
+   * value.
+   */
+  private byte[] value;
+
+  @Override public void setValue(byte[] value) {
+    this.value = value;
+
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    this.value = value;
+
+  }
+
+  @Override public void compress() {
+    compressedValue = compressor.compressByte(value);
+  }
+
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimal, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimal);
+  }
+
+  @Override public byte[] getValue() {
+    List<byte[]> valsList = new 
ArrayList<byte[]>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    buffer.rewind();
+    int length = 0;
+    byte[] actualValue = null;
+    //CHECKSTYLE:OFF    Approval No:Approval-367
+    while (buffer.hasRemaining()) {//CHECKSTYLE:ON
+      length = buffer.getInt();
+      actualValue = new byte[length];
+      buffer.get(actualValue);
+      valsList.add(actualValue);
+
+    }
+    return valsList.get(0);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not defined for 
CompressByteArray");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    throw new UnsupportedOperationException(
+        "Get double value is not defined for CompressByteArray");
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    return this.measureChunkStore.getBigDecimal(index);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+
+  @Override
+  public void setValue(byte[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_BIGDECIMAL, numberOfRows);
+    this.measureChunkStore.putData(data);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
index e5cbce9..ed50cd3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
@@ -59,7 +59,9 @@ public class CompressionMaxMinByte extends 
ValueCompressionHolder<byte[]> {
     this.actualDataType = actualDataType;
   }
 
-  @Override public byte[] getValue() {return this.value; }
+  @Override public byte[] getValue() {
+    return this.value;
+  }
 
   @Override public void setValue(byte[] value) {
     this.value = value;
@@ -69,12 +71,10 @@ public class CompressionMaxMinByte extends 
ValueCompressionHolder<byte[]> {
     compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
-      int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, maxValueObject);
-
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -93,21 +93,23 @@ public class CompressionMaxMinByte extends 
ValueCompressionHolder<byte[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionMaxMinByte");
+        "Big decimal value is not defined for CompressionMaxMinByte");
   }
 
-  private void setUncompressedValues(byte[] data, Object maxValueObject) {
-    this.measureChunkStore =
-        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, 
data.length);
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+
+  @Override
+  public void setValue(byte[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
-      this.maxValue = (long)maxValueObject;
+      this.maxValue = (long) maxValueObject;
     } else {
       this.maxValue = (double) maxValueObject;
     }
-  }
 
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
index 2b1e6fc..6550c46 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
@@ -30,7 +30,6 @@ import 
org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
-
 public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> 
{
 
   /**
@@ -66,16 +65,18 @@ public class CompressionMaxMinDefault extends 
ValueCompressionHolder<double[]> {
     this.value = value;
   }
 
-  @Override public double[] getValue() {return this.value; }
+  @Override public double[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
   }
 
-  @Override public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, maxValueObject);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -85,7 +86,7 @@ public class CompressionMaxMinDefault extends 
ValueCompressionHolder<double[]> {
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionMaxMinDefault");
+        "Long value is not defined for CompressionMaxMinDefault");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -95,21 +96,23 @@ public class CompressionMaxMinDefault extends 
ValueCompressionHolder<double[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionMaxMinDefault");
+        "Big decimal value is not defined for CompressionMaxMinDefault");
   }
 
-  private void setUncompressedValues(double[] data, Object maxValueObject) {
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+
+  @Override
+  public void setValue(double[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-            .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;
     } else {
       this.maxValue = (double) maxValueObject;
     }
-  }
 
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
index 22d3776..512cf2c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
@@ -61,13 +61,14 @@ public class CompressionMaxMinInt extends 
ValueCompressionHolder<int[]> {
     this.value = value;
   }
 
-  @Override public int[] getValue() { return this.value; }
+  @Override public int[] getValue() {
+    return this.value;
+  }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
-      int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, maxValueObject);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void compress() {
@@ -91,12 +92,17 @@ public class CompressionMaxMinInt extends 
ValueCompressionHolder<int[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionMaxMinInt");
+        "Big decimal value is not defined for CompressionMaxMinInt");
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 
-  private void setUncompressedValues(int[] data, Object maxValueObject) {
+  @Override
+  public void setValue(int[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore =
-        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
data.length);
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;
@@ -104,8 +110,4 @@ public class CompressionMaxMinInt extends 
ValueCompressionHolder<int[]> {
       this.maxValue = (double) maxValueObject;
     }
   }
-
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
index 16589e5..ca91d44 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
@@ -65,14 +65,15 @@ public class CompressionMaxMinLong extends 
ValueCompressionHolder<long[]> {
 
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
-      int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressValues(value, maxValueObject);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
-  @Override public long[] getValue() {return this.value; }
+  @Override public long[] getValue() {
+    return this.value;
+  }
 
   @Override public void setValueInBytes(byte[] value) {
     ByteBuffer buffer = ByteBuffer.wrap(value);
@@ -91,12 +92,17 @@ public class CompressionMaxMinLong extends 
ValueCompressionHolder<long[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionMaxMinLong");
+        "Big decimal value is not defined for CompressionMaxMinLong");
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 
-  private void setUncompressValues(long[] data, Object maxValueObject) {
-    this.measureChunkStore =
-        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, 
data.length);
+  @Override
+  public void setValue(long[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;
@@ -104,8 +110,4 @@ public class CompressionMaxMinLong extends 
ValueCompressionHolder<long[]> {
       this.maxValue = (double) maxValueObject;
     }
   }
-
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
index 8e2b889..d44875a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
@@ -63,14 +63,15 @@ public class CompressionMaxMinShort extends 
ValueCompressionHolder<short[]> {
 
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
-      int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, maxValueObject);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
-  @Override public short[] getValue() {return this.value; }
+  @Override public short[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
@@ -93,21 +94,23 @@ public class CompressionMaxMinShort extends 
ValueCompressionHolder<short[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionMaxMinShort");
+        "Big decimal value is not defined for CompressionMaxMinShort");
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 
-  private void setUncompressedValues(short[] data, Object maxValueObject) {
+  @Override
+  public void setValue(short[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;
     } else {
       this.maxValue = (double) maxValueObject;
     }
-  }
 
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
index bc309a5..13f962f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
@@ -53,17 +53,18 @@ public class CompressionNonDecimalByte extends 
ValueCompressionHolder<byte[]> {
     this.value = value;
   }
 
-  @Override public byte[] getValue() {return this.value; }
+  @Override public byte[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -72,7 +73,7 @@ public class CompressionNonDecimalByte extends 
ValueCompressionHolder<byte[]> {
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalByte");
+        "Long value is not defined for CompressionNonDecimalByte");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -81,17 +82,18 @@ public class CompressionNonDecimalByte extends 
ValueCompressionHolder<byte[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionNonDecimalByte");
-  }
-
-  private void setUncompressedValues(byte[] data, int decimalPlaces) {
-    this.measureChunkStore =
-        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, 
data.length);
-    this.measureChunkStore.putData(data);
-    this.divisionFactory = Math.pow(10, decimalPlaces);
+        "Big decimal value is not defined for CompressionNonDecimalByte");
   }
 
   @Override public void freeMemory() {
     this.measureChunkStore.freeMemory();
   }
+
+  @Override
+  public void setValue(byte[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
index aea48e9..15c7bb3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
@@ -49,11 +49,10 @@ public class CompressionNonDecimalDefault extends 
ValueCompressionHolder<double[
 
   private double divisionFactory;
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
-      int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void compress() {
@@ -65,7 +64,9 @@ public class CompressionNonDecimalDefault extends 
ValueCompressionHolder<double[
 
   }
 
-  @Override public double[] getValue() {return this.value; }
+  @Override public double[] getValue() {
+    return this.value;
+  }
 
   @Override public void setValueInBytes(byte[] value) {
     ByteBuffer buffer = ByteBuffer.wrap(value);
@@ -74,7 +75,7 @@ public class CompressionNonDecimalDefault extends 
ValueCompressionHolder<double[
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalDefault");
+        "Long value is not defined for CompressionNonDecimalDefault");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -83,17 +84,19 @@ public class CompressionNonDecimalDefault extends 
ValueCompressionHolder<double[
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionNonDecimalDefault");
+        "Big decimal value is not defined for CompressionNonDecimalDefault");
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 
-  private void setUncompressedValues(double[] data, int decimalPlaces) {
+  @Override
+  public void setValue(double[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
-  }
 
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
index 2b27eff..7e3606e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
@@ -53,7 +53,9 @@ public class CompressionNonDecimalInt extends 
ValueCompressionHolder<int[]> {
     this.value = value;
   }
 
-  @Override public int[] getValue() { return this.value; }
+  @Override public int[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_INT, value);
@@ -64,16 +66,15 @@ public class CompressionNonDecimalInt extends 
ValueCompressionHolder<int[]> {
     this.value = ValueCompressionUtil.convertToIntArray(buffer, 
bytesArr.length);
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
-      int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalInt");
+        "Long value is not defined for CompressionNonDecimalInt");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -82,17 +83,18 @@ public class CompressionNonDecimalInt extends 
ValueCompressionHolder<int[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionNonDecmialInt");
+        "Big decimal value is not defined for CompressionNonDecmialInt");
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 
-  private void setUncompressedValues(int[] data, int decimalPlaces) {
+  @Override
+  public void setValue(int[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore =
-        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
data.length);
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }
-
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
index 55d250f..d810972 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
@@ -55,16 +55,18 @@ public class CompressionNonDecimalLong extends 
ValueCompressionHolder<long[]> {
     this.value = value;
   }
 
-  @Override public long[] getValue() { return this.value; }
+  @Override public long[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
   }
 
-  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset,
-      int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void setValueInBytes(byte[] bytes) {
@@ -74,7 +76,7 @@ public class CompressionNonDecimalLong extends 
ValueCompressionHolder<long[]> {
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalLong");
+        "Long value is not defined for CompressionNonDecimalLong");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -83,17 +85,18 @@ public class CompressionNonDecimalLong extends 
ValueCompressionHolder<long[]> {
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionNonDecimalLong");
-  }
-
-  private void setUncompressedValues(long[] data, int decimalPlaces) {
-    this.measureChunkStore =
-        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, 
data.length);
-    this.measureChunkStore.putData(data);
-    this.divisionFactory = Math.pow(10, decimalPlaces);
+        "Big decimal value is not defined for CompressionNonDecimalLong");
   }
 
   @Override public void freeMemory() {
     this.measureChunkStore.freeMemory();
   }
+
+  @Override
+  public void setValue(long[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
index 80cf24e..83e8a1a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
@@ -51,11 +51,10 @@ public class CompressionNonDecimalMaxMinByte extends 
ValueCompressionHolder<byte
 
   private double divisionFactor;
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces, maxValueObject );
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void compress() {
@@ -72,7 +71,7 @@ public class CompressionNonDecimalMaxMinByte extends 
ValueCompressionHolder<byte
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalMaxMinByte");
+        "Long value is not defined for CompressionNonDecimalMaxMinByte");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -84,20 +83,23 @@ public class CompressionNonDecimalMaxMinByte extends 
ValueCompressionHolder<byte
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionNonDecimalMaxMinByte");
-  }
-
-  private void setUncompressedValues(byte[] data, int decimalPlaces, Object 
maxValueObject) {
-    this.measureChunkStore =
-        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, 
data.length);
-    this.measureChunkStore.putData(data);
-    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
-    this.divisionFactor = Math.pow(10, decimalPlaces);
+        "Big decimal value is not defined for 
CompressionNonDecimalMaxMinByte");
   }
 
   @Override public void freeMemory() {
     this.measureChunkStore.freeMemory();
   }
 
-  @Override public byte[] getValue() { return this.value; }
+  @Override public byte[] getValue() {
+    return this.value;
+  }
+
+  @Override
+  public void setValue(byte[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
index d228836..c408d9a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
@@ -57,7 +57,9 @@ public class CompressionNonDecimalMaxMinDefault extends 
ValueCompressionHolder<d
     this.value = value;
   }
 
-  @Override public double[] getValue() { return this.value; }
+  @Override public double[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
@@ -68,16 +70,15 @@ public class CompressionNonDecimalMaxMinDefault extends 
ValueCompressionHolder<d
     this.value = ValueCompressionUtil.convertToDoubleArray(buffer, 
value.length);
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor,dataType,compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalMaxMinDefault");
+        "Long value is not defined for CompressionNonDecimalMaxMinDefault");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -88,18 +89,19 @@ public class CompressionNonDecimalMaxMinDefault extends 
ValueCompressionHolder<d
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for 
CompressionNonDecimalMaxMinDefault");
+        "Big decimal value is not defined for 
CompressionNonDecimalMaxMinDefault");
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 
-  private void setUncompressedValues(double[] data, int decimalPlaces, Object 
maxValueObject) {
+  @Override
+  public void setValue(double[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);
   }
-
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
index 2b40d60..3df2948 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
@@ -55,7 +55,9 @@ public class CompressionNonDecimalMaxMinInt extends 
ValueCompressionHolder<int[]
     this.value = value;
   }
 
-  @Override public int[] getValue() {return this.value; }
+  @Override public int[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_INT, value);
@@ -66,16 +68,16 @@ public class CompressionNonDecimalMaxMinInt extends 
ValueCompressionHolder<int[]
     this.value = ValueCompressionUtil.convertToIntArray(buffer, value.length);
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
     setUncompressedValues(value, decimalPlaces, maxValueObject);
   }
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalMaxMinInt");
+        "Long value is not defined for CompressionNonDecimalMaxMinInt");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -86,7 +88,7 @@ public class CompressionNonDecimalMaxMinInt extends 
ValueCompressionHolder<int[]
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionNonDecimalMaxMinInt");
+        "Big decimal value is not defined for CompressionNonDecimalMaxMinInt");
   }
 
   private void setUncompressedValues(int[] data, int decimalPlaces, Object 
maxValueObject) {
@@ -100,4 +102,14 @@ public class CompressionNonDecimalMaxMinInt extends 
ValueCompressionHolder<int[]
   @Override public void freeMemory() {
     this.measureChunkStore.freeMemory();
   }
+
+  @Override
+  public void setValue(int[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
numberOfRows);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
index 60ac51d..f8d8ed5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
@@ -57,13 +57,14 @@ public class CompressionNonDecimalMaxMinLong extends 
ValueCompressionHolder<long
 
   }
 
-  @Override public long[] getValue() { return this.value; }
+  @Override public long[] getValue() {
+    return this.value;
+  }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void compress() {
@@ -77,7 +78,7 @@ public class CompressionNonDecimalMaxMinLong extends 
ValueCompressionHolder<long
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalMaxMinLong");
+        "Long value is not defined for CompressionNonDecimalMaxMinLong");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -90,15 +91,16 @@ public class CompressionNonDecimalMaxMinLong extends 
ValueCompressionHolder<long
     throw new UnsupportedOperationException("Big decimal value is not 
supported");
   }
 
-  private void setUncompressedValues(long[] data, int decimalPlaces, Object 
maxValueObject) {
-    this.measureChunkStore =
-      
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, 
data.length);
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+
+  @Override
+  public void setValue(long[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);
   }
-
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
index b22ec65..14648ba 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
@@ -55,13 +55,14 @@ public class CompressionNonDecimalMaxMinShort extends 
ValueCompressionHolder<sho
     this.value = value;
   }
 
-  @Override public short[] getValue() { return this.value; }
+  @Override public short[] getValue() {
+    return this.value;
+  }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor, dataType, compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void compress() {
@@ -75,7 +76,7 @@ public class CompressionNonDecimalMaxMinShort extends 
ValueCompressionHolder<sho
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalMaxMinShort");
+        "Long value is not defined for CompressionNonDecimalMaxMinShort");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -88,15 +89,17 @@ public class CompressionNonDecimalMaxMinShort extends 
ValueCompressionHolder<sho
     throw new UnsupportedOperationException("Get big decimal value is not 
supported");
   }
 
-  private void setUncompressedValues(short[] data, int decimalPlaces, Object 
maxValueObject) {
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+
+  @Override
+  public void setValue(short[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);
-  }
 
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9e24a3dd/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
index 65e2d93..8536630 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
@@ -53,17 +53,18 @@ public class CompressionNonDecimalShort extends 
ValueCompressionHolder<short[]>
     this.value = value;
   }
 
-  @Override public short[] getValue() { return this.value; }
+  @Override public short[] getValue() {
+    return this.value;
+  }
 
   @Override public void compress() {
     compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
   }
 
-  @Override
-  public void uncompress(DataType dataType, byte[] compressedData,
-      int offset, int length, int decimalPlaces, Object maxValueObject) {
-    super.unCompress(compressor,dataType,compressedData, offset, length);
-    setUncompressedValues(value, decimalPlaces);
+  @Override public void uncompress(DataType dataType, byte[] compressedData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject, int numberOfRows) {
+    super.unCompress(compressor, dataType, compressedData, offset, length, 
numberOfRows,
+        maxValueObject, decimalPlaces);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -73,7 +74,7 @@ public class CompressionNonDecimalShort extends 
ValueCompressionHolder<short[]>
 
   @Override public long getLongValue(int index) {
     throw new UnsupportedOperationException(
-      "Long value is not defined for CompressionNonDecimalShort");
+        "Long value is not defined for CompressionNonDecimalShort");
   }
 
   @Override public double getDoubleValue(int index) {
@@ -82,17 +83,18 @@ public class CompressionNonDecimalShort extends 
ValueCompressionHolder<short[]>
 
   @Override public BigDecimal getBigDecimalValue(int index) {
     throw new UnsupportedOperationException(
-      "Big decimal value is not defined for CompressionNonDecimalShort");
+        "Big decimal value is not defined for CompressionNonDecimalShort");
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 
-  private void setUncompressedValues(short[] data, int decimalPlaces) {
+  @Override
+  public void setValue(short[] data, int numberOfRows, Object maxValueObject, 
int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }
-
-  @Override public void freeMemory() {
-    this.measureChunkStore.freeMemory();
-  }
 }


Reply via email to