Refactored core package and fixed all testcases (#684)

* Optimizing detail query

* Optimizing detail query flow

* Optimizing detail query flow

* Optimized raw detail query to improve push up performance.

* Fixed bugs

* reverted wrong check in

* Rebased the code

* Removed aggregation from core

* Refactored core package and fixed test cases

* Fixed bugs

* Fixed review comments and deleted aggregate classes after merge from master

* Removed unused code


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6288ec71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6288ec71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6288ec71

Branch: refs/heads/master
Commit: 6288ec7152bbb832884de24e2e5912a0feecc0af
Parents: a83dba3
Author: Ravindra Pesala <ravi.pes...@gmail.com>
Authored: Sat Jun 18 06:47:19 2016 +0530
Committer: Jacky Li <jacky.li...@huawei.com>
Committed: Sat Jun 18 09:17:19 2016 +0800

----------------------------------------------------------------------
 .../aggregator/CalculatedMeasureAggregator.java |  30 --
 .../aggregator/CustomMeasureAggregator.java     |  39 ---
 .../query/aggregator/MeasureAggregator.java     | 144 --------
 .../impl/AbstractMeasureAggregatorBasic.java    |  60 ----
 .../impl/AbstractMeasureAggregatorMaxMin.java   | 177 ----------
 .../query/aggregator/impl/BitSet.java           | 337 -------------------
 .../aggregator/impl/CustomAggregatorHelper.java | 235 -------------
 .../impl/avg/AbstractAvgAggregator.java         |  28 --
 .../impl/avg/AvgBigDecimalAggregator.java       | 218 ------------
 .../impl/avg/AvgDoubleAggregator.java           | 210 ------------
 .../aggregator/impl/avg/AvgLongAggregator.java  | 195 -----------
 .../aggregator/impl/count/CountAggregator.java  | 204 -----------
 ...bstractDistinctCountAggregatorObjectSet.java | 103 ------
 .../impl/distinct/DistinctCountAggregator.java  | 319 ------------------
 .../DistinctCountAggregatorObjectSet.java       |  96 ------
 ...tinctCountBigDecimalAggregatorObjectSet.java |  86 -----
 .../DistinctCountLongAggregatorObjectSet.java   |  89 -----
 .../distinct/DistinctStringCountAggregator.java | 165 ---------
 .../SumDistinctBigDecimalAggregator.java        | 233 -------------
 .../distinct/SumDistinctDoubleAggregator.java   | 231 -------------
 .../distinct/SumDistinctLongAggregator.java     | 221 ------------
 .../dummy/AbstractMeasureAggregatorDummy.java   |  70 ----
 .../impl/dummy/DummyBigDecimalAggregator.java   |  63 ----
 .../impl/dummy/DummyDoubleAggregator.java       |  64 ----
 .../impl/dummy/DummyLongAggregator.java         |  59 ----
 .../impl/max/AbstractMaxAggregator.java         |  61 ----
 .../aggregator/impl/max/MaxAggregator.java      |  68 ----
 .../impl/max/MaxBigDecimalAggregator.java       |  67 ----
 .../aggregator/impl/max/MaxLongAggregator.java  |  67 ----
 .../impl/min/AbstractMinAggregator.java         |  61 ----
 .../aggregator/impl/min/MinAggregator.java      |  66 ----
 .../impl/min/MinBigDecimalAggregator.java       |  66 ----
 .../aggregator/impl/min/MinLongAggregator.java  |  66 ----
 .../impl/sum/SumBigDecimalAggregator.java       | 179 ----------
 .../impl/sum/SumDoubleAggregator.java           | 179 ----------
 .../aggregator/impl/sum/SumLongAggregator.java  | 165 ---------
 .../util/MeasureAggregatorFactory.java          | 182 ----------
 .../query/carbon/aggregator/DataAggregator.java |  76 -----
 .../dimension/DimensionDataAggregator.java      |  40 ---
 .../impl/ColumnGroupDimensionsAggregator.java   | 109 ------
 .../DirectDictionaryDimensionAggregator.java    | 114 -------
 .../impl/FixedLengthDimensionAggregator.java    | 131 -------
 .../impl/VariableLengthDimensionAggregator.java | 126 -------
 .../expression/ExpressionAggregator.java        | 135 --------
 .../impl/ListBasedResultAggregator.java         | 128 ++++---
 .../impl/MapBasedResultAggregator.java          | 171 ----------
 .../measure/MeasureDataAggregator.java          |  48 ---
 .../measure/impl/FactTableAggregator.java       |  87 -----
 .../carbon/executor/QueryExecutorFactory.java   |  48 +--
 .../executor/impl/AbstractQueryExecutor.java    |  51 +--
 .../executor/impl/AggregationQueryExecutor.java |  47 ---
 .../executor/impl/CountStarQueryExecutor.java   |  45 ---
 .../executor/impl/DetailQueryExecutor.java      |   5 +-
 .../impl/DetailRawRecordQueryExcecutor.java     |  26 --
 .../impl/DetailRawRecordQueryExecutor.java      |  26 ++
 .../impl/DetailWithOrderByQueryExecutor.java    |  53 ---
 .../executor/impl/FunctionQueryExecutor.java    |  37 --
 .../executor/impl/QueryExecutorProperties.java  |  12 +-
 .../carbon/executor/infos/AggregatorInfo.java   |  30 +-
 .../executor/infos/BlockExecutionInfo.java      |  56 +--
 .../impl/InternalAbstractQueryExecutor.java     |  12 +-
 .../impl/InternalAggregationQueryExecutor.java  |  44 ---
 .../impl/InternalCountStartQueryExecutor.java   |  83 -----
 .../impl/InternalDetailQueryExecutor.java       |   7 +-
 .../InternalDetailWithOrderQueryExecutor.java   |  74 ----
 .../impl/InternalFunctionQueryExecutor.java     |  36 --
 .../executor/internal/impl/QueryRunner.java     |   7 +-
 .../query/carbon/executor/util/QueryUtil.java   | 189 +----------
 .../merger/AbstractScannedResultMerger.java     |   7 +-
 .../merger/impl/SortedScannedResultMerger.java  | 176 ----------
 .../query/carbon/model/CarbonQueryPlan.java     |  30 --
 .../carbon/model/CustomAggregateExpression.java | 149 --------
 .../query/carbon/model/QueryModel.java          |  55 ---
 .../processor/AbstractDataBlockProcessor.java   |  11 +-
 .../carbon/result/AbstractScannedResult.java    |   8 +
 .../query/carbon/result/BatchRawResult.java     | 130 ++-----
 .../query/carbon/result/BatchResult.java        |  64 ++--
 .../carbon/result/ListBasedResultWrapper.java   |   7 +-
 .../carbondata/query/carbon/result/Result.java  |   7 +-
 .../query/carbon/result/RowResult.java          |  45 ---
 .../carbon/result/impl/ListBasedResult.java     |  11 +-
 .../carbon/result/impl/MapBasedResult.java      | 141 --------
 .../AbstractDetailQueryResultIterator.java      |   8 +-
 .../ChunkBasedDetailResultIterator.java         |  75 +++++
 .../iterator/ChunkBasedResultIterator.java      |  71 ----
 .../result/iterator/ChunkRawRowIterartor.java   |  56 ---
 .../result/iterator/ChunkRowIterator.java       |   5 +-
 .../iterator/DetailQueryResultIterator.java     |  12 +-
 .../iterator/DetailRawQueryResultIterator.java  |  89 +++--
 .../preparator/QueryResultPreparator.java       |   5 +-
 .../impl/AbstractQueryResultPreparator.java     |  61 ++--
 .../impl/DetailQueryResultPreparatorImpl.java   | 139 ++++++++
 .../impl/QueryResultPreparatorImpl.java         | 297 ----------------
 .../impl/RawQueryResultPreparatorImpl.java      |  87 ++---
 .../executer/RowLevelFilterExecuterImpl.java    |  59 ++--
 .../MeasureColumnResolvedFilterInfo.java        |  10 -
 .../query/scanner/impl/CarbonKey.java           | 105 ------
 .../query/scanner/impl/CarbonValue.java         |  87 -----
 .../carbondata/hadoop/CarbonRecordReader.java   |   8 +-
 .../spark/sql/CarbonCatalystOperators.scala     | 330 ------------------
 .../spark/sql/CarbonDictionaryDecoder.scala     |  29 +-
 .../org/apache/spark/sql/CarbonOperators.scala  |  35 +-
 .../org/apache/spark/sql/CarbonSQLConf.scala    |  11 -
 .../org/apache/spark/sql/CarbonSqlParser.scala  |  10 +-
 .../execution/command/carbonTableSchema.scala   |  33 +-
 .../spark/sql/hive/CarbonStrategies.scala       |  86 ++---
 .../CarbonDecoderOptimizerHelper.scala          |   8 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   |  39 ++-
 .../org/carbondata/spark/CarbonFilters.scala    |  58 ++--
 .../scala/org/carbondata/spark/KeyVal.scala     |  28 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  22 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  40 +--
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  40 +--
 .../spark/rdd/CarbonDropAggregateTableRDD.scala |  85 -----
 .../spark/rdd/CarbonDropCubeRDD.scala           |  77 -----
 .../spark/rdd/CarbonDropTableRDD.scala          |  76 +++++
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   3 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  42 +--
 .../carbondata/spark/util/QueryPlanUtil.scala   |   5 +-
 .../AllDataTypesTestCaseAggregate.scala         |   1 +
 .../store/colgroup/ColGroupMinMaxTest.java      |   2 +-
 121 files changed, 877 insertions(+), 9154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java
deleted file mode 100644
index d115a91..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator;
-
-public interface CalculatedMeasureAggregator extends MeasureAggregator {
-
-  /**
-   * Calculate calculated measures
-   *
-   * @param aggregators
-   */
-  void calculateCalcMeasure(MeasureAggregator[] aggregators);
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java
deleted file mode 100644
index 7c14a7c..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator;
-
-import java.util.List;
-
-import org.carbondata.query.carbonfilterinterface.RowIntf;
-import org.carbondata.query.expression.ColumnExpression;
-
-public interface CustomMeasureAggregator extends MeasureAggregator {
-  /**
-   * Aggregate method with generic row interface where RowIntf holds value for
-   * each column given in MeasureAggregator@getColumns()
-   */
-  void agg(RowIntf row);
-
-  /**
-   * @return List of columns required for the aggregator
-   */
-  List<ColumnExpression> getColumns();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java
deleted file mode 100644
index 515d307..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-
-/**
- * Class Description : MeasureAggregator interface. It will be implemented by
- * all the aggregator functions eg: sum, avg, max, min, etc, will be used for
- * aggregate the measure value based on kind of aggregator
- */
-
-public interface MeasureAggregator extends Serializable, Comparable<MeasureAggregator> {
-
-  /**
-   * Below method will be used to aggregate the Double value
-   *
-   * @param newVal
-   */
-  void agg(double newVal);
-
-  /**
-   * Below method will be used to aggregate the object value
-   *
-   * @param newVal
-   */
-  void agg(Object newVal);
-
-  /**
-   * Below method will be used to aggregate the value based on index
-   *
-   * @param newVal
-   * @param index
-   */
-  void agg(MeasureColumnDataChunk newVal, int index);
-
-  /**
-   * Get the Serialize byte array
-   *
-   * @return
-   */
-  byte[] getByteArray();
-
-  /**
-   * This method will be used to set the new value
-   *
-   * @param newValue
-   */
-  void setNewValue(Object newValue);
-
-  /**
-   * This method return the object value of the MeasureAggregator
-   *
-   * @return aggregated value
-   */
-  Object getValueObject();
-
-  /**
-   * This method return the object value of the MeasureAggregator
-   *
-   * @return aggregated value
-   */
-  Double getDoubleValue();
-
-  /**
-   * This method return the object value of the MeasureAggregator
-   *
-   * @return aggregated value
-   */
-  Long getLongValue();
-
-  BigDecimal getBigDecimalValue();
-
-  /**
-   * This method merge the aggregated value based on aggregator passed
-   *
-   * @param aggregator type of aggregator
-   */
-  void merge(MeasureAggregator aggregator);
-
-  /**
-   * Is first time. It means it was never used for aggregating any value.
-   *
-   * @return
-   */
-  boolean isFirstTime();
-
-  /**
-   * it creates the new copy of MeasureAggregator
-   *
-   * @return MeasureAggregator
-   */
-  MeasureAggregator getCopy();
-
-  /**
-   * Write the state of the class to buffer
-   */
-  void writeData(DataOutput output) throws IOException;
-
-  /**
-   * Read the state of the class and set to the object
-   */
-  void readData(DataInput inPut) throws IOException;
-
-  MeasureAggregator get();
-
-  /**
-   * Merge the byte arrays
-   *
-   * @param value
-   */
-  void merge(byte[] value);
-
-  /**
-   * Below method will be used to get the
-   * new instance
-   *
-   * @return new instance
-   */
-  MeasureAggregator getNew();
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java
deleted file mode 100644
index a4ce00b..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl;
-
-import java.math.BigDecimal;
-
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-/**
- * AbstractMeasureAggregatorSum
- * Used for custom Carbon Aggregator sum
- */
-public abstract class AbstractMeasureAggregatorBasic implements MeasureAggregator {
-  /**
-   * serialVersionUID
-   */
-  private static final long serialVersionUID = 1L;
-
-  protected boolean firstTime = true;
-
-  @Override public void agg(double newVal) {
-  }
-
-  @Override public Double getDoubleValue() {
-    return null;
-  }
-
-  @Override public Long getLongValue() {
-    return null;
-  }
-
-  @Override public BigDecimal getBigDecimalValue() {
-    return null;
-  }
-
-  @Override public boolean isFirstTime() {
-    return firstTime;
-  }
-
-  @Override public MeasureAggregator get() {
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java
deleted file mode 100644
index 99a8ed9..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.math.BigDecimal;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.query.aggregator.MeasureAggregator;
-import org.carbondata.query.aggregator.impl.max.MaxAggregator;
-
-/**
- * AbstractMeasureAggregatorMaxMin
- * Used for custom Carbon Aggregator max  min
- */
-public abstract class AbstractMeasureAggregatorMaxMin implements MeasureAggregator {
-  private static final long serialVersionUID = 1L;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(MaxAggregator.class.getName());
-
-  public Comparable<Object> aggVal;
-
-  public boolean firstTime = true;
-
-  protected abstract void internalAgg(Object value);
-
-  @Override public void agg(double newVal) {
-    internalAgg((Double) newVal);
-    firstTime = false;
-  }
-
-  @Override public Double getDoubleValue() {
-    return (Double) ((Object) aggVal);
-  }
-
-  @Override public void agg(Object newVal) {
-    internalAgg(newVal);
-    firstTime = false;
-  }
-
-  @Override public Long getLongValue() {
-    return (Long) ((Object) aggVal);
-  }
-
-  @Override public BigDecimal getBigDecimalValue() {
-    return (BigDecimal) ((Object) aggVal);
-  }
-
-  /**
-   * @see MeasureAggregator#setNewValue(Object)
-   */
-  @Override public void setNewValue(Object newValue) {
-  }
-
-  /**
-   * This method return the max value as an object
-   * a8
-   *
-   * @return max value as an object
-   */
-  @Override public Object getValueObject() {
-    return aggVal;
-  }
-
-  @Override public boolean isFirstTime() {
-    return firstTime;
-  }
-
-  @Override public MeasureAggregator get() {
-    return this;
-
-  }
-
-  public String toString() {
-    return aggVal + "";
-  }
-
-  @Override public int compareTo(MeasureAggregator msrAggr) {
-    @SuppressWarnings("unchecked") Comparable<Object> other =
-        (Comparable<Object>) msrAggr.getValueObject();
-
-    return aggVal.compareTo(other);
-  }
-
-  @Override public void writeData(DataOutput dataOutput) throws IOException {
-    ByteArrayOutputStream bos = null;
-    ObjectOutput out = null;
-
-    try {
-      dataOutput.writeBoolean(firstTime);
-      bos = new ByteArrayOutputStream();
-      out = new ObjectOutputStream(bos);
-      out.writeObject(aggVal);
-      byte[] objectBytes = bos.toByteArray();
-      dataOutput.write(objectBytes.length);
-      dataOutput.write(objectBytes, 0, objectBytes.length);
-    } catch (Exception e) {
-      LOGGER.error(e,
-          "Problem while getting byte array in maxMinAggregator: " + e.getMessage());
-    } finally {
-      CarbonUtil.closeStreams(bos);
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void readData(DataInput inPut)
-      throws IOException {
-    ByteArrayInputStream bis = null;
-    ObjectInput in = null;
-    try {
-      int length = inPut.readInt();
-      firstTime = inPut.readBoolean();
-      byte[] data = new byte[length];
-      bis = new ByteArrayInputStream(data);
-      in = new ObjectInputStream(bis);
-      aggVal = (Comparable<Object>) in.readObject();
-    } catch (Exception e) {
-      LOGGER.error(e,
-          "Problem while getting byte array in maxMinAggregator: " + e.getMessage());
-    } finally {
-      CarbonUtil.closeStreams(bis);
-    }
-  }
-
-  /**
-   * Below method will be used to get the value byte array
-   */
-  @Override public byte[] getByteArray() {
-    byte[] objectBytes = new byte[0];
-    if (firstTime) {
-      return objectBytes;
-    }
-    ObjectOutput out = null;
-    ByteArrayOutputStream bos = null;
-    try {
-      bos = new ByteArrayOutputStream();
-      out = new ObjectOutputStream(bos);
-      out.writeObject(aggVal);
-      objectBytes = bos.toByteArray();
-    } catch (IOException e) {
-      LOGGER.error(e,
-          "Problem while getting byte array in maxMinAggregator: " + e.getMessage());
-    } finally {
-      CarbonUtil.closeStreams(bos);
-    }
-    return objectBytes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java
deleted file mode 100644
index 61fe179..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-
-/**
- * This class implements a vector of bits that grows as needed. Each component
- * of the bit set has a {@code boolean} value. The bits of a {@code BitSet} are
- * indexed by nonnegative integers. Individual indexed bits can be examined,
- * set, or cleared. One {@code BitSet} may be used to modify the contents of
- * another {@code BitSet} through logical AND, logical inclusive OR, and logical
- * exclusive OR operations.
- * By default, all bits in the set initially have the value {@code false}.
- * Every bit set has a current size, which is the number of bits of space
- * currently in use by the bit set. Note that the size is related to the
- * implementation of a bit set, so it may change with implementation. The length
- * of a bit set relates to logical length of a bit set and is defined
- * independently of implementation.
- * Unless otherwise noted, passing a null parameter to any of the methods in a
- * {@code BitSet} will result in a {@code NullPointerException}.
- * A {@code BitSet} is not safe for multithreaded use without external
- * synchronization.
- *
- * @author Arthur van Hoff
- * @author Michael McCloskey
- * @author Martin Buchholz
- * @since JDK1.0
- */
-public class BitSet implements java.io.Serializable {
-  /* use serialVersionUID from JDK 1.0.2 for interoperability */
-  private static final long serialVersionUID = 7997698588986878753L;
-
-  /*
-   * BitSets are packed into arrays of "words." Currently a word is a long,
-   * which consists of 64 bits, requiring 6 address bits. The choice of word
-   * size is determined purely by performance concerns.
-   */
-  private static final int ADDRESS_BITS_PER_WORD = 6;
-
-  private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
-
-  /* Used to shift left or right for a partial word mask */
-  private static final long WORD_MASK = 0xffffffffffffffffL;
-
-  /**
-   * The internal field corresponding to the serialField "bits".
-   */
-  private long[] words;
-
-  /**
-   * The number of words in the logical size of this BitSet.
-   */
-  private transient int wordsInUse;
-
-  /**
-   * Creates a new bit set. All bits are initially {@code false}.
-   */
-  public BitSet() {
-    initWords(BITS_PER_WORD);
-  }
-
-  /**
-   * Creates a bit set using words as the internal representation. The last
-   * word (if there is one) must be non-zero.
-   */
-  private BitSet(long[] words) {
-    this.words = words;
-    this.wordsInUse = words.length;
-    checkInvariants();
-  }
-
-  /**
-   * Given a bit index, return word index containing it.
-   */
-  private static int wordIndex(int bitIndex) {
-    return bitIndex >> ADDRESS_BITS_PER_WORD;
-  }
-
-  /**
-   * Returns a new bit set containing all the bits in the given byte array.
-   * More precisely, <br>
-   * {@code BitSet.valueOf(bytes).get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)}
-   * <br>
-   * for all {@code n <  8 * bytes.length}.
-   * This method is equivalent to
-   * {@code BitSet.valueOf(ByteBuffer.wrap(bytes))}.
-   *
-   * @param bytes a byte array containing a little-endian representation of a
-   *              sequence of bits to be used as the initial bits of the new bit
-   *              set
-   * @since 1.7
-   */
-  public static BitSet valueOf(byte[] bytes) {
-    return BitSet.valueOf(ByteBuffer.wrap(bytes));
-  }
-
-  /**
-   * Returns a new bit set containing all the bits in the given byte buffer
-   * between its position and limit.
-   * More precisely, <br>
-   * {@code BitSet.valueOf(bb).get(n) == ((bb.get(bb.position()+n/8) & (1<<(n%8))) != 0)}
-   * <br>
-   * for all {@code n < 8 * bb.remaining()}.
-   * The byte buffer is not modified by this method, and no reference to the
-   * buffer is retained by the bit set.
-   *
-   * @param bb a byte buffer containing a little-endian representation of a
-   *           sequence of bits between its position and limit, to be used as
-   *           the initial bits of the new bit set
-   * @since 1.7
-   */
-  public static BitSet valueOf(ByteBuffer bb) {
-    bb = bb.slice().order(ByteOrder.LITTLE_ENDIAN);
-    int n = 0;//CHECKSTYLE:OFF
-    for (n = bb.remaining(); ; n--) {//CHECKSTYLE:ON
-      if (n > 0 && bb.get(n - 1) == 0) {
-        continue;
-      } else {
-        break;
-      }
-    }
-    long[] words = new long[(n + 7) / 8];
-    bb.limit(n);
-    int i = 0;//CHECKSTYLE:OFF
-    while (bb.remaining() >= 8) {//CHECKSTYLE:ON
-      words[i++] = bb.getLong();
-    }
-
-    int j = 0;
-    for (int remaining = bb.remaining(); j < remaining; j++) {
-      words[i] |= (bb.get() & 0xffL) << (8 * j);
-    }
-    return new BitSet(words);
-  }
-
-  /**
-   * Every public method must preserve these invariants.
-   */
-  private void checkInvariants() {
-    assert(wordsInUse == 0 || words[wordsInUse - 1] != 0);
-    assert(wordsInUse >= 0 && wordsInUse <= words.length);
-    assert(wordsInUse == words.length || words[wordsInUse] == 0);
-  }
-
-  private void initWords(int nbits) {
-    words = new long[wordIndex(nbits - 1) + 1];
-  }
-
-  /**
-   * Returns a new byte array containing all the bits in this bit set.
-   * More precisely, if <br>
-   * {@code byte[] bytes = s.toByteArray();} <br>
-   * then {@code bytes.length == (s.length()+7)/8} and <br>
-   * {@code s.get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)} <br>
-   * for all {@code n < 8 * bytes.length}.
-   *
-   * @return a byte array containing a little-endian representation of all the
-   * bits in this bit set
-   * @since 1.7
-   */
-  public byte[] toByteArray() {
-    int n = wordsInUse;
-    if (n == 0) {
-      return new byte[0];
-    }
-    int len = 8 * (n - 1);
-    for (long x = words[n - 1]; x != 0; x >>>= 8) {
-      len++;
-    }
-    byte[] bytes = new byte[len];
-    ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
-    for (int i = 0; i < n - 1; i++) {
-      bb.putLong(words[i]);
-    }
-    for (long x = words[n - 1]; x != 0; x >>>= 8) {
-      bb.put((byte) (x & 0xff));
-    }
-    return bytes;
-  }
-
-  /**
-   * Ensures that the BitSet can hold enough words.
-   *
-   * @param wordsRequired the minimum acceptable number of words.
-   */
-  private void ensureCapacity(int wordsRequired) {
-    if (words.length < wordsRequired) {
-      // Allocate larger of doubled size or required size
-      int request = Math.max(2 * words.length, wordsRequired);
-      words = Arrays.copyOf(words, request);
-    }
-  }
-
-  /**
-   * Ensures that the BitSet can accommodate a given wordIndex, temporarily
-   * violating the invariants. The caller must restore the invariants before
-   * returning to the user, possibly using recalculateWordsInUse().
-   *
-   * @param wordIndex the index to be accommodated.
-   */
-  private void expandTo(int wordIndex) {
-    int wordsRequired = wordIndex + 1;
-    if (wordsInUse < wordsRequired) {
-      ensureCapacity(wordsRequired);
-      wordsInUse = wordsRequired;
-    }
-  }
-
-  /**
-   * Sets the bit at the specified index to {@code true}.
-   *
-   * @param bitIndex a bit index
-   * @throws IndexOutOfBoundsException if the specified index is negative
-   * @since JDK1.0
-   */
-  public void set(int bitIndex) {
-    if (bitIndex < 0) {
-      throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
-    }
-    int wordIndex = wordIndex(bitIndex);
-    expandTo(wordIndex);
-
-    words[wordIndex] |= (1L << bitIndex); // Restores invariants
-
-    checkInvariants();
-  }
-
-  /**
-   * Returns the index of the first bit that is set to {@code true} that
-   * occurs on or after the specified starting index. If no such bit exists
-   * then {@code -1} is returned.
-   * To iterate over the {@code true} bits in a {@code BitSet}, use the
-   * following loop:
-   * <pre>
-   * {@code
-   * for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
-   *     // operate on index i here
-   * }}
-   * </pre>
-   *
-   * @param fromIndex the index to start checking from (inclusive)
-   * @return the index of the next set bit, or {@code -1} if there is no such
-   * bit
-   * @throws IndexOutOfBoundsException if the specified index is negative
-   * @since 1.4
-   */
-  public int nextSetBit(int fromIndex) {
-    if (fromIndex < 0) {
-      throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex);
-    }
-    checkInvariants();
-
-    int u = wordIndex(fromIndex);
-    if (u >= wordsInUse) {
-      return -1;
-    }
-    long word = words[u] & (WORD_MASK << fromIndex);
-
-    while (true) {
-      if (word != 0) {
-        return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word);
-      }
-
-      u++;
-      if (u == wordsInUse) {
-        return -1;
-      }
-      word = words[u];
-    }
-  }
-
-  /**
-   * Returns the number of bits set to {@code true} in this {@code BitSet}.
-   *
-   * @return the number of bits set to {@code true} in this {@code BitSet}
-   * @since 1.4
-   */
-  public int cardinality() {
-    int sum = 0;
-    for (int i = 0; i < wordsInUse; i++) {
-      sum += Long.bitCount(words[i]);
-    }
-    return sum;
-  }
-
-  /**
-   * Performs a logical <b>OR</b> of this bit set with the bit set argument.
-   * This bit set is modified so that a bit in it has the value {@code true}
-   * if and only if it either already had the value {@code true} or the
-   * corresponding bit in the bit set argument has the value {@code true}.
-   *
-   * @param set a bit set
-   */
-  public void or(BitSet set) {
-    if (this == set) {
-      return;
-    }
-    int wordsInCommon = Math.min(wordsInUse, set.wordsInUse);
-
-    if (wordsInUse < set.wordsInUse) {
-      ensureCapacity(set.wordsInUse);
-      wordsInUse = set.wordsInUse;
-    }
-
-    // Perform logical OR on words in common
-    for (int i = 0; i < wordsInCommon; i++) {
-      words[i] |= set.words[i];
-    }
-    // Copy any remaining words
-    if (wordsInCommon < set.wordsInUse) {
-      System.arraycopy(set.words, wordsInCommon, words, wordsInCommon, 
wordsInUse - wordsInCommon);
-    }
-    // recalculateWordsInUse() is unnecessary
-    checkInvariants();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java
 
b/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java
deleted file mode 100644
index 0e23df1..0000000
--- 
a/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.core.util.CarbonUtil;
-
-import org.apache.commons.codec.binary.Base64;
-
-public class CustomAggregatorHelper {
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CustomAggregatorHelper.class.getName());
-
-  /**
-   * surrogateKeyMap
-   */
-  private Map<String, Map<Integer, String>> surrogateKeyMap;
-
-  /**
-   * loadFolderList
-   */
-  private List<File> loadFolderList;
-
-  public CustomAggregatorHelper() {
-    surrogateKeyMap =
-        new HashMap<String, Map<Integer, 
String>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    loadFolderList = new 
ArrayList<File>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  }
-
-  /**
-   * Below method will be used to get the file list
-   *
-   * @param baseStorePath
-   * @param fileNameSearchPattern
-   * @return
-   */
-  private static File[] getFilesArray(File baseStorePath, final String 
fileNameSearchPattern) {
-    File[] listFiles = baseStorePath.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        if (pathname.getName().indexOf(fileNameSearchPattern) > -1) {
-          return true;
-        }
-        return false;
-      }
-    });
-    return listFiles;
-  }
-
-  /**
-   * Below method will be used to get the member
-   *
-   * @param tableName
-   * @param columnName
-   * @param key
-   * @param cubeName
-   * @param schemaName
-   * @return member
-   */
-  public String getDimValue(String tableName, String columnName, int key, 
String cubeName,
-      String schemaName) {
-    Map<Integer, String> memberCache = surrogateKeyMap.get(tableName + '_' + 
columnName);
-    if (null == memberCache) {
-      loadLevelFile(tableName, columnName, cubeName, schemaName);
-    }
-    memberCache = surrogateKeyMap.get(tableName + '_' + columnName);
-    return memberCache.get(key);
-  }
-
-  /**
-   * Below method will be used to fill the level cache
-   *
-   * @param tableName
-   * @param columnName
-   * @param cubeName
-   * @param schemaName
-   */
-  private void loadLevelFile(String tableName, String columnName, String 
cubeName,
-      String schemaName) {
-    String baseLocation = CarbonUtil.getCarbonStorePath(schemaName, cubeName);
-    baseLocation = baseLocation + File.separator + schemaName + File.separator 
+ cubeName;
-    if (loadFolderList.size() == 0) {
-      checkAndUpdateFolderList(baseLocation);
-    }
-    try {
-      File[] filesArray = null;
-      for (File loadFoler : loadFolderList) {
-        filesArray = getFilesArray(loadFoler, tableName + '_' + columnName);
-        for (int i = 0; i < filesArray.length; i++) {
-          readLevelFileAndUpdateCache(filesArray[i], tableName + '_' + 
columnName);
-        }
-      }
-    } catch (IOException e) {
-      LOGGER
-          .error("Problem while populating the cache");
-    }
-  }
-
-  /**
-   * Below method will be used to read the level files
-   *
-   * @param memberFile
-   * @param fileName
-   * @throws IOException
-   */
-  private void readLevelFileAndUpdateCache(File memberFile, String fileName) 
throws IOException {
-    FileInputStream fos = null;
-    FileChannel fileChannel = null;
-    try {
-      // create an object of FileOutputStream
-      fos = new FileInputStream(memberFile);
-
-      fileChannel = fos.getChannel();
-      Map<Integer, String> memberMap = surrogateKeyMap.get(fileName);
-
-      if (null == memberMap) {
-        memberMap = new HashMap<Integer, 
String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        surrogateKeyMap.put(fileName, memberMap);
-      }
-
-      long size = fileChannel.size();
-      int maxKey = 0;
-      ByteBuffer rowlengthToRead = null;
-      int len = 0;
-      ByteBuffer row = null;
-      int toread = 0;
-      byte[] bb = null;
-      String value = null;
-      int surrogateValue = 0;
-
-      boolean enableEncoding = Boolean.valueOf(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.ENABLE_BASE64_ENCODING,
-              CarbonCommonConstants.ENABLE_BASE64_ENCODING_DEFAULT));
-
-      while (fileChannel.position() < size) {
-        rowlengthToRead = ByteBuffer.allocate(4);
-        fileChannel.read(rowlengthToRead);
-        rowlengthToRead.rewind();
-        len = rowlengthToRead.getInt();
-        if (len == 0) {
-          continue;
-        }
-
-        row = ByteBuffer.allocate(len);
-        fileChannel.read(row);
-        row.rewind();
-        toread = row.getInt();
-        bb = new byte[toread];
-        row.get(bb);
-
-        if (enableEncoding) {
-          value = new String(Base64.decodeBase64(bb), 
Charset.defaultCharset());
-        } else {
-          value = new String(bb, Charset.defaultCharset());
-        }
-
-        surrogateValue = row.getInt();
-        memberMap.put(surrogateValue, value);
-
-        // check if max key is less than Surrogate key then update the max key
-        if (maxKey < surrogateValue) {
-          maxKey = surrogateValue;
-        }
-      }
-
-    } finally {
-      CarbonUtil.closeStreams(fileChannel, fos);
-    }
-  }
-
-  /**
-   * This method recursively checks the folder with Load_ inside each and
-   * every RS_x/TableName/Load_x and add in the folder list the load folders.
-   *
-   * @param baseStorePath
-   * @return
-   */
-  private File[] checkAndUpdateFolderList(String baseStorePath) {
-    File folders = new File(baseStorePath);
-    //
-    File[] rsFolders = folders.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        boolean check = false;
-        check = pathname.isDirectory()
-            && 
pathname.getAbsolutePath().indexOf(CarbonCommonConstants.LOAD_FOLDER) > -1;
-        if (check) {
-          return true;
-        } else {
-          File[] checkFolder = 
checkAndUpdateFolderList(pathname.getAbsolutePath());
-          if (null != checkFolder) {
-            for (File f : checkFolder) {
-              loadFolderList.add(f);
-            }
-          }
-        }
-        return false;
-      }
-    });
-    return rsFolders;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java
 
b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java
deleted file mode 100644
index 24eff08..0000000
--- 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.avg;
-
-import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;
-
-public abstract class AbstractAvgAggregator extends 
AbstractMeasureAggregatorBasic {
-
-  public abstract Object[] getAvgState();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java
 
b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java
deleted file mode 100644
index c6ba55d..0000000
--- 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.avg;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.util.DataTypeUtil;
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-public class AvgBigDecimalAggregator extends AbstractAvgAggregator {
-
-  /**
-   * serialVersionUID
-   */
-  private static final long serialVersionUID = 5463736686281089871L;
-
-  /**
-   * total number of aggregate values
-   */
-  protected double count;
-
-  /**
-   * aggregate value
-   */
-  protected BigDecimal aggVal;
-
-  public AvgBigDecimalAggregator() {
-    aggVal = new BigDecimal(0);
-  }
-
-  /**
-   * Average Aggregate function which will add all the aggregate values and it
-   * will increment the total count every time, for average value
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(Object newVal) {
-    if (newVal instanceof byte[]) {
-      ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal);
-      buffer.rewind();
-      while (buffer.hasRemaining()) {
-        byte[] valueByte = new byte[buffer.getInt()];
-        buffer.get(valueByte);
-        BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte);
-        aggVal = aggVal.add(valueBigDecimal);
-
-        count += buffer.getDouble();
-        firstTime = false;
-      }
-      return;
-    }
-
-    if (firstTime) {
-      aggVal = (BigDecimal) newVal;
-      firstTime = false;
-    } else {
-      aggVal = aggVal.add((BigDecimal) newVal);
-    }
-    count++;
-  }
-
-  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
-      BigDecimal value = 
dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index);
-      aggVal = aggVal.add(value);
-      firstTime = false;
-      count++;
-    }
-  }
-
-  @Override public Object[] getAvgState() {
-    return new Object[]{aggVal, count};
-  }
-
-  /**
-   * Below method will be used to get the value byte array
-   */
-  @Override public byte[] getByteArray() {
-    if (firstTime) {
-      return new byte[0];
-    }
-    byte[] bytes = DataTypeUtil.bigDecimalToByte(aggVal);
-    ByteBuffer allocate =
-        ByteBuffer.allocate(4 + bytes.length + 
CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
-    allocate.putInt(bytes.length);
-    allocate.put(bytes);
-    allocate.putDouble(count);
-    allocate.rewind();
-
-    return allocate.array();
-  }
-
-  /**
-   * Return the average of the aggregate values
-   *
-   * @return average aggregate value
-   */
-  @Override public BigDecimal getBigDecimalValue() {
-    return aggVal.divide(new BigDecimal(count), 6);
-  }
-
-  /**
-   * This method merge the aggregated value, in average aggregator it will add
-   * count and aggregate value
-   *
-   * @param aggregator Avg Aggregator
-   */
-  @Override public void merge(MeasureAggregator aggregator) {
-    AvgBigDecimalAggregator avgAggregator = (AvgBigDecimalAggregator) 
aggregator;
-    if (!avgAggregator.isFirstTime()) {
-      aggVal = aggVal.add(avgAggregator.aggVal);
-      count += avgAggregator.count;
-      firstTime = false;
-    }
-  }
-
-  /**
-   * This method return the average value as an object
-   *
-   * @return average value as an object
-   */
-  @Override public Object getValueObject() {
-    return aggVal.divide(new BigDecimal(count));
-  }
-
-  /**
-   * @see MeasureAggregator#setNewValue(Object)
-   */
-  @Override public void setNewValue(Object newValue) {
-    aggVal = (BigDecimal) newValue;
-    count = 1;
-  }
-
-  @Override public void writeData(DataOutput output) throws IOException {
-    output.writeBoolean(firstTime);
-    output.writeUTF(aggVal.toString());
-    output.writeDouble(count);
-
-  }
-
-  @Override public void readData(DataInput inPut) throws IOException {
-    firstTime = inPut.readBoolean();
-    aggVal = new BigDecimal(inPut.readUTF());
-    count = inPut.readDouble();
-  }
-
-  @Override public MeasureAggregator getCopy() {
-    AvgBigDecimalAggregator avg = new AvgBigDecimalAggregator();
-    avg.aggVal = aggVal;
-    avg.count = count;
-    avg.firstTime = firstTime;
-    return avg;
-  }
-
-  @Override public int compareTo(MeasureAggregator o) {
-    BigDecimal val = getBigDecimalValue();
-    BigDecimal otherVal = o.getBigDecimalValue();
-
-    return val.compareTo(otherVal);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (!(obj instanceof AvgBigDecimalAggregator)) {
-      return false;
-    }
-    AvgBigDecimalAggregator o = (AvgBigDecimalAggregator) obj;
-    return getBigDecimalValue().equals(o.getBigDecimalValue());
-  }
-
-  @Override public int hashCode() {
-    return getBigDecimalValue().hashCode();
-  }
-
-  @Override public void merge(byte[] value) {
-    if (0 == value.length) {
-      return;
-    }
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-
-    byte[] valueByte = new byte[buffer.getInt()];
-    buffer.get(valueByte);
-    BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte);
-    aggVal = aggVal.add(valueBigDecimal);
-    count += buffer.getDouble();
-    firstTime = false;
-  }
-
-  public String toString() {
-    return (aggVal.divide(new BigDecimal(count))) + "";
-  }
-
-  @Override public MeasureAggregator getNew() {
-    return new AvgBigDecimalAggregator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java
 
b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java
deleted file mode 100644
index bacff29..0000000
--- 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.avg;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-public class AvgDoubleAggregator extends AbstractAvgAggregator {
-
-  /**
-   * serialVersionUID
-   */
-  private static final long serialVersionUID = 5463736686281089871L;
-
-  /**
-   * total number of aggregate values
-   */
-  protected double count;
-
-  /**
-   * aggregate value
-   */
-  protected double aggVal;
-
-  /**
-   * Average Aggregate function which will add all the aggregate values and it
-   * will increment the total count every time, for average value
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(double newVal) {
-    aggVal += newVal;
-    count++;
-    firstTime = false;
-  }
-
-  /**
-   * Average Aggregate function which will add all the aggregate values and it
-   * will increment the total count every time, for average value
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(Object newVal) {
-    if (newVal instanceof byte[]) {
-      ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal);
-      buffer.rewind();
-      while (buffer.hasRemaining()) {
-        aggVal += buffer.getDouble();
-        count += buffer.getDouble();
-        firstTime = false;
-      }
-      return;
-    }
-    aggVal += ((Number) newVal).doubleValue();
-    count++;
-    firstTime = false;
-  }
-
-  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
-      aggVal += 
dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
-      count++;
-      firstTime = false;
-    }
-  }
-
-  /**
-   * Below method will be used to get the value byte array
-   */
-  @Override public byte[] getByteArray() {
-    if (firstTime) {
-      return new byte[0];
-    }
-    ByteBuffer buffer = ByteBuffer.allocate(2 * 
CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
-    buffer.putDouble(aggVal);
-    buffer.putDouble(count);
-    return buffer.array();
-  }
-
-  /**
-   * Return the average of the aggregate values
-   *
-   * @return average aggregate value
-   */
-  @Override public Double getDoubleValue() {
-    return aggVal / count;
-  }
-
-  @Override public Object[] getAvgState() {
-    return new Object[]{aggVal, count};
-  }
-
-  /**
-   * This method merge the aggregated value, in average aggregator it will add
-   * count and aggregate value
-   *
-   * @param aggregator Avg Aggregator
-   */
-  @Override public void merge(MeasureAggregator aggregator) {
-    AvgDoubleAggregator avgAggregator = (AvgDoubleAggregator) aggregator;
-    if (!avgAggregator.isFirstTime()) {
-      aggVal += avgAggregator.aggVal;
-      count += avgAggregator.count;
-      firstTime = false;
-    }
-  }
-
-  /**
-   * This method return the average value as an object
-   *
-   * @return average value as an object
-   */
-  @Override public Object getValueObject() {
-    return aggVal / count;
-  }
-
-  /**
-   * @see MeasureAggregator#setNewValue(Object)
-   */
-  @Override public void setNewValue(Object newValue) {
-    aggVal = (Double) newValue;
-    count = 1;
-  }
-
-  @Override public void writeData(DataOutput output) throws IOException {
-    output.writeBoolean(firstTime);
-    output.writeDouble(aggVal);
-    output.writeDouble(count);
-
-  }
-
-  @Override public void readData(DataInput inPut) throws IOException {
-    firstTime = inPut.readBoolean();
-    aggVal = inPut.readDouble();
-    count = inPut.readDouble();
-  }
-
-  @Override public MeasureAggregator getCopy() {
-    AvgDoubleAggregator avg = new AvgDoubleAggregator();
-    avg.aggVal = aggVal;
-    avg.count = count;
-    avg.firstTime = firstTime;
-    return avg;
-  }
-
-  @Override public int compareTo(MeasureAggregator o) {
-    double val = getDoubleValue();
-    double otherVal = o.getDoubleValue();
-    if (val > otherVal) {
-      return 1;
-    }
-    if (val < otherVal) {
-      return -1;
-    }
-    return 0;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if(!(obj instanceof AvgDoubleAggregator)) {
-      return false;
-    }
-    AvgDoubleAggregator o = (AvgDoubleAggregator)obj;
-    return getDoubleValue().equals(o.getDoubleValue());
-  }
-
-  @Override public int hashCode() {
-    return getDoubleValue().hashCode();
-  }
-
-  @Override public void merge(byte[] value) {
-    if (0 == value.length) {
-      return;
-    }
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    aggVal += buffer.getDouble();
-    count += buffer.getDouble();
-    firstTime = false;
-  }
-
-  public String toString() {
-    return (aggVal / count) + "";
-  }
-
-  @Override public MeasureAggregator getNew() {
-    return new AvgDoubleAggregator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java
 
b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java
deleted file mode 100644
index 6290de6..0000000
--- 
a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.avg;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-public class AvgLongAggregator extends AbstractAvgAggregator {
-
-  /**
-   * serialVersionUID
-   */
-  private static final long serialVersionUID = 5463736686281089871L;
-
-  /**
-   * total number of aggregate values
-   */
-  protected double count;
-
-  /**
-   * aggregate value
-   */
-  protected long aggVal;
-
-  /**
-   * Average Aggregate function which will add all the aggregate values and it
-   * will increment the total count every time, for average value
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(Object newVal) {
-    if (newVal instanceof byte[]) {
-      ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal);
-      buffer.rewind();
-      while (buffer.hasRemaining()) {
-        aggVal += buffer.getLong();
-        count += buffer.getDouble();
-        firstTime = false;
-      }
-      return;
-    }
-    aggVal += (Long) newVal;
-    count++;
-    firstTime = false;
-  }
-
-  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
-      aggVal += 
dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
-      count++;
-      firstTime = false;
-    }
-  }
-
-  @Override public Object[] getAvgState() {
-    return new Object[]{aggVal, count};
-  }
-
-  /**
-   * Below method will be used to get the value byte array
-   */
-  @Override public byte[] getByteArray() {
-    if (firstTime) {
-      return new byte[0];
-    }
-    ByteBuffer buffer = ByteBuffer.allocate(
-        CarbonCommonConstants.LONG_SIZE_IN_BYTE + 
CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
-    buffer.putLong(aggVal);
-    buffer.putDouble(count);
-    return buffer.array();
-  }
-
-  @Override public Long getLongValue() {
-    return aggVal / (long) count;
-  }
-
-  /**
-   * This method merge the aggregated value, in average aggregator it will add
-   * count and aggregate value
-   *
-   * @param aggregator Avg Aggregator
-   */
-  @Override public void merge(MeasureAggregator aggregator) {
-    AvgLongAggregator avgAggregator = (AvgLongAggregator) aggregator;
-    if (!avgAggregator.isFirstTime()) {
-      aggVal += avgAggregator.aggVal;
-      count += avgAggregator.count;
-      firstTime = false;
-    }
-  }
-
-  /**
-   * This method return the average value as an object
-   *
-   * @return average value as an object
-   */
-  @Override public Object getValueObject() {
-    return aggVal / count;
-  }
-
-  /**
-   * @see MeasureAggregator#setNewValue(Object)
-   */
-  @Override public void setNewValue(Object newValue) {
-    aggVal = (Long) newValue;
-    count = 1;
-  }
-
-  @Override public void writeData(DataOutput output) throws IOException {
-    output.writeBoolean(firstTime);
-    output.writeLong(aggVal);
-    output.writeDouble(count);
-
-  }
-
-  @Override public void readData(DataInput inPut) throws IOException {
-    firstTime = inPut.readBoolean();
-    aggVal = inPut.readLong();
-    count = inPut.readDouble();
-  }
-
-  @Override public MeasureAggregator getCopy() {
-    AvgLongAggregator avg = new AvgLongAggregator();
-    avg.aggVal = aggVal;
-    avg.count = count;
-    avg.firstTime = firstTime;
-    return avg;
-  }
-
-  @Override public int compareTo(MeasureAggregator o) {
-    long val = getLongValue();
-    long otherVal = o.getLongValue();
-    if (val > otherVal) {
-      return 1;
-    } else if (val < otherVal) {
-      return -1;
-    } else {
-      return 0;
-    }
-  }
-
-  @Override public boolean equals(Object obj) {
-    if(!(obj instanceof AvgLongAggregator)) {
-      return false;
-    }
-    AvgLongAggregator o = (AvgLongAggregator)obj;
-    return getLongValue().equals(o.getLongValue());
-  }
-
-  @Override public int hashCode() {
-    return getLongValue().hashCode();
-  }
-
-  @Override public void merge(byte[] value) {
-    if (0 == value.length) {
-      return;
-    }
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    aggVal += buffer.getLong();
-    count += buffer.getDouble();
-    firstTime = false;
-  }
-
-  public String toString() {
-    return (aggVal / count) + "";
-  }
-
-  @Override public MeasureAggregator getNew() {
-    // TODO Auto-generated method stub
-    return new AvgLongAggregator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java
 
b/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java
deleted file mode 100644
index 352e7aa..0000000
--- 
a/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.count;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-/**
- * Class Description : It will return total count of values
- */
-public class CountAggregator implements MeasureAggregator {
-
-  /**
-   * serialVersionUID
-   */
-  private static final long serialVersionUID = 2678878935295306313L;
-
-  /**
-   * aggregate value
-   */
-  private double aggVal;
-
-  /**
-   * Count Aggregate function which update the total count
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(double newVal) {
-    aggVal++;
-  }
-
-  /**
-   * Count Aggregate function which update the total count
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(Object newVal) {
-    aggVal++;
-  }
-
-  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
-      aggVal++;
-    }
-  }
-
-  /**
-   * Below method will be used to get the value byte array
-   */
-  @Override public byte[] getByteArray() {
-    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE);
-    buffer.putDouble(aggVal);
-    return buffer.array();
-  }
-
-  /**
-   * Returns the total count
-   *
-   * @return total count
-   */
-  @Override public Double getDoubleValue() {
-    return aggVal;
-  }
-
-  @Override public Long getLongValue() {
-    return (long) aggVal;
-  }
-
-  @Override public BigDecimal getBigDecimalValue() {
-    return new BigDecimal(aggVal);
-  }
-
-  /**
-   * Merge the total count with the aggregator
-   *
-   * @param aggregator count aggregator
-   */
-  @Override public void merge(MeasureAggregator aggregator) {
-    CountAggregator countAggregator = (CountAggregator) aggregator;
-    aggVal += countAggregator.aggVal;
-  }
-
-  /**
-   * Overloaded Aggregate function will be used for Aggregate tables because
-   * aggregate table will have fact_count as a measure. It will update the
-   * total count
-   *
-   * @param newVal
-   *            new value
-   * @param factCount
-   *            total fact count
-   *
-   */
-  //    @Override
-  //    public void agg(double newVal, double factCount)
-  //    {
-  //        agg(newVal, null, 0, 0);
-  //    }
-
-  /**
-   * This method return the count value as an object
-   *
-   * @return count value as an object
-   */
-
-  @Override public Object getValueObject() {
-    return aggVal;
-  }
-
-  /**
-   * @see MeasureAggregator#setNewValue(Object)
-   */
-  @Override public void setNewValue(Object newValue) {
-    aggVal += Double.parseDouble(String.valueOf(newValue));
-  }
-
-  @Override public boolean isFirstTime() {
-    return false;
-  }
-
-  @Override public void writeData(DataOutput output) throws IOException {
-    output.writeDouble(aggVal);
-
-  }
-
-  @Override public void readData(DataInput inPut) throws IOException {
-    aggVal = inPut.readDouble();
-  }
-
-  @Override public MeasureAggregator getCopy() {
-    CountAggregator aggregator = new CountAggregator();
-    aggregator.aggVal = aggVal;
-    return aggregator;
-  }
-
-  @Override public void merge(byte[] value) {
-    if (0 == value.length) {
-      return;
-    }
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    aggVal += buffer.getDouble();
-  }
-
-  @Override public int compareTo(MeasureAggregator obj) {
-    double val = getDoubleValue();
-    double otherVal = obj.getDoubleValue();
-    if (val > otherVal) {
-      return 1;
-    }
-    if (val < otherVal) {
-      return -1;
-    }
-    return 0;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if(!(obj instanceof CountAggregator)) {
-      return false;
-    }
-    CountAggregator o = (CountAggregator)obj;
-    return getDoubleValue().equals(o.getDoubleValue());
-  }
-
-  @Override public int hashCode() {
-    return getDoubleValue().hashCode();
-  }
-
-  @Override public MeasureAggregator get() {
-    return this;
-  }
-
-  public String toString() {
-    return aggVal + "";
-  }
-
-  @Override public MeasureAggregator getNew() {
-    return new CountAggregator();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
deleted file mode 100644
index 0629007..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.distinct;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-public abstract class AbstractDistinctCountAggregatorObjectSet implements MeasureAggregator {
-
-  private static final long serialVersionUID = 6313463368629960186L;
-
-  protected Set<Object> valueSetForObj;
-
-  public AbstractDistinctCountAggregatorObjectSet() {
-    valueSetForObj = new HashSet<Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  }
-
-  /**
-   * just need to add the unique values to agg set
-   */
-  @Override public void agg(double newVal) {
-  }
-
-  /**
-   * Distinct count Aggregate function which update the Distinct count
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(Object newVal) {
-    valueSetForObj.add(newVal);
-  }
-
-  /**
-   * Below method will be used to get the value byte array
-   */
-  @Override public byte[] getByteArray() {
-    return null;
-  }
-
-  @Override public Double getDoubleValue() {
-    return (double) valueSetForObj.size();
-  }
-
-  @Override public Long getLongValue() {
-    return (long) valueSetForObj.size();
-  }
-
-  @Override public BigDecimal getBigDecimalValue() {
-    return new BigDecimal(valueSetForObj.size());
-  }
-
-  @Override public Object getValueObject() {
-    return valueSetForObj.size();
-  }
-
-  @Override public void setNewValue(Object newValue) {
-    valueSetForObj.add(newValue);
-  }
-
-  @Override public boolean isFirstTime() {
-    return false;
-  }
-
-  @Override public void writeData(DataOutput output) throws IOException {
-
-  }
-
-  @Override public void readData(DataInput inPut) throws IOException {
-
-  }
-
-  public String toString() {
-    return valueSetForObj.size() + "";
-  }
-
-  @Override public void merge(byte[] value) {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
deleted file mode 100644
index 1b2b33d..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.distinct;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.carbondata.common.logging.LogService;
-import org.carbondata.common.logging.LogServiceFactory;
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-import org.roaringbitmap.IntIterator;
-import org.roaringbitmap.RoaringBitmap;
-
-/**
- * * The distinct count aggregator
- * Ex:
- * ID NAME Sales
- * <p>1 a 200
- * <p>2 a 100
- * <p>3 a 200
- * select count(distinct sales) # would result 2
- * select count(sales) # would result 3
- */
-public class DistinctCountAggregator implements MeasureAggregator {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DistinctCountAggregator.class.getName());
-  /**
-   *
-   */
-  private static final long serialVersionUID = 6313463368629960186L;
-  /**
-   * For Spark CARBON to avoid heavy object transfer it better to flatten
-   * the Aggregators. There is no aggregation expected after setting this value.
-   */
-  private Double computedFixedValue;
-  /**
-   *
-   */
-  //    private Set<Double> valueSet;
-  private RoaringBitmap valueSet;
-
-  private byte[] data;
-
-  private double minValue;
-
-  public DistinctCountAggregator(Object minValue) {
-    valueSet = new RoaringBitmap();
-    if (minValue instanceof BigDecimal) {
-      this.minValue = ((BigDecimal) minValue).doubleValue();
-    } else if (minValue instanceof Long) {
-      this.minValue = ((Long) minValue).doubleValue();
-    } else {
-      this.minValue = (Double) minValue;
-    }
-  }
-
-  public DistinctCountAggregator() {
-    valueSet = new RoaringBitmap();
-  }
-
-  /**
-   * just need to add the unique values to agg set
-   */
-  @Override public void agg(double newVal) {
-    valueSet.add((int) (newVal - minValue));
-  }
-
-  /**
-   * Distinct count Aggregate function which update the Distinct count
-   *
-   * @param newVal new value
-   */
-  @Override public void agg(Object newVal) {
-    if (newVal instanceof byte[]) {
-      byte[] values = (byte[]) newVal;
-      ByteBuffer buffer = ByteBuffer.wrap(values);
-      buffer.rewind();
-      while (buffer.hasRemaining()) {
-        valueSet.add(buffer.getInt());
-      }
-      return;
-    } else {
-      double value = new Double(newVal.toString());
-      agg(value);
-    }
-  }
-
-  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
-      valueSet.add((int) dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
-    }
-  }
-
-  /**
-   * Below method will be used to get the value byte array
-   */
-  @Override public byte[] getByteArray() {
-    if (valueSet.getCardinality() == 0) {
-      return new byte[0];
-    }
-    IntIterator iterator = valueSet.getIntIterator();
-    ByteBuffer buffer = ByteBuffer.allocate(valueSet.getCardinality() * 4 + 8);
-    buffer.putDouble(minValue);
-    while (iterator.hasNext()) {
-      buffer.putInt(iterator.next());
-    }
-    buffer.rewind();
-    return buffer.array();
-  }
-
-  private void agg(RoaringBitmap set2, double minValue) {
-    if (this.minValue == minValue) {
-      valueSet.or(set2);
-    } else {
-      if (this.minValue > minValue) {
-        IntIterator intIterator = valueSet.getIntIterator();
-        while (intIterator.hasNext()) {
-          set2.add((int) ((double) (intIterator.next() + this.minValue) - minValue));
-        }
-        this.minValue = minValue;
-        this.valueSet = set2;
-      } else {
-        IntIterator intIterator = set2.getIntIterator();
-        while (intIterator.hasNext()) {
-          valueSet.add((int) ((double) (intIterator.next() + minValue) - this.minValue));
-        }
-      }
-    }
-  }
-
-  /**
-   * merge the valueset so that we get the count of unique values
-   */
-  @Override public void merge(MeasureAggregator aggregator) {
-    DistinctCountAggregator distinctCountAggregator = (DistinctCountAggregator) aggregator;
-    readData();
-    distinctCountAggregator.readData();
-    if (distinctCountAggregator.valueSet != null) {
-      agg(distinctCountAggregator.valueSet, distinctCountAggregator.minValue);
-    }
-  }
-
-  @Override public Double getDoubleValue() {
-    if (computedFixedValue == null) {
-      readData();
-      return (double) valueSet.getCardinality();
-    }
-    return computedFixedValue;
-  }
-
-  @Override public Long getLongValue() {
-    if (computedFixedValue == null) {
-      readData();
-      return (long) valueSet.getCardinality();
-    }
-    return computedFixedValue.longValue();
-  }
-
-  @Override public BigDecimal getBigDecimalValue() {
-    if (computedFixedValue == null) {
-      readData();
-      return new BigDecimal(valueSet.getCardinality());
-    }
-    return new BigDecimal(computedFixedValue);
-  }
-
-  @Override public Object getValueObject() {
-    return valueSet.getCardinality();
-  }
-
-  @Override public void setNewValue(Object newValue) {
-    computedFixedValue = (Double) newValue;
-    valueSet = null;
-  }
-
-  @Override public boolean isFirstTime() {
-    return false;
-  }
-
-  @Override public void writeData(DataOutput output) throws IOException {
-
-    if (computedFixedValue != null) {
-      ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
-      byteBuffer.putInt(-1);
-      byteBuffer.putDouble(computedFixedValue);
-      byteBuffer.flip();
-      output.write(byteBuffer.array());
-    } else {
-      if (valueSet != null) {
-        valueSet.serialize(output);
-      } else {
-        output.write(data);
-      }
-    }
-  }
-
-  @Override public void readData(DataInput inPut) throws IOException {
-    valueSet = new RoaringBitmap();
-    valueSet.deserialize(inPut);
-  }
-
-  private void readData() {
-    if (data != null && (valueSet == null || valueSet.isEmpty())) {
-      ByteArrayInputStream stream = new ByteArrayInputStream(data);
-      DataInputStream outputStream = new DataInputStream(stream);
-      try {
-        readData(outputStream);
-        outputStream.close();
-        data = null;
-      } catch (IOException e) {
-        LOGGER.error(e, e.getMessage());
-      }
-    }
-  }
-
-  @Override public MeasureAggregator getCopy() {
-    DistinctCountAggregator aggr = new DistinctCountAggregator(minValue);
-    aggr.valueSet = valueSet.clone();
-    return aggr;
-  }
-
-  @Override public int compareTo(MeasureAggregator measureAggr) {
-    double compFixedVal = getDoubleValue();
-    double otherVal = measureAggr.getDoubleValue();
-    if (compFixedVal > otherVal) {
-      return 1;
-    }
-    if (compFixedVal < otherVal) {
-      return -1;
-    }
-    return 0;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if(!(obj instanceof DistinctCountAggregator)) {
-      return false;
-    }
-    DistinctCountAggregator o = (DistinctCountAggregator) obj;
-    return getDoubleValue().equals(o.getDoubleValue());
-  }
-
-  @Override public int hashCode() {
-    return getDoubleValue().hashCode();
-  }
-
-  @Override public MeasureAggregator get() {
-    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
-    DataOutputStream outputStream = new DataOutputStream(byteStream);
-    try {
-      writeData(outputStream);
-    } catch (IOException ex) {
-      LOGGER.error(ex, ex.getMessage());
-    }
-    data = byteStream.toByteArray();
-    valueSet = null;
-    return this;
-  }
-
-  public String toString() {
-    if (computedFixedValue == null) {
-      readData();
-      return valueSet.getCardinality() + "";
-    }
-    return computedFixedValue + "";
-  }
-
-  public RoaringBitmap getBitMap() {
-    return valueSet;
-  }
-
-  public double getMinValue() {
-    return minValue;
-  }
-
-  @Override public void merge(byte[] value) {
-    if (0 == value.length) {
-      return;
-    }
-    ByteBuffer buffer = ByteBuffer.wrap(value);
-    buffer.rewind();
-    double currentMinValue = buffer.getDouble();
-    while (buffer.hasRemaining()) {
-      agg(buffer.getInt() + currentMinValue);
-    }
-  }
-
-  @Override public MeasureAggregator getNew() {
-    // TODO Auto-generated method stub
-    return new DistinctCountAggregator();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
deleted file mode 100644
index 3b26e53..0000000
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.carbondata.query.aggregator.impl.distinct;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
-import org.carbondata.query.aggregator.MeasureAggregator;
-
-public class DistinctCountAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet {
-
-  private static final long serialVersionUID = 6313463368629960186L;
-
-  /**
-   * just need to add the unique values to agg set
-   */
-  @Override public void agg(double newVal) {
-    valueSetForObj.add(newVal);
-  }
-
-  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
-    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
-      valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
-    }
-  }
-
-  private void agg(Set<Object> set2) {
-    valueSetForObj.addAll(set2);
-  }
-
-  /**
-   * merge the valueset so that we get the count of unique values
-   */
-  @Override public void merge(MeasureAggregator aggregator) {
-    DistinctCountAggregatorObjectSet distinctCountAggregator =
-        (DistinctCountAggregatorObjectSet) aggregator;
-    agg(distinctCountAggregator.valueSetForObj);
-  }
-
-  @Override public MeasureAggregator getCopy() {
-    DistinctCountAggregatorObjectSet aggregator = new DistinctCountAggregatorObjectSet();
-    aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj);
-    return aggregator;
-  }
-
-  @Override public int compareTo(MeasureAggregator measureAggr) {
-    double valueSetForObjSize = getDoubleValue();
-    double otherVal = measureAggr.getDoubleValue();
-    if (valueSetForObjSize > otherVal) {
-      return 1;
-    }
-    if (valueSetForObjSize < otherVal) {
-      return -1;
-    }
-    return 0;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (!(obj instanceof DistinctCountAggregatorObjectSet)) {
-      return false;
-    }
-    DistinctCountAggregatorObjectSet o = (DistinctCountAggregatorObjectSet) obj;
-    return getDoubleValue().equals(o.getDoubleValue());
-  }
-
-  @Override public int hashCode() {
-    return getDoubleValue().hashCode();
-  }
-
-  @Override public MeasureAggregator get() {
-    return this;
-  }
-
-  @Override public MeasureAggregator getNew() {
-    return new DistinctCountAggregatorObjectSet();
-  }
-
-}


Reply via email to