http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/SnappyCompressor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/SnappyCompressor.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/SnappyCompressor.java
index d9b627f..62d3b53 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/SnappyCompressor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/SnappyCompressor.java
@@ -20,19 +20,38 @@
 package org.apache.carbondata.core.datastorage.store.compression;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 
 import org.xerial.snappy.Snappy;
+import org.xerial.snappy.SnappyNative;
 
 public class SnappyCompressor implements Compressor {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(SnappyCompressor.class.getName());
 
-  @Override
-  public byte[] compressByte(byte[] unCompInput) {
+  private final SnappyNative snappyNative;
+
+  /**
+   * Captures the {@link SnappyNative} object held in the private {@code impl} field of
+   * {@link Snappy} through reflection and keeps it in {@code snappyNative}.
+   *
+   * @throws RuntimeException wrapping the reflective failure when the field cannot be
+   *                          looked up or read
+   */
+  public SnappyCompressor() {
+    Snappy snappyInstance = new Snappy();
+    Field implField;
+    try {
+      implField = snappyInstance.getClass().getDeclaredField("impl");
+    } catch (NoSuchFieldException | SecurityException lookupFailure) {
+      throw new RuntimeException(lookupFailure);
+    }
+    implField.setAccessible(true);
+    Object implValue;
+    try {
+      implValue = implField.get(snappyInstance);
+    } catch (IllegalArgumentException | IllegalAccessException readFailure) {
+      throw new RuntimeException(readFailure);
+    }
+    snappyNative = (SnappyNative) implValue;
+  }
+
+  @Override public byte[] compressByte(byte[] unCompInput) {
     try {
       return Snappy.rawCompress(unCompInput, unCompInput.length);
     } catch (IOException e) {
@@ -41,8 +60,7 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
-  @Override
-  public byte[] unCompressByte(byte[] compInput) {
+  @Override public byte[] unCompressByte(byte[] compInput) {
     try {
       return Snappy.uncompress(compInput);
     } catch (IOException e) {
@@ -51,8 +69,20 @@ public class SnappyCompressor implements Compressor {
     return compInput;
   }
 
-  @Override
-  public byte[] compressShort(short[] unCompInput) {
+  /**
+   * Decompresses the Snappy block stored in a sub-range of {@code compInput}.
+   *
+   * @param compInput buffer containing the compressed block
+   * @param offset    index of the first compressed byte
+   * @param length    number of compressed bytes
+   * @return the decompressed bytes, or {@code null} if Snappy throws an {@link IOException}
+   *         (the error is logged)
+   */
+  @Override public byte[] unCompressByte(byte[] compInput, int offset, int length) {
+    try {
+      byte[] data = new byte[Snappy.uncompressedLength(compInput, offset, length)];
+      Snappy.uncompress(compInput, offset, length, data, 0);
+      return data;
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    // Never hand back a zero-filled buffer from a failed decode: report failure as null,
+    // as the other offset-based unCompress* methods of this class do.
+    return null;
+  }
+
+  @Override public byte[] compressShort(short[] unCompInput) {
     try {
       return Snappy.compress(unCompInput);
     } catch (IOException e) {
@@ -61,8 +91,7 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
-  @Override
-  public short[] unCompressShort(byte[] compInput) {
+  @Override public short[] unCompressShort(byte[] compInput) {
     try {
       return Snappy.uncompressShortArray(compInput);
     } catch (IOException e) {
@@ -71,8 +100,16 @@ public class SnappyCompressor implements Compressor {
     return null;
   }
 
-  @Override
-  public byte[] compressInt(int[] unCompInput) {
+  /**
+   * Decompresses a Snappy block of shorts stored in a sub-range of {@code compInput}.
+   *
+   * @param compInput buffer containing the compressed block
+   * @param offset    index of the first compressed byte
+   * @param length    number of compressed bytes
+   * @return the decoded values, or {@code null} if Snappy throws an {@link IOException}
+   *         (the error is logged)
+   */
+  @Override public short[] unCompressShort(byte[] compInput, int offset, int length) {
+    try {
+      return Snappy.uncompressShortArray(compInput, offset, length);
+    } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    return null;
+  }
+
+  @Override public byte[] compressInt(int[] unCompInput) {
     try {
       return Snappy.compress(unCompInput);
     } catch (IOException e) {
@@ -81,8 +118,7 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
-  @Override
-  public int[] unCompressInt(byte[] compInput) {
+  @Override public int[] unCompressInt(byte[] compInput) {
     try {
       return Snappy.uncompressIntArray(compInput);
     } catch (IOException e) {
@@ -91,8 +127,16 @@ public class SnappyCompressor implements Compressor {
     return null;
   }
 
-  @Override
-  public byte[] compressLong(long[] unCompInput) {
+  /**
+   * Decodes a Snappy-compressed int block located in a sub-range of {@code compInput}.
+   *
+   * @param compInput buffer holding the compressed block
+   * @param offset    position of the first compressed byte
+   * @param length    count of compressed bytes
+   * @return the decoded ints, or {@code null} when Snappy raises an {@link IOException}
+   *         (which is logged)
+   */
+  @Override public int[] unCompressInt(byte[] compInput, int offset, int length) {
+    int[] decoded = null;
+    try {
+      decoded = Snappy.uncompressIntArray(compInput, offset, length);
+    } catch (IOException ioe) {
+      LOGGER.error(ioe, ioe.getMessage());
+    }
+    return decoded;
+  }
+
+  @Override public byte[] compressLong(long[] unCompInput) {
     try {
       return Snappy.compress(unCompInput);
     } catch (IOException e) {
@@ -101,8 +145,7 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
-  @Override
-  public long[] unCompressLong(byte[] compInput) {
+  @Override public long[] unCompressLong(byte[] compInput) {
     try {
       return Snappy.uncompressLongArray(compInput);
     } catch (IOException e) {
@@ -111,8 +154,16 @@ public class SnappyCompressor implements Compressor {
     return null;
   }
 
-  @Override
-  public byte[] compressFloat(float[] unCompInput) {
+  /**
+   * Decodes a Snappy-compressed long block located in a sub-range of {@code compInput}.
+   *
+   * @param compInput buffer holding the compressed block
+   * @param offset    position of the first compressed byte
+   * @param length    count of compressed bytes
+   * @return the decoded longs, or {@code null} when Snappy raises an {@link IOException}
+   *         (which is logged)
+   */
+  @Override public long[] unCompressLong(byte[] compInput, int offset, int length) {
+    long[] decoded = null;
+    try {
+      decoded = Snappy.uncompressLongArray(compInput, offset, length);
+    } catch (IOException ioe) {
+      LOGGER.error(ioe, ioe.getMessage());
+    }
+    return decoded;
+  }
+
+  @Override public byte[] compressFloat(float[] unCompInput) {
     try {
       return Snappy.compress(unCompInput);
     } catch (IOException e) {
@@ -121,8 +172,7 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
-  @Override
-  public float[] unCompressFloat(byte[] compInput) {
+  @Override public float[] unCompressFloat(byte[] compInput) {
     try {
       return Snappy.uncompressFloatArray(compInput);
     } catch (IOException e) {
@@ -131,8 +181,16 @@ public class SnappyCompressor implements Compressor {
     return null;
   }
 
-  @Override
-  public byte[] compressDouble(double[] unCompInput) {
+  /**
+   * Decodes a Snappy-compressed float block located in a sub-range of {@code compInput}.
+   *
+   * @param compInput buffer holding the compressed block
+   * @param offset    position of the first compressed byte
+   * @param length    count of compressed bytes
+   * @return the decoded floats, or {@code null} when Snappy raises an {@link IOException}
+   *         (which is logged)
+   */
+  @Override public float[] unCompressFloat(byte[] compInput, int offset, int length) {
+    float[] decoded = null;
+    try {
+      decoded = Snappy.uncompressFloatArray(compInput, offset, length);
+    } catch (IOException ioe) {
+      LOGGER.error(ioe, ioe.getMessage());
+    }
+    return decoded;
+  }
+
+  @Override public byte[] compressDouble(double[] unCompInput) {
     try {
       return Snappy.compress(unCompInput);
     } catch (IOException e) {
@@ -141,8 +199,7 @@ public class SnappyCompressor implements Compressor {
     }
   }
 
-  @Override
-  public double[] unCompressDouble(byte[] compInput) {
+  @Override public double[] unCompressDouble(byte[] compInput) {
     try {
       return Snappy.uncompressDoubleArray(compInput);
     } catch (IOException e) {
@@ -150,4 +207,16 @@ public class SnappyCompressor implements Compressor {
     }
     return null;
   }
+
+  /**
+   * Decodes a Snappy-compressed double block located in a sub-range of {@code compInput},
+   * writing straight into a {@code double[]} through the reflected {@code snappyNative}.
+   *
+   * @param compInput buffer holding the compressed block
+   * @param offset    position of the first compressed byte
+   * @param length    count of compressed bytes
+   * @return the decoded doubles, or {@code null} when an {@link IOException} is raised
+   *         (which is logged)
+   */
+  @Override public double[] unCompressDouble(byte[] compInput, int offset, int length) {
+    double[] decoded = null;
+    try {
+      int byteCount = Snappy.uncompressedLength(compInput, offset, length);
+      // The target array is sized as one double per 8 uncompressed bytes.
+      double[] target = new double[byteCount / 8];
+      snappyNative.rawUncompress(compInput, offset, length, target, 0);
+      decoded = target;
+    } catch (IOException ioe) {
+      LOGGER.error(ioe, ioe.getMessage());
+    }
+    return decoded;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java
index f161b9a..eca12ef 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java
@@ -19,7 +19,8 @@
 
 package org.apache.carbondata.core.datastorage.store.compression;
 
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import java.math.BigDecimal;
+
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 /**
@@ -30,7 +31,7 @@ public final class ValueCompressonHolder {
   /**
    * byteCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
 
   private ValueCompressonHolder() {
 
@@ -41,26 +42,33 @@ public final class ValueCompressonHolder {
    * @param value
    * @param data
    */
-  public static void unCompress(DataType dataType, UnCompressValue value, byte[] data) {
+  public static void unCompress(DataType dataType, UnCompressValue value, byte[] data,
+      int offset, int length, int decimal, Object maxValueObject) {
+    // Pick the compressor call matching dataType, keep its result in a local, and pass it
+    // to the holder once, together with decimal and maxValueObject.
+    Object decoded;
     switch (dataType) {
       case DATA_BYTE:
-        value.setValue(compressor.unCompressByte(data));
+        decoded = compressor.unCompressByte(data, offset, length);
         break;
       case DATA_SHORT:
-        value.setValue(compressor.unCompressShort(data));
+        decoded = compressor.unCompressShort(data, offset, length);
         break;
       case DATA_INT:
-        value.setValue(compressor.unCompressInt(data));
+        decoded = compressor.unCompressInt(data, offset, length);
         break;
       case DATA_LONG:
       case DATA_BIGINT:
-        value.setValue(compressor.unCompressLong(data));
+        decoded = compressor.unCompressLong(data, offset, length);
         break;
       case DATA_FLOAT:
-        value.setValue(compressor.unCompressFloat(data));
+        decoded = compressor.unCompressFloat(data, offset, length);
         break;
       default:
-        value.setValue(compressor.unCompressDouble(data));
+        decoded = compressor.unCompressDouble(data, offset, length);
         break;
     }
+    value.setUncompressValues(decoded, decimal, maxValueObject);
   }
@@ -78,13 +86,22 @@ public final class ValueCompressonHolder {
 
     UnCompressValue compress();
 
-    UnCompressValue uncompress(DataType dataType);
+    UnCompressValue uncompress(DataType dataType, byte[] compressData, int 
offset, int length,
+        int decimal, Object maxValueObject);
+
+    void setUncompressValues(T data, int decimal, Object maxValueObject);
 
     byte[] getBackArrayData();
 
     UnCompressValue getCompressorObject();
 
-    CarbonReadDataHolder getValues(int decimal, Object maxValue);
+    long getLongValue(int index);
+
+    double getDoubleValue(int index);
+
+    BigDecimal getBigDecimalValue(int index);
+
+    void freeMemory();
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressByteArray.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressByteArray.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressByteArray.java
deleted file mode 100644
index 1b9b163..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressByteArray.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.compression.decimal;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.compression.Compressor;
-import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
-import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public class UnCompressByteArray implements 
ValueCompressonHolder.UnCompressValue<byte[]> {
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnCompressMaxMinByte.class.getName());
-  /**
-   * compressor.
-   */
-  private static Compressor compressor = CompressorFactory.getInstance();
-
-  private ByteArrayType arrayType;
-  /**
-   * value.
-   */
-  private byte[] value;
-
-  public UnCompressByteArray(ByteArrayType type) {
-    if (type == ByteArrayType.BYTE_ARRAY) {
-      arrayType = ByteArrayType.BYTE_ARRAY;
-    } else {
-      arrayType = ByteArrayType.BIG_DECIMAL;
-    }
-
-  }
-
-  @Override public void setValue(byte[] value) {
-    this.value = value;
-
-  }
-
-  @Override public void setValueInBytes(byte[] value) {
-    this.value = value;
-
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue<byte[]> getNew() {
-    try {
-      return (ValueCompressonHolder.UnCompressValue) clone();
-    } catch (CloneNotSupportedException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    return null;
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue compress() {
-    UnCompressByteArray byte1 = new UnCompressByteArray(arrayType);
-    byte1.setValue(compressor.compressByte(value));
-    return byte1;
-  }
-
-  @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dataType) {
-    ValueCompressonHolder.UnCompressValue byte1 = new 
UnCompressByteArray(arrayType);
-    byte1.setValue(compressor.unCompressByte(value));
-    return byte1;
-  }
-
-  @Override public byte[] getBackArrayData() {
-    return this.value;
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue getCompressorObject() 
{
-    return new UnCompressByteArray(arrayType);
-  }
-
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    List<byte[]> valsList = new 
ArrayList<byte[]>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    buffer.rewind();
-    int length = 0;
-    byte[] actualValue = null;
-    //CHECKSTYLE:OFF    Approval No:Approval-367
-    while (buffer.hasRemaining()) {//CHECKSTYLE:ON
-      length = buffer.getInt();
-      actualValue = new byte[length];
-      buffer.get(actualValue);
-      valsList.add(actualValue);
-
-    }
-    CarbonReadDataHolder holder = new CarbonReadDataHolder();
-    byte[][] value = new byte[valsList.size()][];
-    valsList.toArray(value);
-    if (arrayType == ByteArrayType.BIG_DECIMAL) {
-      BigDecimal[] bigDecimalValues = new BigDecimal[value.length];
-      for (int i = 0; i < value.length; i++) {
-        bigDecimalValues[i] = DataTypeUtil.byteToBigDecimal(value[i]);
-      }
-      holder.setReadableBigDecimalValues(bigDecimalValues);
-      return holder;
-    }
-    holder.setReadableByteValues(value);
-    return holder;
-  }
-
-  public enum ByteArrayType {
-    BYTE_ARRAY,
-    BIG_DECIMAL
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinByte.java
index 50caa62..637e35b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinByte.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinByte.java
@@ -1,5 +1,5 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -19,13 +19,16 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.decimal;
 
+import java.math.BigDecimal;
+
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,17 +42,21 @@ public class UnCompressMaxMinByte implements 
UnCompressValue<byte[]> {
   /**
    * compressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   protected byte[] value;
 
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
   /**
    * actual data type
    */
   protected DataType actualDataType;
 
+  private double maxValue;
+
   //TODO SIMIAN
 
   public UnCompressMaxMinByte(DataType actualDataType) {
@@ -76,10 +83,13 @@ public class UnCompressMaxMinByte implements 
UnCompressValue<byte[]> {
     return byte1;
   }
 
-  @Override public UnCompressValue uncompress(DataType dataType) {
-    UnCompressValue byte1 = ValueCompressionUtil.getUnCompressDecimalMaxMin(dataType,
-        actualDataType);
-    ValueCompressonHolder.unCompress(dataType, byte1, value);
-    return byte1;
+  /**
+   * Builds a holder via {@code ValueCompressionUtil.getUnCompressDecimalMaxMin} for the
+   * given and actual data types, then fills it by passing {@code compressData} and the
+   * remaining arguments to {@code ValueCompressonHolder.unCompress}.
+   *
+   * @return the holder that was created and filled
+   */
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, int offset,
+      int length, int decimalPlaces, Object maxValueObject) {
+    UnCompressValue holder =
+        ValueCompressionUtil.getUnCompressDecimalMaxMin(dataType, actualDataType);
+    ValueCompressonHolder.unCompress(dataType, holder, compressData, offset, length,
+        decimalPlaces, maxValueObject);
+    return holder;
   }
 
@@ -98,38 +108,40 @@ public class UnCompressMaxMinByte implements 
UnCompressValue<byte[]> {
     return new UnCompressMaxMinByte(actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return unCompressLong(maxValueObject);
-      default:
-        return unCompressDouble(maxValueObject);
+  /**
+   * Returns the value at {@code index} as {@code maxValue} minus the stored byte.
+   *
+   * @param index position passed to {@code measureChunkStore.getByte}
+   * @return {@code maxValue} when the stored byte is 0, otherwise
+   *         {@code maxValue - byte} converted to long
+   */
+  @Override public long getLongValue(int index) {
+    byte byteValue = measureChunkStore.getByte(index);
+    if (byteValue == (byte) 0) {
+      return (long) maxValue;
+    } else {
+      // Convert to long, not short: a (short) cast wraps every result outside
+      // [-32768, 32767], corrupting larger values.
+      // NOTE(review): maxValue is held as a double, so long maxima above 2^53 are inexact.
+      return (long) (maxValue - byteValue);
     }
   }
 
-  private CarbonReadDataHolder unCompressLong(Object maxValueObject) {
-    long maxValue = (long) maxValueObject;
-    long[] vals = new long[value.length];
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      if (value[i] == 0) {
-        vals[i] = maxValue;
-      } else {
-        vals[i] = maxValue - value[i];
-      }
+  /**
+   * Returns the value at {@code index} as {@code maxValue} minus the stored byte.
+   *
+   * @param index position passed to {@code measureChunkStore.getByte}
+   * @return {@code maxValue} when the stored byte is 0, otherwise {@code maxValue - byte}
+   */
+  @Override public double getDoubleValue(int index) {
+    byte byteValue = measureChunkStore.getByte(index);
+    if (byteValue == (byte) 0) {
+      return maxValue;
+    } else {
+      // Return the double difference unchanged: a (short) cast would drop the fraction
+      // and wrap results outside [-32768, 32767].
+      return maxValue - byteValue;
     }
-    dataHolder.setReadableLongValues(vals);
-    return dataHolder;
   }
 
-  private CarbonReadDataHolder unCompressDouble(Object maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  /**
+   * Big decimal access is not offered by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    String message = "Get big decimal value is not supported";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Stores {@code data} in a byte chunk store obtained from
+   * {@code MeasureChunkStoreFactory} and records the maximum value as a double.
+   * {@code decimalPlaces} is not used here.
+   *
+   * @param data           values to place in the chunk store
+   * @param decimalPlaces  unused by this implementation
+   * @param maxValueObject maximum value; a {@code Long} is widened, anything else is cast
+   *                       to double
+   */
+  @Override
+  public void setUncompressValues(byte[] data, int decimalPlaces, Object maxValueObject) {
+    measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_BYTE, data.length);
+    measureChunkStore.putData(data);
+    maxValue = (maxValueObject instanceof Long)
+        ? (long) maxValueObject
+        : (double) maxValueObject;
-    }
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
+  }
+
+  /** Delegates to {@code freeMemory()} of the chunk store held by this object. */
+  @Override
+  public void freeMemory() {
+    measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinDefault.java
index dc03694..5d5c057 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinDefault.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinDefault.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.decimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -41,17 +44,21 @@ public class UnCompressMaxMinDefault implements 
ValueCompressonHolder.UnCompress
   /**
    * compressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private double[] value;
 
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
   /**
    * actual data type
    */
   private DataType actualDataType;
 
+  private double maxValue;
+
   public UnCompressMaxMinDefault(DataType actualDataType) {
     this.actualDataType = actualDataType;
   }
@@ -76,7 +83,9 @@ public class UnCompressMaxMinDefault implements 
ValueCompressonHolder.UnCompress
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType) {
+  /**
+   * Not implemented for this holder; every argument is ignored.
+   *
+   * @return always {@code null}
+   */
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, int offset,
+      int length, int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -96,16 +105,36 @@ public class UnCompressMaxMinDefault implements 
ValueCompressonHolder.UnCompress
     return new UnCompressMaxMinByte(actualDataType);
   }
 
-  //TODO SIMIAN
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolderInfoObj = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  /**
+   * Long access is not offered by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public long getLongValue(int index) {
+    String message = "Get long value is not supported";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Returns the value at {@code index} as {@code maxValue} minus the stored double.
+   *
+   * @param index position passed to {@code measureChunkStore.getDouble}
+   * @return {@code maxValue} when the stored double equals 0, otherwise
+   *         {@code maxValue - stored}
+   */
+  @Override public double getDoubleValue(int index) {
+    double delta = measureChunkStore.getDouble(index);
+    return delta == 0 ? maxValue : maxValue - delta;
+  }
+
+  /**
+   * Big decimal access is not offered by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    String message = "Get big decimal value is not supported";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Stores {@code data} in a double chunk store obtained from
+   * {@code MeasureChunkStoreFactory} and records the maximum value as a double.
+   * {@code decimalPlaces} is not used here.
+   *
+   * @param data           values to place in the chunk store
+   * @param decimalPlaces  unused by this implementation
+   * @param maxValueObject maximum value; a {@code Long} is widened, anything else is cast
+   *                       to double
+   */
+  @Override
+  public void setUncompressValues(double[] data, int decimalPlaces, Object maxValueObject) {
+    measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    measureChunkStore.putData(data);
+    maxValue = (maxValueObject instanceof Long)
+        ? (long) maxValueObject
+        : (double) maxValueObject;
-    }
-    dataHolderInfoObj.setReadableDoubleValues(vals);
-    return dataHolderInfoObj;
   }
 
+  /** Delegates to {@code freeMemory()} of the chunk store held by this object. */
+  @Override
+  public void freeMemory() {
+    measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinFloat.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinFloat.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinFloat.java
deleted file mode 100644
index 44bb60d..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinFloat.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.compression.decimal;
-
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastorage.store.compression.Compressor;
-import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
-import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
-
-public class UnCompressMaxMinFloat implements UnCompressValue<float[]> {
-
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnCompressMaxMinFloat.class.getName());
-  /**
-   * floatCompressor
-   */
-  private static Compressor compressor = CompressorFactory.getInstance();
-  /**
-   * value.
-   */
-  private float[] value;
-
-  private DataType actualDataType;
-
-  public UnCompressMaxMinFloat(DataType actualDataType) {
-    this.actualDataType = actualDataType;
-  }
-
-  @Override public void setValue(float[] value) {
-    this.value = value;
-
-  }
-
-  @Override public UnCompressValue getNew() {
-    try {
-      return (UnCompressValue) clone();
-    } catch (CloneNotSupportedException ex4) {
-      LOGGER.error(ex4, ex4.getMessage());
-    }
-    return null;
-  }
-
-  @Override public UnCompressValue compress() {
-    UnCompressMaxMinByte byte1 = new UnCompressMaxMinByte(actualDataType);
-    byte1.setValue(compressor.compressFloat(value));
-    return byte1;
-  }
-
-  @Override public UnCompressValue uncompress(DataType dTypeVal) {
-    return null;
-  }
-
-  @Override public byte[] getBackArrayData() {
-    return ValueCompressionUtil.convertToBytes(value);
-  }
-
-  @Override public void setValueInBytes(byte[] value) {
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    this.value = ValueCompressionUtil.convertToFloatArray(buffer, 
value.length);
-  }
-
-  @Override public UnCompressValue getCompressorObject() {
-    return new UnCompressMaxMinByte(actualDataType);
-  }
-
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolderVal = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
-    }
-    dataHolderVal.setReadableDoubleValues(vals);
-    return dataHolderVal;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinInt.java
index 8e42c64..f58f8a7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinInt.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinInt.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.decimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -40,7 +43,10 @@ public class UnCompressMaxMinInt implements ValueCompressonHolder.UnCompressValu
   /**
    * compressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
   /**
    * value.
    */
@@ -48,6 +54,8 @@ public class UnCompressMaxMinInt implements ValueCompressonHolder.UnCompressValu
 
   private DataType actualDataType;
 
+  private double maxValue;
+
   public UnCompressMaxMinInt(DataType actualDataType) {
     this.actualDataType = actualDataType;
   }
@@ -72,8 +80,9 @@ public class UnCompressMaxMinInt implements ValueCompressonHolder.UnCompressValu
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(
-      ValueCompressionUtil.DataType dataTypeValue) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -93,35 +102,40 @@ public class UnCompressMaxMinInt implements ValueCompressonHolder.UnCompressValu
     return new UnCompressMaxMinByte(actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decVal, Object maxValueObject) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return unCompressLong(maxValueObject);
-      default:
-        return unCompressDouble(maxValueObject);
+  @Override public long getLongValue(int index) {
+    int intValue = measureChunkStore.getInt(index);
+    if (intValue == 0) {
+      return (long) maxValue;
+    } else {
+      return (long) maxValue - intValue;
     }
   }
 
-  private CarbonReadDataHolder unCompressDouble(Object maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  @Override public double getDoubleValue(int index) {
+    int intValue = measureChunkStore.getInt(index);
+    if (intValue == 0) {
+      return maxValue;
+    } else {
+      return maxValue - intValue;
     }
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
   }
 
-  private CarbonReadDataHolder unCompressLong(Object maxValueObject) {
-    long maxValue = (long) maxValueObject;
-    long[] vals = new long[value.length];
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override public void setUncompressValues(int[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore =
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long) maxValueObject;
+    } else {
+      this.maxValue = (double) maxValueObject;
     }
-    dataHolder.setReadableLongValues(vals);
-    return dataHolder;
   }
 
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinLong.java
index 5a266fa..6d64c73 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinLong.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinLong.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.decimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,7 +42,9 @@ public class UnCompressMaxMinLong implements 
ValueCompressonHolder.UnCompressVal
   /**
    * compressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  private MeasureDataChunkStore<long[]> measureChunkStore;
   /**
    * value.
    */
@@ -47,6 +52,8 @@ public class UnCompressMaxMinLong implements 
ValueCompressonHolder.UnCompressVal
 
   protected DataType actualDataType;
 
+  private double maxValue;
+
   public UnCompressMaxMinLong(DataType actualDataType) {
     this.actualDataType = actualDataType;
   }
@@ -72,7 +79,8 @@ public class UnCompressMaxMinLong implements 
ValueCompressonHolder.UnCompressVal
   }
 
   @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dataType) {
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -92,35 +100,40 @@ public class UnCompressMaxMinLong implements 
ValueCompressonHolder.UnCompressVal
     return new UnCompressMaxMinByte(actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return unCompressLong(maxValueObject);
-      default:
-        return unCompressDouble(maxValueObject);
+  @Override public long getLongValue(int index) {
+    long longValue = measureChunkStore.getLong(index);
+    if (longValue == 0) {
+      return (long) maxValue;
+    } else {
+      return (long) maxValue - longValue;
     }
   }
 
-  private CarbonReadDataHolder unCompressDouble(Object maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder data = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  @Override public double getDoubleValue(int index) {
+    long longValue = measureChunkStore.getLong(index);
+    if (longValue == 0) {
+      return maxValue;
+    } else {
+      return maxValue - longValue;
     }
-    data.setReadableDoubleValues(vals);
-    return data;
   }
 
-  private CarbonReadDataHolder unCompressLong(Object maxValueObject) {
-    long maxValue = (long) maxValueObject;
-    long[] vals = new long[value.length];
-    CarbonReadDataHolder data = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override public void setUncompressValues(long[] data, int decimalPlaces, 
Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, 
data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long) maxValueObject;
+    } else {
+      this.maxValue = (double) maxValueObject;
     }
-    data.setReadableLongValues(vals);
-    return data;
   }
 
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinShort.java
index edade84..a138379 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinShort.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/decimal/UnCompressMaxMinShort.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.decimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,13 +42,16 @@ public class UnCompressMaxMinShort implements 
ValueCompressonHolder.UnCompressVa
   /**
    * shortCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  private MeasureDataChunkStore<short[]> measureChunkStore;
   /**
    * value.
    */
   private short[] value;
 
   private DataType actualDataType;
+  private double maxValue;
 
   public UnCompressMaxMinShort(DataType actualDataType) {
     this.actualDataType = actualDataType;
@@ -56,7 +62,9 @@ public class UnCompressMaxMinShort implements 
ValueCompressonHolder.UnCompressVa
 
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType 
dataType) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -91,36 +99,41 @@ public class UnCompressMaxMinShort implements 
ValueCompressonHolder.UnCompressVa
     return new UnCompressMaxMinByte(actualDataType);
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    switch (actualDataType) {
-      case DATA_BIGINT:
-        return unCompressLong(maxValueObject);
-      default:
-        return unCompressDouble(maxValueObject);
+  @Override public long getLongValue(int index) {
+    short shortValue = measureChunkStore.getShort(index);
+    if (shortValue == 0) {
+      return (long) maxValue;
+    } else {
+      return (long) maxValue - shortValue;
     }
-
   }
 
-  private CarbonReadDataHolder unCompressDouble(Object maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder carbonDataHolderObj = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  @Override public double getDoubleValue(int index) {
+    short shortValue = measureChunkStore.getShort(index);
+    if (shortValue == 0) {
+      return maxValue;
+    } else {
+      return maxValue - shortValue;
     }
-    carbonDataHolderObj.setReadableDoubleValues(vals);
-    return carbonDataHolderObj;
   }
 
-  private CarbonReadDataHolder unCompressLong(Object maxValueObject) {
-    long maxValue = (long) maxValueObject;
-    long[] vals = new long[value.length];
-    CarbonReadDataHolder carbonDataHolderObj = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = maxValue - value[i];
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override
+  public void setUncompressValues(short[] data, int decimalPlaces, Object 
maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long) maxValueObject;
+    } else {
+      this.maxValue = (double) maxValueObject;
     }
-    carbonDataHolderObj.setReadableLongValues(vals);
-    return carbonDataHolderObj;
   }
 
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalByte.java
index 9718609..da11443 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalByte.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalByte.java
@@ -19,12 +19,15 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.nondecimal;
 
+import java.math.BigDecimal;
+
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -37,12 +40,16 @@ public class UnCompressNonDecimalByte implements 
ValueCompressonHolder.UnCompres
   /**
    * compressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private byte[] value;
 
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  private double divisionFactory;
+
   @Override public void setValue(byte[] value) {
     this.value = value;
   }
@@ -62,10 +69,13 @@ public class UnCompressNonDecimalByte implements 
ValueCompressonHolder.UnCompres
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType 
dataType) {
+  @Override
+  public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType, 
byte[] compressData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
     ValueCompressonHolder.UnCompressValue byte1 =
         ValueCompressionUtil.getUnCompressNonDecimal(dataType);
-    ValueCompressonHolder.unCompress(dataType, byte1, value);
+    ValueCompressonHolder
+        .unCompress(dataType, byte1, compressData, offset, length, 
decimalPlaces, maxValueObject);
     return byte1;
   }
 
@@ -84,13 +94,26 @@ public class UnCompressNonDecimalByte implements 
ValueCompressonHolder.UnCompres
     return new UnCompressNonDecimalByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-    }
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getByte(index) / this.divisionFactory);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override public void setUncompressValues(byte[] data, int decimalPlaces, 
Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, 
data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalDefault.java
index b345700..d5c08be 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalDefault.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalDefault.java
@@ -19,14 +19,17 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.nondecimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -40,12 +43,15 @@ public class UnCompressNonDecimalDefault
   /**
    * doubleCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private double[] value;
 
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+  private double divisionFactory;
+
   @Override public ValueCompressonHolder.UnCompressValue getNew() {
     try {
       return (ValueCompressonHolder.UnCompressValue) clone();
@@ -61,7 +67,9 @@ public class UnCompressNonDecimalDefault
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType 
dataType) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -83,14 +91,27 @@ public class UnCompressNonDecimalDefault
     return new UnCompressNonDecimalByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double[] dblVals = new double[value.length];
-    for (int i = 0; i < dblVals.length; i++) {
-      dblVals[i] = value[i] / Math.pow(10, decimal);
-    }
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    dataHolder.setReadableDoubleValues(dblVals);
-    return dataHolder;
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
   }
 
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getDouble(index) / divisionFactory);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  @Override
+  public void setUncompressValues(double[] data, int decimalPlaces, Object 
maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalFloat.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalFloat.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalFloat.java
deleted file mode 100644
index ef126ff..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalFloat.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.compression.nondecimal;
-
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastorage.store.compression.Compressor;
-import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
-import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public class UnCompressNonDecimalFloat implements 
ValueCompressonHolder.UnCompressValue<float[]> {
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      
LogServiceFactory.getLogService(UnCompressNonDecimalFloat.class.getName());
-  /**
-   * floatCompressor
-   */
-  private static Compressor compressor = CompressorFactory.getInstance();
-  /**
-   * value.
-   */
-
-  private float[] value;
-
-  @Override public void setValue(float[] value) {
-    this.value = value;
-
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue getNew() {
-    try {
-      return (ValueCompressonHolder.UnCompressValue) clone();
-    } catch (CloneNotSupportedException cnsexception) {
-      LOGGER
-          .error(cnsexception, cnsexception.getMessage());
-    }
-    return null;
-  }
-
-  public byte[] getBackArrayData() {
-    return ValueCompressionUtil.convertToBytes(value);
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue compress() {
-    UnCompressNonDecimalByte byte1 = new UnCompressNonDecimalByte();
-    byte1.setValue(compressor.compressFloat(value));
-    return byte1;
-  }
-
-  @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dataType) {
-    return null;
-  }
-
-  @Override public void setValueInBytes(byte[] value) {
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    this.value = ValueCompressionUtil.convertToFloatArray(buffer, 
value.length);
-  }
-
-  /**
-   * @see ValueCompressonHolder.UnCompressValue#getCompressorObject()
-   */
-  @Override public ValueCompressonHolder.UnCompressValue getCompressorObject() 
{
-    return new UnCompressNonDecimalByte();
-  }
-
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double[] vals = new double[value.length];
-    for (int m = 0; m < vals.length; m++) {
-      vals[m] = value[m] / Math.pow(10, decimal);
-    }
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalInt.java
index cd47bff..c81aef1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalInt.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalInt.java
@@ -19,14 +19,16 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.nondecimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -39,12 +41,16 @@ public class UnCompressNonDecimalInt implements 
UnCompressValue<int[]> {
   /**
    * intCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private int[] value;
 
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
+  private double divisionFactory;
+
   @Override public void setValue(int[] value) {
     this.value = value;
   }
@@ -77,18 +83,33 @@ public class UnCompressNonDecimalInt implements 
UnCompressValue<int[]> {
     return new UnCompressNonDecimalByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int mantissa, Object 
maxValueObject) {
-    double[] vals = new double[value.length];
-    for (int k = 0; k < vals.length; k++) {
-      vals[k] = value[k] / Math.pow(10, mantissa);
-    }
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    return null;
   }
 
-  @Override public UnCompressValue uncompress(DataType dataType) {
-    return null;
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getInt(index) / this.divisionFactory);
   }
 
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimalPlaces value is not supported");
+  }
+
+  @Override public void setUncompressValues(int[] data, int decimalPlaces, 
Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalLong.java
index c717846..f9935cb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalLong.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalLong.java
@@ -19,14 +19,16 @@
 
 package org.apache.carbondata.core.datastorage.store.compression.nondecimal;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -40,13 +42,17 @@ public class UnCompressNonDecimalLong implements UnCompressValue<long[]> {
   /**
    * longCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
 
   /**
    * value.
    */
   private long[] value;
 
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
+  private double divisionFactory;
+
   @Override public void setValue(long[] value) {
     this.value = value;
   }
@@ -66,7 +72,9 @@ public class UnCompressNonDecimalLong implements UnCompressValue<long[]> {
     return null;
   }
 
-  @Override public UnCompressValue uncompress(DataType dataType) {
+  @Override
+  public UnCompressValue uncompress(DataType dataType, byte[] compressData, 
int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
     return null;
   }
 
@@ -83,14 +91,26 @@ public class UnCompressNonDecimalLong implements UnCompressValue<long[]> {
     return new UnCompressNonDecimalByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double[] vals = new double[value.length];
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-    }
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getLong(index) / this.divisionFactory);
   }
 
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimalPlaces value is 
not supported");
+  }
+
+  @Override public void setUncompressValues(long[] data, int decimalPlaces, 
Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, 
data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinByte.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinByte.java
index 42629ff..dcc7850 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinByte.java
@@ -23,10 +23,11 @@ import java.math.BigDecimal;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -40,18 +41,23 @@ public class UnCompressNonDecimalMaxMinByte
   /**
    * compressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private byte[] value;
 
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  private BigDecimal maxValue;
+
+  private double divisionFactor;
+
   @Override public ValueCompressonHolder.UnCompressValue<byte[]> getNew() {
     try {
       return (ValueCompressonHolder.UnCompressValue<byte[]>) clone();
     } catch (CloneNotSupportedException cloneNotSupportedException) {
-      LOGGER.error(cloneNotSupportedException,
-          cloneNotSupportedException.getMessage());
+      LOGGER.error(cloneNotSupportedException, 
cloneNotSupportedException.getMessage());
     }
     return null;
   }
@@ -62,10 +68,13 @@ public class UnCompressNonDecimalMaxMinByte
     return byte1;
   }
 
-  @Override public ValueCompressonHolder.UnCompressValue uncompress(DataType 
dataType) {
+  @Override
+  public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType, 
byte[] compressData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
     ValueCompressonHolder.UnCompressValue byte1 =
         ValueCompressionUtil.getUnCompressNonDecimalMaxMin(dataType);
-    ValueCompressonHolder.unCompress(dataType, byte1, value);
+    ValueCompressonHolder
+        .unCompress(dataType, byte1, compressData, offset, length, 
decimalPlaces, maxValueObject);
     return byte1;
   }
 
@@ -84,28 +93,37 @@ public class UnCompressNonDecimalMaxMinByte
     this.value = value;
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimalVal, Object 
maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimalVal);
-
-      if (value[i] == 0) {
-        vals[i] = maxValue;
-      } else {
-        BigDecimal diff = BigDecimal.valueOf(value[i] / Math.pow(10, 
decimalVal));
-        BigDecimal max = BigDecimal.valueOf(maxValue);
-        vals[i] = max.subtract(diff).doubleValue();
-      }
+  @Override public void setValue(byte[] value) {
+    this.value = value;
+  }
 
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    byte byteValue = measureChunkStore.getByte(index);
+    if (byteValue == 0) {
+      return this.maxValue.doubleValue();
+    } else {
+      BigDecimal diff = BigDecimal.valueOf(byteValue / this.divisionFactor);
+      return maxValue.subtract(diff).doubleValue();
     }
-    dataHolder.setReadableDoubleValues(vals);
-    return dataHolder;
   }
 
-  @Override public void setValue(byte[] value) {
-    this.value = value;
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get bigdecimal value is not 
supported");
   }
 
+  @Override public void setUncompressValues(byte[] data, int decimalPlaces, 
Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, 
data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinDefault.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinDefault.java
index 190adbf..d720889 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinDefault.java
@@ -24,10 +24,12 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -40,12 +42,16 @@ public class UnCompressNonDecimalMaxMinDefault implements UnCompressValue<double
   /**
    * doubleCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private double[] value;
 
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+  private BigDecimal maxValue;
+  private double divisionFactor;
+
   @Override public void setValue(double[] value) {
     this.value = value;
   }
@@ -78,28 +84,40 @@ public class UnCompressNonDecimalMaxMinDefault implements UnCompressValue<double
     return new UnCompressNonDecimalMaxMinByte();
   }
 
-  @Override public UnCompressValue uncompress(DataType dataType) {
+  @Override
+  public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType, 
byte[] compressData,
+      int offset, int length, int mantissa, Object maxValueObject) {
     return null;
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double maxVal = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder holder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-
-      if (value[i] == 0) {
-        vals[i] = maxVal;
-      } else {
-        BigDecimal diff = BigDecimal.valueOf(value[i] / Math.pow(10, decimal));
-        BigDecimal max = BigDecimal.valueOf(maxVal);
-        vals[i] = max.subtract(diff).doubleValue();
-      }
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
 
+  @Override public double getDoubleValue(int index) {
+    double doubleValue = measureChunkStore.getDouble(index);
+    if (doubleValue == 0) {
+      return maxValue.doubleValue();
+    } else {
+      BigDecimal diff = BigDecimal.valueOf(doubleValue / this.divisionFactor);
+      return maxValue.subtract(diff).doubleValue();
     }
-    holder.setReadableDoubleValues(vals);
-    return holder;
   }
 
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not 
supported");
+  }
+
+  @Override
+  public void setUncompressValues(double[] data, int decimalPlaces, Object 
maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinFloat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinFloat.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinFloat.java
deleted file mode 100644
index 54ca7b4..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinFloat.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.core.datastorage.store.compression.nondecimal;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastorage.store.compression.Compressor;
-import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
-import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public class UnCompressNonDecimalMaxMinFloat
-    implements ValueCompressonHolder.UnCompressValue<float[]> {
-  /**
-   * Attribute for Carbon LOGGER
-   */
-  private static final LogService LOGGER =
-      
LogServiceFactory.getLogService(UnCompressNonDecimalMaxMinFloat.class.getName());
-  /**
-   * floatCompressor
-   */
-  private static Compressor compressor = CompressorFactory.getInstance();
-  /**
-   * value.
-   */
-  private float[] value;
-
-  @Override public void setValue(float[] value) {
-    this.value = value;
-
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue getNew() {
-    try {
-      return (ValueCompressonHolder.UnCompressValue) clone();
-    } catch (CloneNotSupportedException exc1) {
-      LOGGER.error(exc1, exc1.getMessage());
-    }
-    return null;
-  }
-
-  @Override public ValueCompressonHolder.UnCompressValue compress() {
-
-    UnCompressNonDecimalMaxMinByte byte1 = new 
UnCompressNonDecimalMaxMinByte();
-    byte1.setValue(compressor.compressFloat(value));
-    return byte1;
-  }
-
-  @Override public byte[] getBackArrayData() {
-    return ValueCompressionUtil.convertToBytes(value);
-  }
-
-  @Override
-  public ValueCompressonHolder.UnCompressValue 
uncompress(ValueCompressionUtil.DataType dataType) {
-    return null;
-  }
-
-  @Override public void setValueInBytes(byte[] value) {
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    this.value = ValueCompressionUtil.convertToFloatArray(buffer, 
value.length);
-  }
-
-  /**
-   * @see ValueCompressonHolder.UnCompressValue#getCompressorObject()
-   */
-  @Override public ValueCompressonHolder.UnCompressValue getCompressorObject() 
{
-    return new UnCompressNonDecimalMaxMinByte();
-  }
-
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder holder = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-
-      if (value[i] == 0) {
-        vals[i] = maxValue;
-      } else {
-        BigDecimal diff = BigDecimal.valueOf(value[i] / Math.pow(10, decimal));
-        BigDecimal max = BigDecimal.valueOf(maxValue);
-        vals[i] = max.subtract(diff).doubleValue();
-      }
-    }
-    holder.setReadableDoubleValues(vals);
-    return holder;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8d9babe3/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinInt.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinInt.java
index 3e1b241..d834f6a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/compression/nondecimal/UnCompressNonDecimalMaxMinInt.java
@@ -24,10 +24,12 @@ import java.nio.ByteBuffer;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureChunkStoreFactory;
+import 
org.apache.carbondata.core.carbon.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastorage.store.compression.Compressor;
 import 
org.apache.carbondata.core.datastorage.store.compression.CompressorFactory;
+import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
 import 
org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder.UnCompressValue;
-import 
org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
@@ -40,12 +42,16 @@ public class UnCompressNonDecimalMaxMinInt implements UnCompressValue<int[]> {
   /**
    * intCompressor.
    */
-  private static Compressor compressor = CompressorFactory.getInstance();
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
   /**
    * value.
    */
   private int[] value;
 
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+  private double divisionFactor;
+  private BigDecimal maxValue;
+
   @Override public void setValue(int[] value) {
     this.value = value;
 
@@ -81,28 +87,40 @@ public class UnCompressNonDecimalMaxMinInt implements UnCompressValue<int[]> {
     return new UnCompressNonDecimalMaxMinByte();
   }
 
-  @Override public CarbonReadDataHolder getValues(int decimal, Object 
maxValueObject) {
-    double maxValue = (double) maxValueObject;
-    double[] vals = new double[value.length];
-    CarbonReadDataHolder dataHolderInfo = new CarbonReadDataHolder();
-    for (int i = 0; i < vals.length; i++) {
-      vals[i] = value[i] / Math.pow(10, decimal);
-
-      if (value[i] == 0) {
-        vals[i] = maxValue;
-      } else {
-        BigDecimal diff = BigDecimal.valueOf(value[i] / Math.pow(10, decimal));
-        BigDecimal max = BigDecimal.valueOf(maxValue);
-        vals[i] = max.subtract(diff).doubleValue();
-      }
+  @Override
+  public ValueCompressonHolder.UnCompressValue uncompress(DataType dataType, 
byte[] compressData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    return null;
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException("Get long value is not supported");
+  }
 
+  @Override public double getDoubleValue(int index) {
+    int intValue = measureChunkStore.getInt(index);
+    if (intValue == 0) {
+      return maxValue.doubleValue();
+    } else {
+      BigDecimal diff = BigDecimal.valueOf(intValue / this.divisionFactor);
+      return maxValue.subtract(diff).doubleValue();
     }
-    dataHolderInfo.setReadableDoubleValues(vals);
-    return dataHolderInfo;
   }
 
-  @Override public UnCompressValue uncompress(DataType dataType) {
-    return null;
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException("Get big decimal value is not 
supported");
   }
 
+  @Override public void setUncompressValues(int[] data, int decimalPlaces, 
Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
 }

Reply via email to