http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
new file mode 100644
index 0000000..368209f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression;
+
+import org.apache.carbondata.core.util.CompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+
+public class WriterCompressModel {
+  // Mutable holder of compression metadata arrays; accessors share the arrays without copying.
+  /**
+   * Converted data types (presumably one entry per measure; TODO confirm).
+   */
+  private ValueCompressionUtil.DataType[] convertedDataType;
+  /**
+   * Actual data types (presumably one entry per measure; TODO confirm).
+   */
+  private ValueCompressionUtil.DataType[] actualDataType;
+
+  /**
+   * Maximum values.
+   */
+  private Object[] maxValue;
+  /**
+   * Minimum values.
+   */
+  private Object[] minValue;
+
+  /**
+   * Unique values.
+   */
+  private Object[] uniqueValue;
+  /**
+   * Mantissa values.
+   */
+  private int[] mantissa;
+
+  /**
+   * Type codes (the accessors call this aggType).
+   */
+  private char[] type;
+
+  /**
+   * Selected data type markers.
+   */
+  private byte[] dataTypeSelected;
+  /**
+   * Value compression holders.
+   */
+  private ValueCompressionHolder[] valueHolder;
+
+  private CompressionFinder[] compressionFinders;
+
+  /**
+   * @return the convertedDataType array (the stored reference, not a copy)
+   */
+  public ValueCompressionUtil.DataType[] getConvertedDataType() {
+    return convertedDataType;
+  }
+
+  /**
+   * @param convertedDataType the convertedDataType to set (stored by reference)
+   */
+  public void setConvertedDataType(ValueCompressionUtil.DataType[] 
convertedDataType) {
+    this.convertedDataType = convertedDataType;
+  }
+
+  /**
+   * @return the actualDataType array (the stored reference, not a copy)
+   */
+  public ValueCompressionUtil.DataType[] getActualDataType() {
+    return actualDataType;
+  }
+
+  /**
+   * @param actualDataType the actualDataType to set (stored by reference)
+   */
+  public void setActualDataType(ValueCompressionUtil.DataType[] 
actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  /**
+   * @return the maxValue array (the stored reference, not a copy)
+   */
+  public Object[] getMaxValue() {
+    return maxValue;
+  }
+
+  /**
+   * @param maxValue the maxValue to set (stored by reference)
+   */
+  public void setMaxValue(Object[] maxValue) {
+    this.maxValue = maxValue;
+  }
+
+  /**
+   * @return the mantissa array (the stored reference, not a copy)
+   */
+  public int[] getMantissa() {
+    return mantissa;
+  }
+
+  /**
+   * @param mantissa the mantissa to set (stored by reference)
+   */
+  public void setMantissa(int[] mantissa) {
+    this.mantissa = mantissa;
+  }
+
+  /**
+   * Returns the value compression holders.
+   *
+   * @return the valueHolder array (the stored reference, not a copy)
+   */
+  public ValueCompressionHolder[] getValueCompressionHolder() {
+    return valueHolder;
+  }
+
+  /**
+   * @param valueHolder the ValueCompressionHolder array to set (stored by reference)
+   */
+  public void setValueCompressionHolder(ValueCompressionHolder[] valueHolder) {
+    this.valueHolder = valueHolder;
+  }
+
+  /**
+   * Returns the minimum values.
+   *
+   * @return the minValue array (the stored reference, not a copy)
+   */
+  public Object[] getMinValue() {
+    return minValue;
+  }
+
+  /**
+   * Sets the minimum values.
+   *
+   * @param minValue the minValue to set (stored by reference)
+   */
+  public void setMinValue(Object[] minValue) {
+    this.minValue = minValue;
+  }
+
+  /**
+   * @return the type array, also called aggType (the stored reference, not a copy)
+   */
+  public char[] getType() {
+    return type;
+  }
+
+  /**
+   * @param type the type to set (stored by reference)
+   */
+  public void setType(char[] type) {
+    this.type = type;
+  }
+
+  /**
+   * @return the dataTypeSelected array (the stored reference, not a copy)
+   */
+  public byte[] getDataTypeSelected() {
+    return dataTypeSelected;
+  }
+
+  /**
+   * @param dataTypeSelected the dataTypeSelected to set (stored by reference)
+   */
+  public void setDataTypeSelected(byte[] dataTypeSelected) {
+    this.dataTypeSelected = dataTypeSelected;
+  }
+
+  /**
+   * Returns the unique values.
+   *
+   * @return the uniqueValue array (the stored reference, not a copy)
+   */
+  public Object[] getUniqueValue() {
+    return uniqueValue;
+  }
+
+  /**
+   * Sets the unique values.
+   *
+   * @param uniqueValue the uniqueValue to set (stored by reference)
+   */
+  public void setUniqueValue(Object[] uniqueValue) {
+    this.uniqueValue = uniqueValue;
+  }
+
+  public void setCompressionFinders(CompressionFinder[] compressionFinders) {
+    this.compressionFinders = compressionFinders;
+  }
+
+  public CompressionFinder[] getCompressionFinders() {
+    return this.compressionFinders;
+  }
+
+  /**
+   * @return the compType of compressionFinders[index]; the finders must have been set first
+   */
+  public ValueCompressionUtil.COMPRESSION_TYPE getCompType(int index) {
+    return this.compressionFinders[index].getCompType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
new file mode 100644
index 0000000..e1f4e06
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.decimal;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionMaxMinByte extends ValueCompressionHolder<byte[]> {
+  // Byte-backed holder: reads decode each stored byte b as (maxValue - b).
+  /**
+   * Logger for this class; not referenced anywhere else in the class body.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionMaxMinByte.class.getName());
+
+  /**
+   * Compressor fetched once from CompressorFactory; static, so shared by all instances.
+   */
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * Stored values, assigned by setValue or setValueInBytes.
+   */
+  protected byte[] value;
+
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  /**
+   * Actual data type passed to the constructor; not read in this class.
+   */
+  protected DataType actualDataType;
+
+  private double maxValue;
+
+  public CompressionMaxMinByte(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public byte[] getValue() {return this.value; }
+
+  @Override public void setValue(byte[] value) {
+    this.value = value;
+  }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, maxValueObject); // value presumably set by unCompress above
+
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    this.value = value;
+  }
+
+  @Override public long getLongValue(int index) {
+    byte byteValue = measureChunkStore.getByte(index);
+    return (long) (maxValue - byteValue); // subtraction is done in double, then cast
+  }
+
+  @Override public double getDoubleValue(int index) {
+    byte byteValue = measureChunkStore.getByte(index);
+    return (maxValue - byteValue);
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionMaxMinByte");
+  }
+  // Obtains a chunk store, puts data into it and records maxValue for the getters.
+  private void setUncompressedValues(byte[] data, Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, 
data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long)maxValueObject; // NOTE: long->double is lossy above 2^53
+    } else {
+      this.maxValue = (double) maxValueObject; // throws unless maxValueObject is a Double
+    }
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
new file mode 100644
index 0000000..89c0016
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.decimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+
+public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> 
{
+  // Double-backed holder: getDoubleValue decodes each stored double d as (maxValue - d).
+  /**
+   * Logger for this class; not referenced anywhere else in the class body.
+   */
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(CompressionMaxMinDefault.class.getName());
+
+  /**
+   * Compressor fetched once from CompressorFactory; static, so shared by all instances.
+   */
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * Stored values, assigned by setValue or setValueInBytes.
+   */
+  private double[] value;
+
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
+  /**
+   * Actual data type passed to the constructor; not read in this class.
+   */
+  private DataType actualDataType;
+
+  private double maxValue;
+
+  public CompressionMaxMinDefault(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void setValue(double[] value) {
+    this.value = value;
+  }
+
+  @Override public double[] getValue() {return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+  }
+
+  @Override public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, maxValueObject); // value presumably set by unCompress above
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToDoubleArray(buffer, 
value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionMaxMinDefault");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    double doubleValue = measureChunkStore.getDouble(index);
+    return maxValue - doubleValue;
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionMaxMinDefault");
+  }
+  // Obtains a chunk store, puts data into it and records maxValue for the getters.
+  private void setUncompressedValues(double[] data, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+            .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long) maxValueObject; // NOTE: long->double is lossy above 2^53
+    } else {
+      this.maxValue = (double) maxValueObject; // throws unless maxValueObject is a Double
+    }
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
new file mode 100644
index 0000000..fdd11ee
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.decimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionMaxMinInt extends ValueCompressionHolder<int[]> { // value = max - stored
+  /**
+   * Logger for this class; not referenced anywhere else in the class body.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionMaxMinInt.class.getName());
+
+  /**
+   * Compressor fetched once from CompressorFactory; static, so shared by all instances.
+   */
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
+  /**
+   * Stored values, assigned by setValue or setValueInBytes.
+   */
+  private int[] value;
+
+  private DataType actualDataType;
+
+  private double maxValue;
+
+  public CompressionMaxMinInt(DataType actualType) {
+    this.actualDataType = actualType;
+  }
+
+  @Override public void setValue(int[] value) {
+    this.value = value;
+  }
+
+  @Override public int[] getValue() { return this.value; }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, maxValueObject); // value presumably set by unCompress above
+  }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToIntArray(buffer, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    int intValue = measureChunkStore.getInt(index);
+    return (long) (maxValue - intValue); // subtraction is done in double, then cast
+  }
+
+  @Override public double getDoubleValue(int index) {
+    int intValue = measureChunkStore.getInt(index);
+    return maxValue - intValue;
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionMaxMinInt");
+  }
+  // Obtains a chunk store, puts data into it and records maxValue for the getters.
+  private void setUncompressedValues(int[] data, Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, 
data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long) maxValueObject; // NOTE: long->double is lossy above 2^53
+    } else {
+      this.maxValue = (double) maxValueObject; // throws unless maxValueObject is a Double
+    }
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
new file mode 100644
index 0000000..b8decac
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.decimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionMaxMinLong extends ValueCompressionHolder<long[]> { // value = max - stored
+  /**
+   * Logger for this class; not referenced anywhere else in the class body.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionMaxMinLong.class.getName());
+  /**
+   * Compressor fetched once from CompressorFactory; static, so shared by all instances.
+   */
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
+  /**
+   * Stored values, assigned by setValue or setValueInBytes.
+   */
+  protected long[] value;
+
+  protected DataType actualDataType;
+
+  private double maxValue;
+
+  public CompressionMaxMinLong(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+  }
+
+  @Override public void setValue(long[] value) {
+    this.value = value;
+
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressValues(value, maxValueObject); // value presumably set by unCompress above
+  }
+
+  @Override public long[] getValue() {return this.value; }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToLongArray(buffer, value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    long longValue = measureChunkStore.getLong(index);
+    return (long) maxValue - longValue; // cast binds to maxValue; subtraction is in long
+  }
+
+  @Override public double getDoubleValue(int index) {
+    long longValue = measureChunkStore.getLong(index);
+    return maxValue - longValue;
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionMaxMinLong");
+  }
+  // Obtains a chunk store, puts data into it and records maxValue for the getters.
+  private void setUncompressValues(long[] data, Object maxValueObject) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_LONG, 
data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long) maxValueObject; // NOTE: long->double is lossy above 2^53
+    } else {
+      this.maxValue = (double) maxValueObject; // throws unless maxValueObject is a Double
+    }
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
new file mode 100644
index 0000000..812d887
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.decimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionMaxMinShort extends ValueCompressionHolder<short[]> {
+  // Short-backed holder: reads decode each stored short s as (maxValue - s).
+  /**
+   * Logger for this class; not referenced anywhere else in the class body.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionMaxMinShort.class.getName());
+
+  /**
+   * Compressor fetched once from CompressorFactory; static, so shared by all instances.
+   */
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  /**
+   * Stored values, assigned by setValue or setValueInBytes.
+   */
+  private short[] value;
+
+  private DataType actualDataType;
+
+  private double maxValue;
+
+  public CompressionMaxMinShort(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  @Override public void setValue(short[] value) {
+    this.value = value;
+
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, 
int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, maxValueObject); // value presumably set by unCompress above
+  }
+
+  @Override public short[] getValue() {return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToShortArray(buffer, 
value.length);
+  }
+
+  @Override public long getLongValue(int index) {
+    short shortValue = measureChunkStore.getShort(index);
+    return (long) maxValue - shortValue; // cast binds to maxValue; subtraction is in long
+  }
+
+  @Override public double getDoubleValue(int index) {
+    short shortValue = measureChunkStore.getShort(index);
+    return maxValue - shortValue;
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionMaxMinShort");
+  }
+  // Obtains a chunk store, puts data into it and records maxValue for the getters.
+  private void setUncompressedValues(short[] data, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    if (maxValueObject instanceof Long) {
+      this.maxValue = (long) maxValueObject; // NOTE: long->double is lossy above 2^53
+    } else {
+      this.maxValue = (double) maxValueObject; // throws unless maxValueObject is a Double
+    }
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
new file mode 100644
index 0000000..eb3da93
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+public class CompressionNonDecimalByte extends ValueCompressionHolder<byte[]> {
+  /**
+   * Logger for this class; not referenced anywhere else in the class body.
+   */
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(CompressionNonDecimalByte.class.getName());
+
+  /**
+   * Compressor fetched once from CompressorFactory; static, so shared by all instances.
+   */
+  private static Compressor compressor = 
CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * Stored values, assigned by setValue or setValueInBytes.
+   */
+  private byte[] value;
+
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  private double divisionFactory; // 10^decimalPlaces, assigned during uncompress
+
+  @Override public void setValue(byte[] value) {
+    this.value = value;
+  }
+
+  @Override public byte[] getValue() {return this.value; }
+
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+  }
+
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces); // maxValueObject is not used here
+  }
+
+  @Override public void setValueInBytes(byte[] value) {
+    this.value = value;
+  }
+
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalByte");
+  }
+
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getByte(index) / this.divisionFactory); // byte / 10^places
+  }
+
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalByte");
+  }
+  // Obtains a chunk store, puts data into it and derives the divisor from decimalPlaces.
+  private void setUncompressedValues(byte[] data, int decimalPlaces) {
+    this.measureChunkStore =
+        
MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_BYTE, 
data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
new file mode 100644
index 0000000..cf8bda9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as a {@code double[]}; reads return the stored number divided
+ * by {@code 10^decimalPlaces}.
+ */
+public class CompressionNonDecimalDefault extends ValueCompressionHolder<double[]> {
+
+  /** Logger for this class. */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalDefault.class.getName());
+
+  /** Compressor obtained once from {@link CompressorFactory} and shared by all instances. */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /** Raw values, before division by {@link #divisionFactory}. */
+  private double[] value;
+
+  /** Store read by {@link #getDoubleValue(int)}; assigned in {@link #uncompress}. */
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
+  /** {@code 10^decimalPlaces}, computed in {@link #uncompress}. */
+  private double divisionFactory;
+
+  /** Keeps a reference to the given array (no copy). */
+  @Override
+  public void setValue(double[] value) {
+    this.value = value;
+  }
+
+  /** Returns the array currently held (no copy). */
+  @Override
+  public double[] getValue() {
+    return value;
+  }
+
+  /** Converts the bytes to a {@code double[]} through {@link ValueCompressionUtil}. */
+  @Override
+  public void setValueInBytes(byte[] value) {
+    this.value = ValueCompressionUtil.convertToDoubleArray(ByteBuffer.wrap(value), value.length);
+  }
+
+  /** Compresses {@link #value} as {@code DATA_DOUBLE} and stores it in {@code compressedValue}. */
+  @Override
+  public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+  }
+
+  /**
+   * Delegates decompression to {@code super.unCompress}, then loads {@link #value} into a chunk
+   * store and records the divisor {@code 10^decimalPlaces}.
+   * NOTE(review): assumes the superclass call fills {@link #value} - confirm in
+   * ValueCompressionHolder.
+   *
+   * @param maxValueObject not used by this holder
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    final double[] data = this.value;
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public long getLongValue(int index) {
+    final String message = "Long value is not defined for CompressionNonDecimalDefault";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Returns the stored double at {@code index} divided by {@link #divisionFactory}. */
+  @Override
+  public double getDoubleValue(int index) {
+    final double stored = measureChunkStore.getDouble(index);
+    return stored / divisionFactory;
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public BigDecimal getBigDecimalValue(int index) {
+    final String message = "Big decimal value is not defined for CompressionNonDecimalDefault";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Delegates to the chunk store's {@code freeMemory()}. */
+  @Override
+  public void freeMemory() {
+    measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
new file mode 100644
index 0000000..29ac7d0
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as an {@code int[]}; reads return the stored number divided
+ * by {@code 10^decimalPlaces}.
+ */
+public class CompressionNonDecimalInt extends ValueCompressionHolder<int[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalInt.class.getName());
+  /**
+   * Compressor obtained once from {@link CompressorFactory} and shared by all instances.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * Raw values, before division by {@link #divisionFactory}.
+   */
+  private int[] value;
+
+  /** Store read by {@link #getDoubleValue(int)}; assigned in {@link #uncompress}. */
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
+  /** {@code 10^decimalPlaces}, computed when the data is uncompressed. */
+  private double divisionFactory;
+
+  /** Keeps a reference to the given array (no copy). */
+  @Override public void setValue(int[] value) {
+    this.value = value;
+  }
+
+  /** Returns the array currently held (no copy). */
+  @Override public int[] getValue() {
+    return this.value;
+  }
+
+  /** Compresses {@link #value} as {@code DATA_INT} and stores it in {@code compressedValue}. */
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+  }
+
+  /** Converts the bytes to an {@code int[]} through {@link ValueCompressionUtil}. */
+  @Override public void setValueInBytes(byte[] bytesArr) {
+    ByteBuffer buffer = ByteBuffer.wrap(bytesArr);
+    this.value = ValueCompressionUtil.convertToIntArray(buffer, bytesArr.length);
+  }
+
+  /**
+   * Delegates decompression to {@code super.unCompress}, then loads {@link #value} into a chunk
+   * store and records the divisor {@code 10^decimalPlaces}.
+   * NOTE(review): assumes the superclass call fills {@link #value} - confirm in
+   * ValueCompressionHolder.
+   *
+   * @param maxValueObject not used by this holder
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalInt");
+  }
+
+  /** Returns the stored int at {@code index} divided by {@link #divisionFactory}. */
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getInt(index) / this.divisionFactory);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    // The message previously misspelled the class name ("NonDecmialInt").
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalInt");
+  }
+
+  /**
+   * Obtains an int chunk store sized to {@code data}, passes {@code data} to it and records
+   * {@code 10^decimalPlaces} as the divisor used on read.
+   */
+  private void setUncompressedValues(int[] data, int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_INT, data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  /** Delegates to the chunk store's {@code freeMemory()}. */
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
new file mode 100644
index 0000000..c7c5f5f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as a {@code long[]}; reads return the stored number divided
+ * by {@code 10^decimalPlaces}.
+ */
+public class CompressionNonDecimalLong extends ValueCompressionHolder<long[]> {
+
+  /** Logger for this class. */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalLong.class.getName());
+
+  /** Compressor obtained once from {@link CompressorFactory} and shared by all instances. */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /** Raw values, before division by {@link #divisionFactory}. */
+  private long[] value;
+
+  /** Store read by {@link #getDoubleValue(int)}; assigned in {@link #uncompress}. */
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
+  /** {@code 10^decimalPlaces}, computed in {@link #uncompress}. */
+  private double divisionFactory;
+
+  /** Keeps a reference to the given array (no copy). */
+  @Override
+  public void setValue(long[] value) {
+    this.value = value;
+  }
+
+  /** Returns the array currently held (no copy). */
+  @Override
+  public long[] getValue() {
+    return value;
+  }
+
+  /** Converts the bytes to a {@code long[]} through {@link ValueCompressionUtil}. */
+  @Override
+  public void setValueInBytes(byte[] bytes) {
+    this.value = ValueCompressionUtil.convertToLongArray(ByteBuffer.wrap(bytes), bytes.length);
+  }
+
+  /** Compresses {@link #value} as {@code DATA_LONG} and stores it in {@code compressedValue}. */
+  @Override
+  public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+  }
+
+  /**
+   * Delegates decompression to {@code super.unCompress}, then loads {@link #value} into a chunk
+   * store and records the divisor {@code 10^decimalPlaces}.
+   * NOTE(review): assumes the superclass call fills {@link #value} - confirm in
+   * ValueCompressionHolder.
+   *
+   * @param maxValueObject not used by this holder
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    final long[] data = this.value;
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_LONG, data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactory = Math.pow(10, decimalPlaces);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public long getLongValue(int index) {
+    final String message = "Long value is not defined for CompressionNonDecimalLong";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Returns the stored long at {@code index} divided by {@link #divisionFactory}. */
+  @Override
+  public double getDoubleValue(int index) {
+    final long stored = measureChunkStore.getLong(index);
+    return stored / divisionFactory;
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public BigDecimal getBigDecimalValue(int index) {
+    final String message = "Big decimal value is not defined for CompressionNonDecimalLong";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Delegates to the chunk store's {@code freeMemory()}. */
+  @Override
+  public void freeMemory() {
+    measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
new file mode 100644
index 0000000..6bf7a2c
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as a {@code byte[]}; reads return
+ * {@code maxValue - stored / 10^decimalPlaces}.
+ */
+public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte[]> {
+
+  /** Logger for this class. */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinByte.class.getName());
+
+  /** Compressor obtained once from {@link CompressorFactory} and shared by all instances. */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /** Raw stored bytes. */
+  private byte[] value;
+
+  /** Store read by {@link #getDoubleValue(int)}; assigned in {@link #uncompress}. */
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  /** Value every scaled entry is subtracted from; set in {@link #uncompress}. */
+  private BigDecimal maxValue;
+
+  /** {@code 10^decimalPlaces}, computed in {@link #uncompress}. */
+  private double divisionFactor;
+
+  /** Keeps a reference to the given array (no copy). */
+  @Override
+  public void setValue(byte[] value) {
+    this.value = value;
+  }
+
+  /** Returns the array currently held (no copy). */
+  @Override
+  public byte[] getValue() {
+    return value;
+  }
+
+  /** Keeps a reference to the given array (no copy, no conversion). */
+  @Override
+  public void setValueInBytes(byte[] value) {
+    this.value = value;
+  }
+
+  /** Compresses {@link #value} as {@code DATA_BYTE} and stores it in {@code compressedValue}. */
+  @Override
+  public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+  }
+
+  /**
+   * Delegates decompression to {@code super.unCompress}, then loads {@link #value} into a chunk
+   * store and records the maximum value and the divisor {@code 10^decimalPlaces}.
+   * NOTE(review): assumes the superclass call fills {@link #value} - confirm in
+   * ValueCompressionHolder.
+   *
+   * @param maxValueObject cast to {@code double}; any other runtime type fails the cast
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    final byte[] data = this.value;
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_BYTE, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public long getLongValue(int index) {
+    final String message = "Long value is not defined for CompressionNonDecimalMaxMinByte";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Returns {@link #maxValue} minus the stored byte divided by {@link #divisionFactor}. */
+  @Override
+  public double getDoubleValue(int index) {
+    final BigDecimal offsetFromMax =
+        BigDecimal.valueOf(measureChunkStore.getByte(index) / divisionFactor);
+    return maxValue.subtract(offsetFromMax).doubleValue();
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public BigDecimal getBigDecimalValue(int index) {
+    final String message = "Big decimal value is not defined for CompressionNonDecimalMaxMinByte";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Delegates to the chunk store's {@code freeMemory()}. */
+  @Override
+  public void freeMemory() {
+    measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
new file mode 100644
index 0000000..3a57ec8
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as a {@code double[]}; reads return
+ * {@code maxValue - stored / 10^decimalPlaces}.
+ */
+public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<double[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinDefault.class.getName());
+
+  /**
+   * Compressor obtained once from {@link CompressorFactory} and shared by all instances.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /**
+   * Raw stored values.
+   */
+  private double[] value;
+
+  /** Store read by {@link #getDoubleValue(int)}; assigned in {@link #uncompress}. */
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
+  /** Value every scaled entry is subtracted from. */
+  private BigDecimal maxValue;
+
+  /** {@code 10^decimalPlaces}, computed when the data is uncompressed. */
+  private double divisionFactor;
+
+  /** Keeps a reference to the given array (no copy). */
+  @Override public void setValue(double[] value) {
+    this.value = value;
+  }
+
+  /** Returns the array currently held (no copy). */
+  @Override public double[] getValue() {
+    return this.value;
+  }
+
+  /** Compresses {@link #value} as {@code DATA_DOUBLE} and stores it in {@code compressedValue}. */
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+  }
+
+  /** Converts the bytes to a {@code double[]} through {@link ValueCompressionUtil}. */
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToDoubleArray(buffer, value.length);
+  }
+
+  /**
+   * Delegates decompression to {@code super.unCompress}, then loads {@link #value} into a chunk
+   * store and records the maximum value and the divisor {@code 10^decimalPlaces}.
+   * NOTE(review): assumes the superclass call fills {@link #value} - confirm in
+   * ValueCompressionHolder.
+   *
+   * @param maxValueObject cast to {@code double}; any other runtime type fails the cast
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinDefault");
+  }
+
+  /** Returns {@link #maxValue} minus the stored double divided by {@link #divisionFactor}. */
+  @Override public double getDoubleValue(int index) {
+    double doubleValue = measureChunkStore.getDouble(index);
+    BigDecimal diff = BigDecimal.valueOf(doubleValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    // Kept as two concatenated literals so no literal spans a physical line.
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for "
+          + "CompressionNonDecimalMaxMinDefault");
+  }
+
+  /**
+   * Obtains a double chunk store sized to {@code data}, passes {@code data} to it and records
+   * the maximum value and {@code 10^decimalPlaces}.
+   */
+  private void setUncompressedValues(double[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  /** Delegates to the chunk store's {@code freeMemory()}. */
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
new file mode 100644
index 0000000..0597672
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as an {@code int[]}; reads return
+ * {@code maxValue - stored / 10^decimalPlaces}.
+ */
+public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]> {
+
+  /** Logger for this class. */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinInt.class.getName());
+
+  /** Compressor obtained once from {@link CompressorFactory} and shared by all instances. */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /** Raw stored values. */
+  private int[] value;
+
+  /** Store read by {@link #getDoubleValue(int)}; assigned in {@link #uncompress}. */
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
+  /** {@code 10^decimalPlaces}, computed in {@link #uncompress}. */
+  private double divisionFactor;
+
+  /** Value every scaled entry is subtracted from; set in {@link #uncompress}. */
+  private BigDecimal maxValue;
+
+  /** Keeps a reference to the given array (no copy). */
+  @Override
+  public void setValue(int[] value) {
+    this.value = value;
+  }
+
+  /** Returns the array currently held (no copy). */
+  @Override
+  public int[] getValue() {
+    return value;
+  }
+
+  /** Converts the bytes to an {@code int[]} through {@link ValueCompressionUtil}. */
+  @Override
+  public void setValueInBytes(byte[] value) {
+    this.value = ValueCompressionUtil.convertToIntArray(ByteBuffer.wrap(value), value.length);
+  }
+
+  /** Compresses {@link #value} as {@code DATA_INT} and stores it in {@code compressedValue}. */
+  @Override
+  public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+  }
+
+  /**
+   * Delegates decompression to {@code super.unCompress}, then loads {@link #value} into a chunk
+   * store and records the maximum value and the divisor {@code 10^decimalPlaces}.
+   * NOTE(review): assumes the superclass call fills {@link #value} - confirm in
+   * ValueCompressionHolder.
+   *
+   * @param maxValueObject cast to {@code double}; any other runtime type fails the cast
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    final int[] data = this.value;
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_INT, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public long getLongValue(int index) {
+    final String message = "Long value is not defined for CompressionNonDecimalMaxMinInt";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Returns {@link #maxValue} minus the stored int divided by {@link #divisionFactor}. */
+  @Override
+  public double getDoubleValue(int index) {
+    final BigDecimal offsetFromMax =
+        BigDecimal.valueOf(measureChunkStore.getInt(index) / divisionFactor);
+    return maxValue.subtract(offsetFromMax).doubleValue();
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public BigDecimal getBigDecimalValue(int index) {
+    final String message = "Big decimal value is not defined for CompressionNonDecimalMaxMinInt";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /** Delegates to the chunk store's {@code freeMemory()}. */
+  @Override
+  public void freeMemory() {
+    measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
new file mode 100644
index 0000000..7cd9a30
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as a {@code long[]}; reads return
+ * {@code maxValue - stored / 10^decimalPlaces}.
+ */
+public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinLong.class.getName());
+
+  /**
+   * Compressor obtained once from {@link CompressorFactory} and shared by all instances.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * Raw stored values.
+   */
+  private long[] value;
+
+  /** Store read by {@link #getDoubleValue(int)}; assigned in {@link #uncompress}. */
+  private MeasureDataChunkStore<long[]> measureChunkStore;
+
+  /** Value every scaled entry is subtracted from. */
+  private BigDecimal maxValue;
+
+  /** {@code 10^decimalPlaces}, computed when the data is uncompressed. */
+  private double divisionFactor;
+
+  /** Keeps a reference to the given array (no copy). */
+  @Override public void setValue(long[] value) {
+    this.value = value;
+  }
+
+  /** Returns the array currently held (no copy). */
+  @Override public long[] getValue() {
+    return this.value;
+  }
+
+  /**
+   * Delegates decompression to {@code super.unCompress}, then loads {@link #value} into a chunk
+   * store and records the maximum value and the divisor {@code 10^decimalPlaces}.
+   * NOTE(review): assumes the superclass call fills {@link #value} - confirm in
+   * ValueCompressionHolder.
+   *
+   * @param maxValueObject cast to {@code double}; any other runtime type fails the cast
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  }
+
+  /** Compresses {@link #value} as {@code DATA_LONG} and stores it in {@code compressedValue}. */
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+  }
+
+  /** Converts the bytes to a {@code long[]} through {@link ValueCompressionUtil}. */
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buff = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToLongArray(buff, value.length);
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinLong");
+  }
+
+  /** Returns {@link #maxValue} minus the stored long divided by {@link #divisionFactor}. */
+  @Override public double getDoubleValue(int index) {
+    long longValue = measureChunkStore.getLong(index);
+    BigDecimal diff = BigDecimal.valueOf(longValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+  }
+
+  /**
+   * Not supported by this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    // Message names the class, matching getLongValue and the sibling holders.
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalMaxMinLong");
+  }
+
+  /**
+   * Obtains a long chunk store sized to {@code data}, passes {@code data} to it and records
+   * the maximum value and {@code 10^decimalPlaces}.
+   */
+  private void setUncompressedValues(long[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_LONG, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  /** Delegates to the chunk store's {@code freeMemory()}. */
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
new file mode 100644
index 0000000..a3f36c6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as {@code short} offsets from a maximum.
+ *
+ * <p>After {@code uncompress}, {@code getDoubleValue} returns
+ * {@code maxValue - stored / 10^decimalPlaces}. Long and BigDecimal reads are
+ * not supported and always throw.
+ */
+public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<short[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalMaxMinShort.class.getName());
+  /**
+   * Compressor obtained from {@link CompressorFactory}; used to compress and uncompress.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * Raw short values, set directly or decoded by {@link #setValueInBytes(byte[])}.
+   */
+  private short[] value;
+
+  /** Store populated by {@code uncompress}; unset until then. */
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  /** Maximum value from which scaled stored values are subtracted. */
+  private BigDecimal maxValue;
+
+  /** {@code 10^decimalPlaces}; stored values are divided by it on read. */
+  private double divisionFactor;
+
+  /**
+   * @param value raw short values to hold
+   */
+  @Override public void setValue(short[] value) {
+    this.value = value;
+  }
+
+  /**
+   * @return the raw short values currently held
+   */
+  @Override public short[] getValue() { return this.value; }
+
+  /**
+   * Uncompresses through the parent holder, then loads the held values and the
+   * read parameters into this instance.
+   *
+   * @param dataType       forwarded to the parent's {@code unCompress}
+   * @param compressedData forwarded to the parent's {@code unCompress}
+   * @param offset         forwarded to the parent's {@code unCompress}
+   * @param length         forwarded to the parent's {@code unCompress}
+   * @param decimalPlaces  exponent for the power-of-ten division factor
+   * @param maxValueObject maximum value; must be a {@code Double}
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    // NOTE(review): presumably the parent call fills `value` via setValueInBytes -- confirm.
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces, maxValueObject);
+  }
+
+  /**
+   * Compresses the held values as {@code DATA_SHORT} and stores the result in
+   * {@code compressedValue}.
+   */
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+  }
+
+  /**
+   * Replaces the held values with the result of
+   * {@code ValueCompressionUtil.convertToShortArray} applied to the given bytes.
+   *
+   * @param value raw bytes to decode
+   */
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToShortArray(buffer, value.length);
+  }
+
+  /**
+   * Long reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalMaxMinShort");
+  }
+
+  /**
+   * @param index position to read from the chunk store
+   * @return {@code maxValue - stored / divisionFactor} as a double
+   */
+  @Override public double getDoubleValue(int index) {
+    short shortValue = measureChunkStore.getShort(index);
+    BigDecimal diff = BigDecimal.valueOf(shortValue / this.divisionFactor);
+    return maxValue.subtract(diff).doubleValue();
+  }
+
+  /**
+   * BigDecimal reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    // Message kept on one line: a string literal cannot span a line break.
+    throw new UnsupportedOperationException("Get big decimal value is not supported");
+  }
+
+  /**
+   * Puts the values into a newly obtained chunk store and records the maximum
+   * and division factor used by {@link #getDoubleValue(int)}.
+   *
+   * @param data           values to store; its length sizes the chunk store
+   * @param decimalPlaces  exponent for the power-of-ten division factor
+   * @param maxValueObject maximum value; the cast below requires a {@code Double}
+   */
+  private void setUncompressedValues(short[] data, int decimalPlaces, Object maxValueObject) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    this.maxValue = BigDecimal.valueOf((double) maxValueObject);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  /**
+   * Delegates to the chunk store's {@code freeMemory()}.
+   */
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
new file mode 100644
index 0000000..b65ecae
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.nondecimal;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as scaled {@code short}s.
+ *
+ * <p>After {@code uncompress}, {@code getDoubleValue} returns
+ * {@code stored / 10^decimalPlaces}. Long and BigDecimal reads are not
+ * supported and always throw.
+ */
+public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNonDecimalShort.class.getName());
+  /**
+   * Compressor obtained from {@link CompressorFactory}; used to compress and uncompress.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * Raw short values, set directly or decoded by {@link #setValueInBytes(byte[])}.
+   */
+  private short[] value;
+
+  /** Store populated by {@code uncompress}; unset until then. */
+  private MeasureDataChunkStore<short[]> measureChunkStore;
+
+  /** {@code 10^decimalPlaces}; stored values are divided by it on read. */
+  private double divisionFactor;
+
+  /**
+   * @param value raw short values to hold
+   */
+  @Override public void setValue(short[] value) {
+    this.value = value;
+  }
+
+  /**
+   * @return the raw short values currently held
+   */
+  @Override public short[] getValue() { return this.value; }
+
+  /**
+   * Compresses the held values as {@code DATA_SHORT} and stores the result in
+   * {@code compressedValue}.
+   */
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+  }
+
+  /**
+   * Uncompresses through the parent holder, then loads the held values and the
+   * division factor into this instance.
+   *
+   * @param dataType       forwarded to the parent's {@code unCompress}
+   * @param compressedData forwarded to the parent's {@code unCompress}
+   * @param offset         forwarded to the parent's {@code unCompress}
+   * @param length         forwarded to the parent's {@code unCompress}
+   * @param decimalPlaces  exponent for the power-of-ten division factor
+   * @param maxValueObject not used by this holder
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] compressedData,
+      int offset, int length, int decimalPlaces, Object maxValueObject) {
+    // NOTE(review): presumably the parent call fills `value` via setValueInBytes -- confirm.
+    super.unCompress(compressor, dataType, compressedData, offset, length);
+    setUncompressedValues(value, decimalPlaces);
+  }
+
+  /**
+   * Replaces the held values with the result of
+   * {@code ValueCompressionUtil.convertToShortArray} applied to the given bytes.
+   *
+   * @param value raw bytes to decode
+   */
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToShortArray(buffer, value.length);
+  }
+
+  /**
+   * Long reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public long getLongValue(int index) {
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNonDecimalShort");
+  }
+
+  /**
+   * @param index position to read from the chunk store
+   * @return the stored short divided by the division factor
+   */
+  @Override public double getDoubleValue(int index) {
+    return (measureChunkStore.getShort(index) / this.divisionFactor);
+  }
+
+  /**
+   * BigDecimal reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal value is not defined for CompressionNonDecimalShort");
+  }
+
+  /**
+   * Puts the values into a newly obtained chunk store and records the division
+   * factor used by {@link #getDoubleValue(int)}.
+   *
+   * @param data          values to store; its length sizes the chunk store
+   * @param decimalPlaces exponent for the power-of-ten division factor
+   */
+  private void setUncompressedValues(short[] data, int decimalPlaces) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_SHORT, data.length);
+    this.measureChunkStore.putData(data);
+    this.divisionFactor = Math.pow(10, decimalPlaces);
+  }
+
+  /**
+   * Delegates to the chunk store's {@code freeMemory()}.
+   */
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
new file mode 100644
index 0000000..8161557
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.none;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as plain bytes. Long and double reads return
+ * the stored byte widened; BigDecimal reads always throw.
+ */
+public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneByte.class.getName());
+
+  /** Compressor obtained from {@link CompressorFactory}; used to compress and uncompress. */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+
+  /** Raw byte values held by this instance. */
+  private byte[] value;
+
+  /** Actual data type supplied at construction. */
+  private DataType actualDataType;
+
+  /** Store populated by {@code uncompress} and read by the value getters. */
+  private MeasureDataChunkStore<byte[]> measureChunkStore;
+
+  /**
+   * @param actualDataType actual data type to remember for this holder
+   */
+  public CompressionNoneByte(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  /**
+   * @return the raw byte values currently held
+   */
+  @Override
+  public byte[] getValue() {
+    return this.value;
+  }
+
+  /**
+   * @param value raw byte values to hold
+   */
+  @Override
+  public void setValue(byte[] value) {
+    this.value = value;
+  }
+
+  /**
+   * Holds the given bytes as-is; no conversion is needed for byte data.
+   *
+   * @param value raw byte values to hold
+   */
+  @Override
+  public void setValueInBytes(byte[] value) {
+    this.value = value;
+  }
+
+  /**
+   * Compresses the held values as {@code DATA_BYTE} and stores the result in
+   * {@code compressedValue}.
+   */
+  @Override
+  public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+  }
+
+  /**
+   * Uncompresses through the parent holder, then puts the held values into a
+   * newly obtained chunk store sized to their length.
+   *
+   * @param dataType       forwarded to the parent's {@code unCompress}
+   * @param data           forwarded to the parent's {@code unCompress}
+   * @param offset         forwarded to the parent's {@code unCompress}
+   * @param length         forwarded to the parent's {@code unCompress}
+   * @param mantissa       not used by this holder
+   * @param maxValueObject not used by this holder
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int mantissa, Object maxValueObject) {
+    super.unCompress(compressor, dataType, data, offset, length);
+    byte[] uncompressed = this.value;
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_BYTE, uncompressed.length);
+    this.measureChunkStore.putData(uncompressed);
+  }
+
+  /**
+   * @param index position to read from the chunk store
+   * @return the stored byte widened to long
+   */
+  @Override
+  public long getLongValue(int index) {
+    return measureChunkStore.getByte(index);
+  }
+
+  /**
+   * @param index position to read from the chunk store
+   * @return the stored byte widened to double
+   */
+  @Override
+  public double getDoubleValue(int index) {
+    return measureChunkStore.getByte(index);
+  }
+
+  /**
+   * BigDecimal reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+        "Big decimal is not defined for CompressionNoneByte");
+  }
+
+  /**
+   * Delegates to the chunk store's {@code freeMemory()}.
+   */
+  @Override
+  public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
new file mode 100644
index 0000000..267710f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.none;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as plain doubles. Double reads return the
+ * stored value; long and BigDecimal reads always throw.
+ */
+public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneDefault.class.getName());
+  /**
+   * Compressor obtained from {@link CompressorFactory}; used to compress and uncompress.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * Raw double values, set directly or decoded by {@link #setValueInBytes(byte[])}.
+   */
+  private double[] value;
+
+  /** Actual data type supplied at construction. */
+  private DataType actualDataType;
+
+  /** Store populated by {@code uncompress}; unset until then. */
+  private MeasureDataChunkStore<double[]> measureChunkStore;
+
+  /**
+   * @param actualDataType actual data type to remember for this holder
+   */
+  public CompressionNoneDefault(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  /**
+   * @param value raw double values to hold
+   */
+  @Override public void setValue(double[] value) {
+    this.value = value;
+  }
+
+  /**
+   * Uncompresses through the parent holder, then loads the held values into a
+   * chunk store.
+   *
+   * @param dataType       forwarded to the parent's {@code unCompress}
+   * @param data           forwarded to the parent's {@code unCompress}
+   * @param offset         forwarded to the parent's {@code unCompress}
+   * @param length         forwarded to the parent's {@code unCompress}
+   * @param decimalPlaces  not used by this holder
+   * @param maxValueObject not used by this holder
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    // NOTE(review): presumably the parent call fills `value` via setValueInBytes -- confirm.
+    super.unCompress(compressor, dataType, data, offset, length);
+    setUncompressedValues(value);
+  }
+
+  /**
+   * @return the raw double values currently held
+   */
+  @Override public double[] getValue() { return this.value; }
+
+  /**
+   * Compresses the held values as {@code DATA_DOUBLE} and stores the result in
+   * {@code compressedValue}.
+   */
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+  }
+
+  /**
+   * Replaces the held values with the result of
+   * {@code ValueCompressionUtil.convertToDoubleArray} applied to the given bytes.
+   *
+   * @param value raw bytes to decode
+   */
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToDoubleArray(buffer, value.length);
+  }
+
+  /**
+   * Long reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public long getLongValue(int index) {
+    // Message names this class (it previously read "CompressionNonDefault").
+    throw new UnsupportedOperationException(
+      "Long value is not defined for CompressionNoneDefault");
+  }
+
+  /**
+   * @param index position to read from the chunk store
+   * @return the stored double
+   */
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getDouble(index);
+  }
+
+  /**
+   * BigDecimal reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    throw new UnsupportedOperationException(
+      "Big decimal is not defined for CompressionNoneDefault");
+  }
+
+  /**
+   * Puts the values into a newly obtained chunk store sized to their length.
+   *
+   * @param data values to store
+   */
+  private void setUncompressedValues(double[] data) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, data.length);
+    this.measureChunkStore.putData(data);
+  }
+
+  /**
+   * Delegates to the chunk store's {@code freeMemory()}.
+   */
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/48316190/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
new file mode 100644
index 0000000..5e5e0eb
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.datastore.compression.none;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.chunk.store.MeasureChunkStoreFactory;
+import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+
+/**
+ * Holder for measure values kept as plain ints. Long and double reads return
+ * the stored int widened; BigDecimal reads always throw.
+ */
+public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompressionNoneInt.class.getName());
+  /**
+   * Compressor obtained from {@link CompressorFactory}; used to compress and uncompress.
+   */
+  private static Compressor compressor = CompressorFactory.getInstance().getCompressor();
+  /**
+   * Raw int values, set directly or decoded by {@link #setValueInBytes(byte[])}.
+   */
+  private int[] value;
+
+  /** Actual data type supplied at construction. */
+  private DataType actualDataType;
+
+  /** Store populated by {@code uncompress}; unset until then. */
+  private MeasureDataChunkStore<int[]> measureChunkStore;
+
+  /**
+   * @param actualDataType actual data type to remember for this holder
+   */
+  public CompressionNoneInt(DataType actualDataType) {
+    this.actualDataType = actualDataType;
+  }
+
+  /**
+   * @param value raw int values to hold
+   */
+  @Override public void setValue(int[] value) {
+    this.value = value;
+  }
+
+  /**
+   * @return the raw int values currently held
+   */
+  @Override public int[] getValue() { return this.value; }
+
+  /**
+   * Compresses the held values as {@code DATA_INT} and stores the result in
+   * {@code compressedValue}.
+   */
+  @Override public void compress() {
+    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+  }
+
+  /**
+   * Uncompresses through the parent holder, then loads the held values into a
+   * chunk store.
+   *
+   * @param dataType       forwarded to the parent's {@code unCompress}
+   * @param data           forwarded to the parent's {@code unCompress}
+   * @param offset         forwarded to the parent's {@code unCompress}
+   * @param length         forwarded to the parent's {@code unCompress}
+   * @param decimalPlaces  not used by this holder
+   * @param maxValueObject not used by this holder
+   */
+  @Override
+  public void uncompress(DataType dataType, byte[] data, int offset, int length,
+      int decimalPlaces, Object maxValueObject) {
+    // NOTE(review): presumably the parent call fills `value` via setValueInBytes -- confirm.
+    super.unCompress(compressor, dataType, data, offset, length);
+    setUncompressedValues(value);
+  }
+
+  /**
+   * Replaces the held values with the result of
+   * {@code ValueCompressionUtil.convertToIntArray} applied to the given bytes.
+   *
+   * @param value raw bytes to decode
+   */
+  @Override public void setValueInBytes(byte[] value) {
+    ByteBuffer buffer = ByteBuffer.wrap(value);
+    this.value = ValueCompressionUtil.convertToIntArray(buffer, value.length);
+  }
+
+  /**
+   * @param index position to read from the chunk store
+   * @return the stored int widened to long
+   */
+  @Override public long getLongValue(int index) {
+    return measureChunkStore.getInt(index);
+  }
+
+  /**
+   * @param index position to read from the chunk store
+   * @return the stored int widened to double
+   */
+  @Override public double getDoubleValue(int index) {
+    return measureChunkStore.getInt(index);
+  }
+
+  /**
+   * BigDecimal reads are not available on this holder.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override public BigDecimal getBigDecimalValue(int index) {
+    // Message names this class (it previously read "CompressionNoneLong").
+    throw new UnsupportedOperationException(
+      "Big decimal is not defined for CompressionNoneInt");
+  }
+
+  /**
+   * Puts the values into a newly obtained chunk store sized to their length.
+   *
+   * @param data values to store
+   */
+  private void setUncompressedValues(int[] data) {
+    this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
+        .getMeasureDataChunkStore(DataType.DATA_INT, data.length);
+    this.measureChunkStore.putData(data);
+  }
+
+  /**
+   * Delegates to the chunk store's {@code freeMemory()}.
+   */
+  @Override public void freeMemory() {
+    this.measureChunkStore.freeMemory();
+  }
+}

Reply via email to