Refactored core package and fixed all testcases (#684) * Optimizing detail query
* Optimizing detail query flow * Optimizing detail query flow * Optimized raw detail query to improve push up performance. * Fixed bugs * reverted wrong check in * Rebased the code * Removed aggregation from core * Refactored core package and fixed test cases * Fixed bugs * Fixed review comments and deleted aggregate classes after merge from master * Removed unused code Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6288ec71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6288ec71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6288ec71 Branch: refs/heads/master Commit: 6288ec7152bbb832884de24e2e5912a0feecc0af Parents: a83dba3 Author: Ravindra Pesala <ravi.pes...@gmail.com> Authored: Sat Jun 18 06:47:19 2016 +0530 Committer: Jacky Li <jacky.li...@huawei.com> Committed: Sat Jun 18 09:17:19 2016 +0800 ---------------------------------------------------------------------- .../aggregator/CalculatedMeasureAggregator.java | 30 -- .../aggregator/CustomMeasureAggregator.java | 39 --- .../query/aggregator/MeasureAggregator.java | 144 -------- .../impl/AbstractMeasureAggregatorBasic.java | 60 ---- .../impl/AbstractMeasureAggregatorMaxMin.java | 177 ---------- .../query/aggregator/impl/BitSet.java | 337 ------------------- .../aggregator/impl/CustomAggregatorHelper.java | 235 ------------- .../impl/avg/AbstractAvgAggregator.java | 28 -- .../impl/avg/AvgBigDecimalAggregator.java | 218 ------------ .../impl/avg/AvgDoubleAggregator.java | 210 ------------ .../aggregator/impl/avg/AvgLongAggregator.java | 195 ----------- .../aggregator/impl/count/CountAggregator.java | 204 ----------- ...bstractDistinctCountAggregatorObjectSet.java | 103 ------ .../impl/distinct/DistinctCountAggregator.java | 319 ------------------ .../DistinctCountAggregatorObjectSet.java | 96 ------ ...tinctCountBigDecimalAggregatorObjectSet.java | 86 ----- .../DistinctCountLongAggregatorObjectSet.java | 89 ----- .../distinct/DistinctStringCountAggregator.java | 165 --------- .../SumDistinctBigDecimalAggregator.java | 233 ------------- .../distinct/SumDistinctDoubleAggregator.java | 231 ------------- .../distinct/SumDistinctLongAggregator.java | 221 ------------ .../dummy/AbstractMeasureAggregatorDummy.java | 70 ---- .../impl/dummy/DummyBigDecimalAggregator.java | 63 ---- .../impl/dummy/DummyDoubleAggregator.java | 64 ---- .../impl/dummy/DummyLongAggregator.java | 59 ---- .../impl/max/AbstractMaxAggregator.java | 61 ---- .../aggregator/impl/max/MaxAggregator.java | 68 ---- .../impl/max/MaxBigDecimalAggregator.java | 67 ---- .../aggregator/impl/max/MaxLongAggregator.java | 67 ---- .../impl/min/AbstractMinAggregator.java | 61 ---- .../aggregator/impl/min/MinAggregator.java | 66 ---- .../impl/min/MinBigDecimalAggregator.java | 66 ---- .../aggregator/impl/min/MinLongAggregator.java | 66 ---- .../impl/sum/SumBigDecimalAggregator.java | 179 ---------- .../impl/sum/SumDoubleAggregator.java | 179 ---------- .../aggregator/impl/sum/SumLongAggregator.java | 165 --------- .../util/MeasureAggregatorFactory.java | 182 ---------- .../query/carbon/aggregator/DataAggregator.java | 76 ----- .../dimension/DimensionDataAggregator.java | 40 --- .../impl/ColumnGroupDimensionsAggregator.java | 109 ------ .../DirectDictionaryDimensionAggregator.java | 114 ------- .../impl/FixedLengthDimensionAggregator.java | 131 ------- .../impl/VariableLengthDimensionAggregator.java | 126 ------- .../expression/ExpressionAggregator.java | 135 -------- .../impl/ListBasedResultAggregator.java | 128 ++++--- .../impl/MapBasedResultAggregator.java | 171 ---------- .../measure/MeasureDataAggregator.java | 48 --- .../measure/impl/FactTableAggregator.java | 87 ----- .../carbon/executor/QueryExecutorFactory.java | 48 +-- .../executor/impl/AbstractQueryExecutor.java | 51 +-- .../executor/impl/AggregationQueryExecutor.java | 47 --- .../executor/impl/CountStarQueryExecutor.java | 45 --- .../executor/impl/DetailQueryExecutor.java | 5 +- .../impl/DetailRawRecordQueryExcecutor.java | 26 -- .../impl/DetailRawRecordQueryExecutor.java | 26 ++ .../impl/DetailWithOrderByQueryExecutor.java | 53 --- .../executor/impl/FunctionQueryExecutor.java | 37 -- .../executor/impl/QueryExecutorProperties.java | 12 +- .../carbon/executor/infos/AggregatorInfo.java | 30 +- .../executor/infos/BlockExecutionInfo.java | 56 +-- .../impl/InternalAbstractQueryExecutor.java | 12 +- .../impl/InternalAggregationQueryExecutor.java | 44 --- .../impl/InternalCountStartQueryExecutor.java | 83 ----- .../impl/InternalDetailQueryExecutor.java | 7 +- .../InternalDetailWithOrderQueryExecutor.java | 74 ---- .../impl/InternalFunctionQueryExecutor.java | 36 -- .../executor/internal/impl/QueryRunner.java | 7 +- .../query/carbon/executor/util/QueryUtil.java | 189 +---------- .../merger/AbstractScannedResultMerger.java | 7 +- .../merger/impl/SortedScannedResultMerger.java | 176 ---------- .../query/carbon/model/CarbonQueryPlan.java | 30 -- .../carbon/model/CustomAggregateExpression.java | 149 -------- .../query/carbon/model/QueryModel.java | 55 --- .../processor/AbstractDataBlockProcessor.java | 11 +- .../carbon/result/AbstractScannedResult.java | 8 + .../query/carbon/result/BatchRawResult.java | 130 ++----- .../query/carbon/result/BatchResult.java | 64 ++-- .../carbon/result/ListBasedResultWrapper.java | 7 +- .../carbondata/query/carbon/result/Result.java | 7 +- .../query/carbon/result/RowResult.java | 45 --- .../carbon/result/impl/ListBasedResult.java | 11 +- .../carbon/result/impl/MapBasedResult.java | 141 -------- .../AbstractDetailQueryResultIterator.java | 8 +- .../ChunkBasedDetailResultIterator.java | 75 +++++ .../iterator/ChunkBasedResultIterator.java | 71 ---- .../result/iterator/ChunkRawRowIterartor.java | 56 --- .../result/iterator/ChunkRowIterator.java | 5 +- .../iterator/DetailQueryResultIterator.java | 12 +- .../iterator/DetailRawQueryResultIterator.java | 89 +++-- .../preparator/QueryResultPreparator.java | 5 +- .../impl/AbstractQueryResultPreparator.java | 61 ++-- .../impl/DetailQueryResultPreparatorImpl.java | 139 ++++++++ .../impl/QueryResultPreparatorImpl.java | 297 ---------------- .../impl/RawQueryResultPreparatorImpl.java | 87 ++--- .../executer/RowLevelFilterExecuterImpl.java | 59 ++-- .../MeasureColumnResolvedFilterInfo.java | 10 - .../query/scanner/impl/CarbonKey.java | 105 ------ .../query/scanner/impl/CarbonValue.java | 87 ----- .../carbondata/hadoop/CarbonRecordReader.java | 8 +- .../spark/sql/CarbonCatalystOperators.scala | 330 ------------------ .../spark/sql/CarbonDictionaryDecoder.scala | 29 +- .../org/apache/spark/sql/CarbonOperators.scala | 35 +- .../org/apache/spark/sql/CarbonSQLConf.scala | 11 - .../org/apache/spark/sql/CarbonSqlParser.scala | 10 +- .../execution/command/carbonTableSchema.scala | 33 +- .../spark/sql/hive/CarbonStrategies.scala | 86 ++--- .../CarbonDecoderOptimizerHelper.scala | 8 +- .../spark/sql/optimizer/CarbonOptimizer.scala | 39 ++- .../org/carbondata/spark/CarbonFilters.scala | 58 ++-- .../scala/org/carbondata/spark/KeyVal.scala | 28 +- .../spark/rdd/CarbonCleanFilesRDD.scala | 22 +- .../spark/rdd/CarbonDataRDDFactory.scala | 40 +-- .../spark/rdd/CarbonDeleteLoadRDD.scala | 40 +-- .../spark/rdd/CarbonDropAggregateTableRDD.scala | 85 ----- .../spark/rdd/CarbonDropCubeRDD.scala | 77 ----- .../spark/rdd/CarbonDropTableRDD.scala | 76 +++++ .../carbondata/spark/rdd/CarbonMergerRDD.scala | 3 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 42 +-- .../carbondata/spark/util/QueryPlanUtil.scala | 5 +- .../AllDataTypesTestCaseAggregate.scala | 1 + .../store/colgroup/ColGroupMinMaxTest.java | 2 +- 121 files changed, 877 insertions(+), 9154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java deleted file mode 100644 index d115a91..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/CalculatedMeasureAggregator.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator; - -public interface CalculatedMeasureAggregator extends MeasureAggregator { - - /** - * Calculate calculated measures - * - * @param aggregators - */ - void calculateCalcMeasure(MeasureAggregator[] aggregators); -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java deleted file mode 100644 index 7c14a7c..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/CustomMeasureAggregator.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator; - -import java.util.List; - -import org.carbondata.query.carbonfilterinterface.RowIntf; -import org.carbondata.query.expression.ColumnExpression; - -public interface CustomMeasureAggregator extends MeasureAggregator { - /** - * Aggregate method with generic row interface where RowIntf holds value for - * each column given in MeasureAggregator@getColumns() - */ - void agg(RowIntf row); - - /** - * @return List of columns required for the aggregator - */ - List<ColumnExpression> getColumns(); - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java deleted file mode 100644 index 515d307..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/MeasureAggregator.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.Serializable; -import java.math.BigDecimal; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; - -/** - * Class Description : MeasureAggregator interface. It will be implemented by - * all the aggregator functions eg: sum, avg, max, min, etc, will be used for - * aggregate the measure value based on kind of aggregator - */ - -public interface MeasureAggregator extends Serializable, Comparable<MeasureAggregator> { - - /** - * Below method will be used to aggregate the Double value - * - * @param newVal - */ - void agg(double newVal); - - /** - * Below method will be used to aggregate the object value - * - * @param newVal - */ - void agg(Object newVal); - - /** - * Below method will be used to aggregate the value based on index - * - * @param newVal - * @param index - */ - void agg(MeasureColumnDataChunk newVal, int index); - - /** - * Get the Serialize byte array - * - * @return - */ - byte[] getByteArray(); - - /** - * This method will be used to set the new value - * - * @param newValue - */ - void setNewValue(Object newValue); - - /** - * This method return the object value of the MeasureAggregator - * - * @return aggregated value - */ - Object getValueObject(); - - /** - * This method return the object value of the MeasureAggregator - * - * @return aggregated value - */ - Double getDoubleValue(); - - /** - * This method return the object value of the MeasureAggregator - * - * @return aggregated value - */ - Long getLongValue(); - - BigDecimal getBigDecimalValue(); - - /** - * This method merge the aggregated value based on aggregator passed - * - * @param aggregator type of aggregator - */ - void merge(MeasureAggregator aggregator); - - /** - * Is first time. It means it was never used for aggregating any value. - * - * @return - */ - boolean isFirstTime(); - - /** - * it creates the new copy of MeasureAggregator - * - * @return MeasureAggregator - */ - MeasureAggregator getCopy(); - - /** - * Write the state of the class to buffer - */ - void writeData(DataOutput output) throws IOException; - - /** - * Read the state of the class and set to the object - */ - void readData(DataInput inPut) throws IOException; - - MeasureAggregator get(); - - /** - * Merge the byte arrays - * - * @param value - */ - void merge(byte[] value); - - /** - * Below method will be used to get the - * new instance - * - * @return new instance - */ - MeasureAggregator getNew(); -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java deleted file mode 100644 index a4ce00b..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorBasic.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.math.BigDecimal; - -import org.carbondata.query.aggregator.MeasureAggregator; - -/** - * AbstractMeasureAggregatorSum - * Used for custom Carbon Aggregator sum - */ -public abstract class AbstractMeasureAggregatorBasic implements MeasureAggregator { - /** - * serialVersionUID - */ - private static final long serialVersionUID = 1L; - - protected boolean firstTime = true; - - @Override public void agg(double newVal) { - } - - @Override public Double getDoubleValue() { - return null; - } - - @Override public Long getLongValue() { - return null; - } - - @Override public BigDecimal getBigDecimalValue() { - return null; - } - - @Override public boolean isFirstTime() { - return firstTime; - } - - @Override public MeasureAggregator get() { - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java deleted file mode 100644 index 99a8ed9..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectInputStream; -import java.io.ObjectOutput; -import java.io.ObjectOutputStream; -import java.math.BigDecimal; - -import org.carbondata.common.logging.LogService; -import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.core.util.CarbonUtil; -import org.carbondata.query.aggregator.MeasureAggregator; -import org.carbondata.query.aggregator.impl.max.MaxAggregator; - -/** - * AbstractMeasureAggregatorMaxMin - * Used for custom Carbon Aggregator max min - */ -public abstract class AbstractMeasureAggregatorMaxMin implements MeasureAggregator { - private static final long serialVersionUID = 1L; - - private static final LogService LOGGER = - LogServiceFactory.getLogService(MaxAggregator.class.getName()); - - public Comparable<Object> aggVal; - - public boolean firstTime = true; - - protected abstract void internalAgg(Object value); - - @Override public void agg(double newVal) { - internalAgg((Double) newVal); - firstTime = false; - } - - @Override public Double getDoubleValue() { - return (Double) ((Object) aggVal); - } - - @Override public void agg(Object newVal) { - internalAgg(newVal); - firstTime = false; - } - - @Override public Long getLongValue() { - return (Long) ((Object) aggVal); - } - - @Override public BigDecimal getBigDecimalValue() { - return (BigDecimal) ((Object) aggVal); - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - } - - /** - * This method return the max value as an object - * a8 - * - * @return max value as an object - */ - @Override public Object getValueObject() { - return aggVal; - } - - @Override public boolean isFirstTime() { - return firstTime; - } - - @Override public MeasureAggregator get() { - return this; - - } - - public String toString() { - return aggVal + ""; - } - - @Override public int compareTo(MeasureAggregator msrAggr) { - @SuppressWarnings("unchecked") Comparable<Object> other = - (Comparable<Object>) msrAggr.getValueObject(); - - return aggVal.compareTo(other); - } - - @Override public void writeData(DataOutput dataOutput) throws IOException { - ByteArrayOutputStream bos = null; - ObjectOutput out = null; - - try { - dataOutput.writeBoolean(firstTime); - bos = new ByteArrayOutputStream(); - out = new ObjectOutputStream(bos); - out.writeObject(aggVal); - byte[] objectBytes = bos.toByteArray(); - dataOutput.write(objectBytes.length); - dataOutput.write(objectBytes, 0, objectBytes.length); - } catch (Exception e) { - LOGGER.error(e, - "Problem while getting byte array in maxMinAggregator: " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(bos); - } - } - - @SuppressWarnings("unchecked") - @Override - public void readData(DataInput inPut) - throws IOException { - ByteArrayInputStream bis = null; - ObjectInput in = null; - try { - int length = inPut.readInt(); - firstTime = inPut.readBoolean(); - byte[] data = new byte[length]; - bis = new ByteArrayInputStream(data); - in = new ObjectInputStream(bis); - aggVal = (Comparable<Object>) in.readObject(); - } catch (Exception e) { - LOGGER.error(e, - "Problem while getting byte array in maxMinAggregator: " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(bis); - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - byte[] objectBytes = new byte[0]; - if (firstTime) { - return objectBytes; - } - ObjectOutput out = null; - ByteArrayOutputStream bos = null; - try { - bos = new ByteArrayOutputStream(); - out = new ObjectOutputStream(bos); - out.writeObject(aggVal); - objectBytes = bos.toByteArray(); - } catch (IOException e) { - LOGGER.error(e, - "Problem while getting byte array in maxMinAggregator: " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(bos); - } - return objectBytes; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java deleted file mode 100644 index 61fe179..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/BitSet.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Arrays; - -/** - * This class implements a vector of bits that grows as needed. Each component - * of the bit set has a {@code boolean} value. The bits of a {@code BitSet} are - * indexed by nonnegative integers. Individual indexed bits can be examined, - * set, or cleared. One {@code BitSet} may be used to modify the contents of - * another {@code BitSet} through logical AND, logical inclusive OR, and logical - * exclusive OR operations. - * By default, all bits in the set initially have the value {@code false}. - * Every bit set has a current size, which is the number of bits of space - * currently in use by the bit set. Note that the size is related to the - * implementation of a bit set, so it may change with implementation. The length - * of a bit set relates to logical length of a bit set and is defined - * independently of implementation. - * Unless otherwise noted, passing a null parameter to any of the methods in a - * {@code BitSet} will result in a {@code NullPointerException}. - * A {@code BitSet} is not safe for multithreaded use without external - * synchronization. - * - * @author Arthur van Hoff - * @author Michael McCloskey - * @author Martin Buchholz - * @since JDK1.0 - */ -public class BitSet implements java.io.Serializable { - /* use serialVersionUID from JDK 1.0.2 for interoperability */ - private static final long serialVersionUID = 7997698588986878753L; - - /* - * BitSets are packed into arrays of "words." Currently a word is a long, - * which consists of 64 bits, requiring 6 address bits. The choice of word - * size is determined purely by performance concerns. - */ - private static final int ADDRESS_BITS_PER_WORD = 6; - - private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; - - /* Used to shift left or right for a partial word mask */ - private static final long WORD_MASK = 0xffffffffffffffffL; - - /** - * The internal field corresponding to the serialField "bits". - */ - private long[] words; - - /** - * The number of words in the logical size of this BitSet. - */ - private transient int wordsInUse; - - /** - * Creates a new bit set. All bits are initially {@code false}. - */ - public BitSet() { - initWords(BITS_PER_WORD); - } - - /** - * Creates a bit set using words as the internal representation. The last - * word (if there is one) must be non-zero. - */ - private BitSet(long[] words) { - this.words = words; - this.wordsInUse = words.length; - checkInvariants(); - } - - /** - * Given a bit index, return word index containing it. - */ - private static int wordIndex(int bitIndex) { - return bitIndex >> ADDRESS_BITS_PER_WORD; - } - - /** - * Returns a new bit set containing all the bits in the given byte array. - * More precisely, <br> - * {@code BitSet.valueOf(bytes).get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)} - * <br> - * for all {@code n < 8 * bytes.length}. - * This method is equivalent to - * {@code BitSet.valueOf(ByteBuffer.wrap(bytes))}. - * - * @param bytes a byte array containing a little-endian representation of a - * sequence of bits to be used as the initial bits of the new bit - * set - * @since 1.7 - */ - public static BitSet valueOf(byte[] bytes) { - return BitSet.valueOf(ByteBuffer.wrap(bytes)); - } - - /** - * Returns a new bit set containing all the bits in the given byte buffer - * between its position and limit. - * More precisely, <br> - * {@code BitSet.valueOf(bb).get(n) == ((bb.get(bb.position()+n/8) & (1<<(n%8))) != 0)} - * <br> - * for all {@code n < 8 * bb.remaining()}. - * The byte buffer is not modified by this method, and no reference to the - * buffer is retained by the bit set. - * - * @param bb a byte buffer containing a little-endian representation of a - * sequence of bits between its position and limit, to be used as - * the initial bits of the new bit set - * @since 1.7 - */ - public static BitSet valueOf(ByteBuffer bb) { - bb = bb.slice().order(ByteOrder.LITTLE_ENDIAN); - int n = 0;//CHECKSTYLE:OFF - for (n = bb.remaining(); ; n--) {//CHECKSTYLE:ON - if (n > 0 && bb.get(n - 1) == 0) { - continue; - } else { - break; - } - } - long[] words = new long[(n + 7) / 8]; - bb.limit(n); - int i = 0;//CHECKSTYLE:OFF - while (bb.remaining() >= 8) {//CHECKSTYLE:ON - words[i++] = bb.getLong(); - } - - int j = 0; - for (int remaining = bb.remaining(); j < remaining; j++) { - words[i] |= (bb.get() & 0xffL) << (8 * j); - } - return new BitSet(words); - } - - /** - * Every public method must preserve these invariants. - */ - private void checkInvariants() { - assert(wordsInUse == 0 || words[wordsInUse - 1] != 0); - assert(wordsInUse >= 0 && wordsInUse <= words.length); - assert(wordsInUse == words.length || words[wordsInUse] == 0); - } - - private void initWords(int nbits) { - words = new long[wordIndex(nbits - 1) + 1]; - } - - /** - * Returns a new byte array containing all the bits in this bit set. - * More precisely, if <br> - * {@code byte[] bytes = s.toByteArray();} <br> - * then {@code bytes.length == (s.length()+7)/8} and <br> - * {@code s.get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)} <br> - * for all {@code n < 8 * bytes.length}. - * - * @return a byte array containing a little-endian representation of all the - * bits in this bit set - * @since 1.7 - */ - public byte[] toByteArray() { - int n = wordsInUse; - if (n == 0) { - return new byte[0]; - } - int len = 8 * (n - 1); - for (long x = words[n - 1]; x != 0; x >>>= 8) { - len++; - } - byte[] bytes = new byte[len]; - ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); - for (int i = 0; i < n - 1; i++) { - bb.putLong(words[i]); - } - for (long x = words[n - 1]; x != 0; x >>>= 8) { - bb.put((byte) (x & 0xff)); - } - return bytes; - } - - /** - * Ensures that the BitSet can hold enough words. - * - * @param wordsRequired the minimum acceptable number of words. - */ - private void ensureCapacity(int wordsRequired) { - if (words.length < wordsRequired) { - // Allocate larger of doubled size or required size - int request = Math.max(2 * words.length, wordsRequired); - words = Arrays.copyOf(words, request); - } - } - - /** - * Ensures that the BitSet can accommodate a given wordIndex, temporarily - * violating the invariants. The caller must restore the invariants before - * returning to the user, possibly using recalculateWordsInUse(). - * - * @param wordIndex the index to be accommodated. - */ - private void expandTo(int wordIndex) { - int wordsRequired = wordIndex + 1; - if (wordsInUse < wordsRequired) { - ensureCapacity(wordsRequired); - wordsInUse = wordsRequired; - } - } - - /** - * Sets the bit at the specified index to {@code true}. - * - * @param bitIndex a bit index - * @throws IndexOutOfBoundsException if the specified index is negative - * @since JDK1.0 - */ - public void set(int bitIndex) { - if (bitIndex < 0) { - throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex); - } - int wordIndex = wordIndex(bitIndex); - expandTo(wordIndex); - - words[wordIndex] |= (1L << bitIndex); // Restores invariants - - checkInvariants(); - } - - /** - * Returns the index of the first bit that is set to {@code true} that - * occurs on or after the specified starting index. If no such bit exists - * then {@code -1} is returned. - * To iterate over the {@code true} bits in a {@code BitSet}, use the - * following loop: - * <pre> - * {@code - * for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) { - * // operate on index i here - * }} - * </pre> - * - * @param fromIndex the index to start checking from (inclusive) - * @return the index of the next set bit, or {@code -1} if there is no such - * bit - * @throws IndexOutOfBoundsException if the specified index is negative - * @since 1.4 - */ - public int nextSetBit(int fromIndex) { - if (fromIndex < 0) { - throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex); - } - checkInvariants(); - - int u = wordIndex(fromIndex); - if (u >= wordsInUse) { - return -1; - } - long word = words[u] & (WORD_MASK << fromIndex); - - while (true) { - if (word != 0) { - return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word); - } - - u++; - if (u == wordsInUse) { - return -1; - } - word = words[u]; - } - } - - /** - * Returns the number of bits set to {@code true} in this {@code BitSet}. - * - * @return the number of bits set to {@code true} in this {@code BitSet} - * @since 1.4 - */ - public int cardinality() { - int sum = 0; - for (int i = 0; i < wordsInUse; i++) { - sum += Long.bitCount(words[i]); - } - return sum; - } - - /** - * Performs a logical <b>OR</b> of this bit set with the bit set argument. - * This bit set is modified so that a bit in it has the value {@code true} - * if and only if it either already had the value {@code true} or the - * corresponding bit in the bit set argument has the value {@code true}. - * - * @param set a bit set - */ - public void or(BitSet set) { - if (this == set) { - return; - } - int wordsInCommon = Math.min(wordsInUse, set.wordsInUse); - - if (wordsInUse < set.wordsInUse) { - ensureCapacity(set.wordsInUse); - wordsInUse = set.wordsInUse; - } - - // Perform logical OR on words in common - for (int i = 0; i < wordsInCommon; i++) { - words[i] |= set.words[i]; - } - // Copy any remaining words - if (wordsInCommon < set.wordsInUse) { - System.arraycopy(set.words, wordsInCommon, words, wordsInCommon, wordsInUse - wordsInCommon); - } - // recalculateWordsInUse() is unnecessary - checkInvariants(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java b/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java deleted file mode 100644 index 0e23df1..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/CustomAggregatorHelper.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.File; -import java.io.FileFilter; -import java.io.FileInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.carbondata.common.logging.LogService; -import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.core.util.CarbonProperties; -import org.carbondata.core.util.CarbonUtil; - -import org.apache.commons.codec.binary.Base64; - -public class CustomAggregatorHelper { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(CustomAggregatorHelper.class.getName()); - - /** - * surrogateKeyMap - */ - private Map<String, Map<Integer, String>> surrogateKeyMap; - - /** - * loadFolderList - */ - private List<File> loadFolderList; - - public CustomAggregatorHelper() { - surrogateKeyMap = - new HashMap<String, Map<Integer, String>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - loadFolderList = new ArrayList<File>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - /** - * Below method will be used to get the file list - * - * @param baseStorePath - * @param fileNameSearchPattern - * @return - */ - private static File[] getFilesArray(File baseStorePath, final String fileNameSearchPattern) { - File[] listFiles = baseStorePath.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - if (pathname.getName().indexOf(fileNameSearchPattern) > -1) { - return true; - } - return false; - } - }); - return listFiles; - } - - /** - * Below method will be used to get the member - * - * @param tableName - * @param columnName - * @param key - * @param cubeName - * @param schemaName - * @return member - */ - public String getDimValue(String tableName, String columnName, int key, String cubeName, - String schemaName) { - Map<Integer, String> memberCache = surrogateKeyMap.get(tableName + '_' + columnName); - if (null == memberCache) { - loadLevelFile(tableName, columnName, cubeName, schemaName); - } - memberCache = surrogateKeyMap.get(tableName + '_' + columnName); - return memberCache.get(key); - } - - /** - * Below method will be used to fill the level cache - * - * @param tableName - * @param columnName - * @param cubeName - * @param schemaName - */ - private void loadLevelFile(String tableName, String columnName, String cubeName, - String schemaName) { - String baseLocation = CarbonUtil.getCarbonStorePath(schemaName, cubeName); - baseLocation = baseLocation + File.separator + schemaName + File.separator + cubeName; - if (loadFolderList.size() == 0) { - checkAndUpdateFolderList(baseLocation); - } - try { - File[] filesArray = null; - for (File loadFoler : loadFolderList) { - filesArray = getFilesArray(loadFoler, tableName + '_' + columnName); - for (int i = 0; i < filesArray.length; i++) { - readLevelFileAndUpdateCache(filesArray[i], tableName + '_' + columnName); - } - } - } catch (IOException e) { - LOGGER - .error("Problem while populating the cache"); - } - } - - /** - * Below method will be used to read the level files - * - * @param memberFile - * @param fileName - * @throws IOException - */ - private void readLevelFileAndUpdateCache(File memberFile, String fileName) throws IOException { - FileInputStream fos = null; - FileChannel fileChannel = null; - try { - // create an object of FileOutputStream - fos = new FileInputStream(memberFile); - - fileChannel = fos.getChannel(); - Map<Integer, String> memberMap = surrogateKeyMap.get(fileName); - - if (null == memberMap) { - memberMap = new HashMap<Integer, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - surrogateKeyMap.put(fileName, memberMap); - } - - long size = fileChannel.size(); - int maxKey = 0; - ByteBuffer rowlengthToRead = null; - int len = 0; - ByteBuffer row = null; - int toread = 0; - byte[] bb = null; - String value = null; - int surrogateValue = 0; - - boolean enableEncoding = Boolean.valueOf(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_BASE64_ENCODING, - CarbonCommonConstants.ENABLE_BASE64_ENCODING_DEFAULT)); - - while (fileChannel.position() < size) { - rowlengthToRead = ByteBuffer.allocate(4); - fileChannel.read(rowlengthToRead); - rowlengthToRead.rewind(); - len = rowlengthToRead.getInt(); - if (len == 0) { - continue; - } - - row = ByteBuffer.allocate(len); - fileChannel.read(row); - row.rewind(); - toread = row.getInt(); - bb = new byte[toread]; - row.get(bb); - - if (enableEncoding) { - value = new String(Base64.decodeBase64(bb), Charset.defaultCharset()); - } else { - value = new String(bb, Charset.defaultCharset()); - } - - surrogateValue = row.getInt(); - memberMap.put(surrogateValue, value); - - // check if max key is less than Surrogate key then update the max key - if (maxKey < surrogateValue) { - maxKey = surrogateValue; - } - } - - } finally { - CarbonUtil.closeStreams(fileChannel, fos); - } - } - - /** - * This method recursively checks the folder with Load_ inside each and - * every RS_x/TableName/Load_x and add in the folder list the load folders. - * - * @param baseStorePath - * @return - */ - private File[] checkAndUpdateFolderList(String baseStorePath) { - File folders = new File(baseStorePath); - // - File[] rsFolders = folders.listFiles(new FileFilter() { - @Override public boolean accept(File pathname) { - boolean check = false; - check = pathname.isDirectory() - && pathname.getAbsolutePath().indexOf(CarbonCommonConstants.LOAD_FOLDER) > -1; - if (check) { - return true; - } else { - File[] checkFolder = checkAndUpdateFolderList(pathname.getAbsolutePath()); - if (null != checkFolder) { - for (File f : checkFolder) { - loadFolderList.add(f); - } - } - } - return false; - } - }); - return rsFolders; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java deleted file mode 100644 index 24eff08..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AbstractAvgAggregator.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.avg; - -import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic; - -public abstract class AbstractAvgAggregator extends AbstractMeasureAggregatorBasic { - - public abstract Object[] getAvgState(); - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java deleted file mode 100644 index c6ba55d..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgBigDecimalAggregator.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.avg; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.core.util.DataTypeUtil; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class AvgBigDecimalAggregator extends AbstractAvgAggregator { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 5463736686281089871L; - - /** - * total number of aggregate values - */ - protected double count; - - /** - * aggregate value - */ - protected BigDecimal aggVal; - - public AvgBigDecimalAggregator() { - aggVal = new BigDecimal(0); - } - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal); - buffer.rewind(); - while (buffer.hasRemaining()) { - byte[] valueByte = new byte[buffer.getInt()]; - buffer.get(valueByte); - BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte); - aggVal = aggVal.add(valueBigDecimal); - - count += buffer.getDouble(); - firstTime = false; - } - return; - } - - if (firstTime) { - aggVal = (BigDecimal) newVal; - firstTime = false; - } else { - aggVal = aggVal.add((BigDecimal) newVal); - } - count++; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - BigDecimal value = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index); - aggVal = aggVal.add(value); - firstTime = false; - count++; - } - } - - @Override public Object[] getAvgState() { - return new Object[]{aggVal, count}; - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (firstTime) { - return new byte[0]; - } - byte[] bytes = DataTypeUtil.bigDecimalToByte(aggVal); - ByteBuffer allocate = - ByteBuffer.allocate(4 + bytes.length + CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - allocate.putInt(bytes.length); - allocate.put(bytes); - allocate.putDouble(count); - allocate.rewind(); - - return allocate.array(); - } - - /** - * Return the average of the aggregate values - * - * @return average aggregate value - */ - @Override public BigDecimal getBigDecimalValue() { - return aggVal.divide(new BigDecimal(count), 6); - } - - /** - * This method merge the aggregated value, in average aggregator it will add - * count and aggregate value - * - * @param aggregator Avg Aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - AvgBigDecimalAggregator avgAggregator = (AvgBigDecimalAggregator) aggregator; - if (!avgAggregator.isFirstTime()) { - aggVal = aggVal.add(avgAggregator.aggVal); - count += avgAggregator.count; - firstTime = false; - } - } - - /** - * This method return the average value as an object - * - * @return average value as an object - */ - @Override public Object getValueObject() { - return aggVal.divide(new BigDecimal(count)); - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal = (BigDecimal) newValue; - count = 1; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeBoolean(firstTime); - output.writeUTF(aggVal.toString()); - output.writeDouble(count); - - } - - @Override public void readData(DataInput inPut) throws IOException { - firstTime = inPut.readBoolean(); - aggVal = new BigDecimal(inPut.readUTF()); - count = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - AvgBigDecimalAggregator avg = new AvgBigDecimalAggregator(); - avg.aggVal = aggVal; - avg.count = count; - avg.firstTime = firstTime; - return avg; - } - - @Override public int compareTo(MeasureAggregator o) { - BigDecimal val = getBigDecimalValue(); - BigDecimal otherVal = o.getBigDecimalValue(); - - return val.compareTo(otherVal); - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof AvgBigDecimalAggregator)) { - return false; - } - AvgBigDecimalAggregator o = (AvgBigDecimalAggregator) obj; - return getBigDecimalValue().equals(o.getBigDecimalValue()); - } - - @Override public int hashCode() { - return getBigDecimalValue().hashCode(); - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - - byte[] valueByte = new byte[buffer.getInt()]; - buffer.get(valueByte); - BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte); - aggVal = aggVal.add(valueBigDecimal); - count += buffer.getDouble(); - firstTime = false; - } - - public String toString() { - return (aggVal.divide(new BigDecimal(count))) + ""; - } - - @Override public MeasureAggregator getNew() { - return new AvgBigDecimalAggregator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java deleted file mode 100644 index bacff29..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgDoubleAggregator.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.avg; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class AvgDoubleAggregator extends AbstractAvgAggregator { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 5463736686281089871L; - - /** - * total number of aggregate values - */ - protected double count; - - /** - * aggregate value - */ - protected double aggVal; - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(double newVal) { - aggVal += newVal; - count++; - firstTime = false; - } - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal); - buffer.rewind(); - while (buffer.hasRemaining()) { - aggVal += buffer.getDouble(); - count += buffer.getDouble(); - firstTime = false; - } - return; - } - aggVal += ((Number) newVal).doubleValue(); - count++; - firstTime = false; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - aggVal += dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index); - count++; - firstTime = false; - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (firstTime) { - return new byte[0]; - } - ByteBuffer buffer = ByteBuffer.allocate(2 * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - buffer.putDouble(aggVal); - buffer.putDouble(count); - return buffer.array(); - } - - /** - * Return the average of the aggregate values - * - * @return average aggregate value - */ - @Override public Double getDoubleValue() { - return aggVal / count; - } - - @Override public Object[] getAvgState() { - return new Object[]{aggVal, count}; - } - - /** - * This method merge the aggregated value, in average aggregator it will add - * count and aggregate value - * - * @param aggregator Avg Aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - AvgDoubleAggregator avgAggregator = (AvgDoubleAggregator) aggregator; - if (!avgAggregator.isFirstTime()) { - aggVal += avgAggregator.aggVal; - count += avgAggregator.count; - firstTime = false; - } - } - - /** - * This method return the average value as an object - * - * @return average value as an object - */ - @Override public Object getValueObject() { - return aggVal / count; - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal = (Double) newValue; - count = 1; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeBoolean(firstTime); - output.writeDouble(aggVal); - output.writeDouble(count); - - } - - @Override public void readData(DataInput inPut) throws IOException { - firstTime = inPut.readBoolean(); - aggVal = inPut.readDouble(); - count = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - AvgDoubleAggregator avg = new AvgDoubleAggregator(); - avg.aggVal = aggVal; - avg.count = count; - avg.firstTime = firstTime; - return avg; - } - - @Override public int compareTo(MeasureAggregator o) { - double val = getDoubleValue(); - double otherVal = o.getDoubleValue(); - if (val > otherVal) { - return 1; - } - if (val < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof AvgDoubleAggregator)) { - return false; - } - AvgDoubleAggregator o = (AvgDoubleAggregator)obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - aggVal += buffer.getDouble(); - count += buffer.getDouble(); - firstTime = false; - } - - public String toString() { - return (aggVal / count) + ""; - } - - @Override public MeasureAggregator getNew() { - return new AvgDoubleAggregator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java deleted file mode 100644 index 6290de6..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/avg/AvgLongAggregator.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.avg; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class AvgLongAggregator extends AbstractAvgAggregator { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 5463736686281089871L; - - /** - * total number of aggregate values - */ - protected double count; - - /** - * aggregate value - */ - protected long aggVal; - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal); - buffer.rewind(); - while (buffer.hasRemaining()) { - aggVal += buffer.getLong(); - count += buffer.getDouble(); - firstTime = false; - } - return; - } - aggVal += (Long) newVal; - count++; - firstTime = false; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - aggVal += dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index); - count++; - firstTime = false; - } - } - - @Override public Object[] getAvgState() { - return new Object[]{aggVal, count}; - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (firstTime) { - return new byte[0]; - } - ByteBuffer buffer = ByteBuffer.allocate( - CarbonCommonConstants.LONG_SIZE_IN_BYTE + CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - buffer.putLong(aggVal); - buffer.putDouble(count); - return buffer.array(); - } - - @Override public Long getLongValue() { - return aggVal / (long) count; - } - - /** - * This method merge the aggregated value, in average aggregator it will add - * count and aggregate value - * - * @param aggregator Avg Aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - AvgLongAggregator avgAggregator = (AvgLongAggregator) aggregator; - if (!avgAggregator.isFirstTime()) { - aggVal += avgAggregator.aggVal; - count += avgAggregator.count; - firstTime = false; - } - } - - /** - * This method return the average value as an object - * - * @return average value as an object - */ - @Override public Object getValueObject() { - return aggVal / count; - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal = (Long) newValue; - count = 1; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeBoolean(firstTime); - output.writeLong(aggVal); - output.writeDouble(count); - - } - - @Override public void readData(DataInput inPut) throws IOException { - firstTime = inPut.readBoolean(); - aggVal = inPut.readLong(); - count = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - AvgLongAggregator avg = new AvgLongAggregator(); - avg.aggVal = aggVal; - avg.count = count; - avg.firstTime = firstTime; - return avg; - } - - @Override public int compareTo(MeasureAggregator o) { - long val = getLongValue(); - long otherVal = o.getLongValue(); - if (val > otherVal) { - return 1; - } else if (val < otherVal) { - return -1; - } else { - return 0; - } - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof AvgLongAggregator)) { - return false; - } - AvgLongAggregator o = (AvgLongAggregator)obj; - return getLongValue().equals(o.getLongValue()); - } - - @Override public int hashCode() { - return getLongValue().hashCode(); - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - aggVal += buffer.getLong(); - count += buffer.getDouble(); - firstTime = false; - } - - public String toString() { - return (aggVal / count) + ""; - } - - @Override public MeasureAggregator getNew() { - // TODO Auto-generated method stub - return new AvgLongAggregator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java deleted file mode 100644 index 352e7aa..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/count/CountAggregator.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.count; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -/** - * Class Description : It will return total count of values - */ -public class CountAggregator implements MeasureAggregator { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 2678878935295306313L; - - /** - * aggregate value - */ - private double aggVal; - - /** - * Count Aggregate function which update the total count - * - * @param newVal new value - */ - @Override public void agg(double newVal) { - aggVal++; - } - - /** - * Count Aggregate function which update the total count - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - aggVal++; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - aggVal++; - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - buffer.putDouble(aggVal); - return buffer.array(); - } - - /** - * Returns the total count - * - * @return total count - */ - @Override public Double getDoubleValue() { - return aggVal; - } - - @Override public Long getLongValue() { - return (long) aggVal; - } - - @Override public BigDecimal getBigDecimalValue() { - return new BigDecimal(aggVal); - } - - /** - * Merge the total count with the aggregator - * - * @param aggregator count aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - CountAggregator countAggregator = (CountAggregator) aggregator; - aggVal += countAggregator.aggVal; - } - - /** - * Overloaded Aggregate function will be used for Aggregate tables because - * aggregate table will have fact_count as a measure. It will update the - * total count - * - * @param newVal - * new value - * @param factCount - * total fact count - * - */ - // @Override - // public void agg(double newVal, double factCount) - // { - // agg(newVal, null, 0, 0); - // } - - /** - * This method return the count value as an object - * - * @return count value as an object - */ - - @Override public Object getValueObject() { - return aggVal; - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal += Double.parseDouble(String.valueOf(newValue)); - } - - @Override public boolean isFirstTime() { - return false; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeDouble(aggVal); - - } - - @Override public void readData(DataInput inPut) throws IOException { - aggVal = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - CountAggregator aggregator = new CountAggregator(); - aggregator.aggVal = aggVal; - return aggregator; - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - aggVal += buffer.getDouble(); - } - - @Override public int compareTo(MeasureAggregator obj) { - double val = getDoubleValue(); - double otherVal = obj.getDoubleValue(); - if (val > otherVal) { - return 1; - } - if (val < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof CountAggregator)) { - return false; - } - CountAggregator o = (CountAggregator)obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public MeasureAggregator get() { - return this; - } - - public String toString() { - return aggVal + ""; - } - - @Override public MeasureAggregator getNew() { - return new CountAggregator(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java deleted file mode 100644 index 0629007..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.distinct; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.util.HashSet; -import java.util.Set; - -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -public abstract class AbstractDistinctCountAggregatorObjectSet implements MeasureAggregator { - - private static final long serialVersionUID = 6313463368629960186L; - - protected Set<Object> valueSetForObj; - - public AbstractDistinctCountAggregatorObjectSet() { - valueSetForObj = new HashSet<Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - /** - * just need to add the unique values to agg set - */ - @Override public void agg(double newVal) { - } - - /** - * Distinct count Aggregate function which update the Distinct count - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - valueSetForObj.add(newVal); - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - return null; - } - - @Override public Double getDoubleValue() { - return (double) valueSetForObj.size(); - } - - @Override public Long getLongValue() { - return (long) valueSetForObj.size(); - } - - @Override public BigDecimal getBigDecimalValue() { - return new BigDecimal(valueSetForObj.size()); - } - - @Override public Object getValueObject() { - return valueSetForObj.size(); - } - - @Override public void setNewValue(Object newValue) { - valueSetForObj.add(newValue); - } - - @Override public boolean isFirstTime() { - return false; - } - - @Override public void writeData(DataOutput output) throws IOException { - - } - - @Override public void readData(DataInput inPut) throws IOException { - - } - - public String toString() { - return valueSetForObj.size() + ""; - } - - @Override public void merge(byte[] value) { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java deleted file mode 100644 index 1b2b33d..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.distinct; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.carbondata.common.logging.LogService; -import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.query.aggregator.MeasureAggregator; - -import org.roaringbitmap.IntIterator; -import org.roaringbitmap.RoaringBitmap; - -/** - * * The distinct count aggregator - * Ex: - * ID NAME Sales - * <p>1 a 200 - * <p>2 a 100 - * <p>3 a 200 - * select count(distinct sales) # would result 2 - * select count(sales) # would result 3 - */ -public class DistinctCountAggregator implements MeasureAggregator { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(DistinctCountAggregator.class.getName()); - /** - * - */ - private static final long serialVersionUID = 6313463368629960186L; - /** - * For Spark CARBON to avoid heavy object transfer it better to flatten - * the Aggregators. There is no aggregation expected after setting this value. - */ - private Double computedFixedValue; - /** - * - */ - // private Set<Double> valueSet; - private RoaringBitmap valueSet; - - private byte[] data; - - private double minValue; - - public DistinctCountAggregator(Object minValue) { - valueSet = new RoaringBitmap(); - if (minValue instanceof BigDecimal) { - this.minValue = ((BigDecimal) minValue).doubleValue(); - } else if (minValue instanceof Long) { - this.minValue = ((Long) minValue).doubleValue(); - } else { - this.minValue = (Double) minValue; - } - } - - public DistinctCountAggregator() { - valueSet = new RoaringBitmap(); - } - - /** - * just need to add the unique values to agg set - */ - @Override public void agg(double newVal) { - valueSet.add((int) (newVal - minValue)); - } - - /** - * Distinct count Aggregate function which update the Distinct count - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - byte[] values = (byte[]) newVal; - ByteBuffer buffer = ByteBuffer.wrap(values); - buffer.rewind(); - while (buffer.hasRemaining()) { - valueSet.add(buffer.getInt()); - } - return; - } else { - double value = new Double(newVal.toString()); - agg(value); - } - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - valueSet.add((int) dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index)); - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (valueSet.getCardinality() == 0) { - return new byte[0]; - } - IntIterator iterator = valueSet.getIntIterator(); - ByteBuffer buffer = ByteBuffer.allocate(valueSet.getCardinality() * 4 + 8); - buffer.putDouble(minValue); - while (iterator.hasNext()) { - buffer.putInt(iterator.next()); - } - buffer.rewind(); - return buffer.array(); - } - - private void agg(RoaringBitmap set2, double minValue) { - if (this.minValue == minValue) { - valueSet.or(set2); - } else { - if (this.minValue > minValue) { - IntIterator intIterator = valueSet.getIntIterator(); - while (intIterator.hasNext()) { - set2.add((int) ((double) (intIterator.next() + this.minValue) - minValue)); - } - this.minValue = minValue; - this.valueSet = set2; - } else { - IntIterator intIterator = set2.getIntIterator(); - while (intIterator.hasNext()) { - valueSet.add((int) ((double) (intIterator.next() + minValue) - this.minValue)); - } - } - } - } - - /** - * merge the valueset so that we get the count of unique values - */ - @Override public void merge(MeasureAggregator aggregator) { - DistinctCountAggregator distinctCountAggregator = (DistinctCountAggregator) aggregator; - readData(); - distinctCountAggregator.readData(); - if (distinctCountAggregator.valueSet != null) { - agg(distinctCountAggregator.valueSet, distinctCountAggregator.minValue); - } - } - - @Override public Double getDoubleValue() { - if (computedFixedValue == null) { - readData(); - return (double) valueSet.getCardinality(); - } - return computedFixedValue; - } - - @Override public Long getLongValue() { - if (computedFixedValue == null) { - readData(); - return (long) valueSet.getCardinality(); - } - return computedFixedValue.longValue(); - } - - @Override public BigDecimal getBigDecimalValue() { - if (computedFixedValue == null) { - readData(); - return new BigDecimal(valueSet.getCardinality()); - } - return new BigDecimal(computedFixedValue); - } - - @Override public Object getValueObject() { - return valueSet.getCardinality(); - } - - @Override public void setNewValue(Object newValue) { - computedFixedValue = (Double) newValue; - valueSet = null; - } - - @Override public boolean isFirstTime() { - return false; - } - - @Override public void writeData(DataOutput output) throws IOException { - - if (computedFixedValue != null) { - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8); - byteBuffer.putInt(-1); - byteBuffer.putDouble(computedFixedValue); - byteBuffer.flip(); - output.write(byteBuffer.array()); - } else { - if (valueSet != null) { - valueSet.serialize(output); - } else { - output.write(data); - } - } - } - - @Override public void readData(DataInput inPut) throws IOException { - valueSet = new RoaringBitmap(); - valueSet.deserialize(inPut); - } - - private void readData() { - if (data != null && (valueSet == null || valueSet.isEmpty())) { - ByteArrayInputStream stream = new ByteArrayInputStream(data); - DataInputStream outputStream = new DataInputStream(stream); - try { - readData(outputStream); - outputStream.close(); - data = null; - } catch (IOException e) { - LOGGER.error(e, e.getMessage()); - } - } - } - - @Override public MeasureAggregator getCopy() { - DistinctCountAggregator aggr = new DistinctCountAggregator(minValue); - aggr.valueSet = valueSet.clone(); - return aggr; - } - - @Override public int compareTo(MeasureAggregator measureAggr) { - double compFixedVal = getDoubleValue(); - double otherVal = measureAggr.getDoubleValue(); - if (compFixedVal > otherVal) { - return 1; - } - if (compFixedVal < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof DistinctCountAggregator)) { - return false; - } - DistinctCountAggregator o = (DistinctCountAggregator) obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public MeasureAggregator get() { - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - DataOutputStream outputStream = new DataOutputStream(byteStream); - try { - writeData(outputStream); - } catch (IOException ex) { - LOGGER.error(ex, ex.getMessage()); - } - data = byteStream.toByteArray(); - valueSet = null; - return this; - } - - public String toString() { - if (computedFixedValue == null) { - readData(); - return valueSet.getCardinality() + ""; - } - return computedFixedValue + ""; - } - - public RoaringBitmap getBitMap() { - return valueSet; - } - - public double getMinValue() { - return minValue; - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - buffer.rewind(); - double currentMinValue = buffer.getDouble(); - while (buffer.hasRemaining()) { - agg(buffer.getInt() + currentMinValue); - } - } - - @Override public MeasureAggregator getNew() { - // TODO Auto-generated method stub - return new DistinctCountAggregator(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java deleted file mode 100644 index 3b26e53..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl.distinct; - -import java.util.HashSet; -import java.util.Set; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class DistinctCountAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet { - - private static final long serialVersionUID = 6313463368629960186L; - - /** - * just need to add the unique values to agg set - */ - @Override public void agg(double newVal) { - valueSetForObj.add(newVal); - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index)); - } - } - - private void agg(Set<Object> set2) { - valueSetForObj.addAll(set2); - } - - /** - * merge the valueset so that we get the count of unique values - */ - @Override public void merge(MeasureAggregator aggregator) { - DistinctCountAggregatorObjectSet distinctCountAggregator = - (DistinctCountAggregatorObjectSet) aggregator; - agg(distinctCountAggregator.valueSetForObj); - } - - @Override public MeasureAggregator getCopy() { - DistinctCountAggregatorObjectSet aggregator = new DistinctCountAggregatorObjectSet(); - aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj); - return aggregator; - } - - @Override public int compareTo(MeasureAggregator measureAggr) { - double valueSetForObjSize = getDoubleValue(); - double otherVal = measureAggr.getDoubleValue(); - if (valueSetForObjSize > otherVal) { - return 1; - } - if (valueSetForObjSize < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof DistinctCountAggregatorObjectSet)) { - return false; - } - DistinctCountAggregatorObjectSet o = (DistinctCountAggregatorObjectSet) obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public MeasureAggregator get() { - return this; - } - - @Override public MeasureAggregator getNew() { - return new DistinctCountAggregatorObjectSet(); - } - -}