[Issue 618]Supported Spark 1.6 in Carbondata (#670) * [Issue-660] Show segments query should not fail, if table name is case insensitive (#662)
* Show segments should not fail, if table name is case insensitive * Corrected test case * [issue-656] fix load data when int column contains integer.min_value (#657) * load data when int column contains min Integer * fixed test case * fix test bigint * fix test bigint * removed no used DATA_BIGINT case * removed no used condition for unCompressMaxMin * [issue- 664] select count(joinDate) from table_x is failing for direct dictionary column (#665) * Supported Spark 1.6 by changing aggregation interfaces * Fixed compile issue after rebase * optmizing the flow with unsafe row * Fixed bugs in push up * Fixed compiler issues after rebasing * Fixed merging issue after rebase * Fixed scan query pushdown Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ead0076b Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ead0076b Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ead0076b Branch: refs/heads/master Commit: ead0076b077873a776e0e02273f587500ee713e7 Parents: afb6ed4 Author: Ravindra Pesala <ravi.pes...@gmail.com> Authored: Fri Jun 17 05:52:03 2016 +0530 Committer: Jacky Li <jacky.li...@huawei.com> Committed: Fri Jun 17 08:22:03 2016 +0800 ---------------------------------------------------------------------- core/pom.xml | 33 - .../core/carbon/metadata/CarbonMetadata.java | 19 +- .../compression/ValueCompressonHolder.java | 1 + .../TimeStampDirectDictionaryGenerator.java | 4 +- .../core/util/ValueCompressionUtil.java | 53 +- ...bstractDistinctCountAggregatorObjectSet.java | 103 --- .../aggregator/impl/AbstractMaxAggregator.java | 60 -- .../impl/AbstractMeasureAggregatorDummy.java | 69 -- .../impl/AbstractMeasureAggregatorMaxMin.java | 5 +- .../aggregator/impl/AbstractMinAggregator.java | 60 -- .../impl/AvgBigDecimalAggregator.java | 214 ----- .../aggregator/impl/AvgDoubleAggregator.java | 206 ----- .../aggregator/impl/AvgLongAggregator.java | 191 ----- .../query/aggregator/impl/CountAggregator.java | 204 ----- .../impl/DistinctCountAggregator.java | 319 -------- .../impl/DistinctCountAggregatorObjectSet.java | 96 --- ...tinctCountBigDecimalAggregatorObjectSet.java | 86 -- .../DistinctCountLongAggregatorObjectSet.java | 89 -- .../impl/DistinctStringCountAggregator.java | 165 ---- .../impl/DummyBigDecimalAggregator.java | 63 -- .../aggregator/impl/DummyDoubleAggregator.java | 64 -- .../aggregator/impl/DummyLongAggregator.java | 59 -- .../query/aggregator/impl/MaxAggregator.java | 68 -- .../impl/MaxBigDecimalAggregator.java | 67 -- .../aggregator/impl/MaxLongAggregator.java | 67 -- .../query/aggregator/impl/MinAggregator.java | 66 -- .../impl/MinBigDecimalAggregator.java | 66 -- .../aggregator/impl/MinLongAggregator.java | 66 -- .../impl/SumBigDecimalAggregator.java | 178 ---- .../impl/SumDistinctBigDecimalAggregator.java | 232 ------ .../impl/SumDistinctDoubleAggregator.java | 230 ------ .../impl/SumDistinctLongAggregator.java | 220 ----- .../aggregator/impl/SumDoubleAggregator.java | 178 ---- .../aggregator/impl/SumLongAggregator.java | 164 ---- .../impl/avg/AbstractAvgAggregator.java | 28 + .../impl/avg/AvgBigDecimalAggregator.java | 218 +++++ .../impl/avg/AvgDoubleAggregator.java | 210 +++++ .../aggregator/impl/avg/AvgLongAggregator.java | 195 +++++ .../aggregator/impl/count/CountAggregator.java | 204 +++++ ...bstractDistinctCountAggregatorObjectSet.java | 103 +++ .../impl/distinct/DistinctCountAggregator.java | 319 ++++++++ .../DistinctCountAggregatorObjectSet.java | 96 +++ ...tinctCountBigDecimalAggregatorObjectSet.java | 86 ++ .../DistinctCountLongAggregatorObjectSet.java | 89 ++ .../distinct/DistinctStringCountAggregator.java | 165 ++++ .../SumDistinctBigDecimalAggregator.java | 233 ++++++ .../distinct/SumDistinctDoubleAggregator.java | 231 ++++++ .../distinct/SumDistinctLongAggregator.java | 221 +++++ .../dummy/AbstractMeasureAggregatorDummy.java | 70 ++ .../impl/dummy/DummyBigDecimalAggregator.java | 63 ++ .../impl/dummy/DummyDoubleAggregator.java | 64 ++ .../impl/dummy/DummyLongAggregator.java | 59 ++ .../impl/max/AbstractMaxAggregator.java | 61 ++ .../aggregator/impl/max/MaxAggregator.java | 68 ++ .../impl/max/MaxBigDecimalAggregator.java | 67 ++ .../aggregator/impl/max/MaxLongAggregator.java | 67 ++ .../impl/min/AbstractMinAggregator.java | 61 ++ .../aggregator/impl/min/MinAggregator.java | 66 ++ .../impl/min/MinBigDecimalAggregator.java | 66 ++ .../aggregator/impl/min/MinLongAggregator.java | 66 ++ .../impl/sum/SumBigDecimalAggregator.java | 179 ++++ .../impl/sum/SumDoubleAggregator.java | 179 ++++ .../aggregator/impl/sum/SumLongAggregator.java | 165 ++++ .../util/MeasureAggregatorFactory.java | 44 +- .../DirectDictionaryDimensionAggregator.java | 114 +++ .../impl/InternalCountStartQueryExecutor.java | 2 +- .../query/carbon/executor/util/QueryUtil.java | 9 +- .../impl/QueryResultPreparatorImpl.java | 6 +- .../examples/GenerateDictionaryExample.scala | 2 +- .../spark/sql/common/util/QueryTest.scala | 7 +- .../allqueries/AllDataTypesTestCase1.scala | 31 +- .../allqueries/AllDataTypesTestCase2.scala | 43 +- .../allqueries/AllDataTypesTestCase6.scala | 14 +- integration/spark/pom.xml | 11 - .../org/apache/spark/sql/CarbonAggregate.scala | 209 ----- .../spark/sql/CarbonCatalystOperators.scala | 289 +++---- .../org/apache/spark/sql/CarbonContext.scala | 3 +- .../spark/sql/CarbonDatasourceRelation.scala | 11 +- .../spark/sql/CarbonDictionaryDecoder.scala | 15 +- .../org/apache/spark/sql/CarbonOperators.scala | 270 ++++--- .../apache/spark/sql/CarbonRawAggregate.scala | 239 ------ .../apache/spark/sql/CarbonRawOperators.scala | 55 +- .../org/apache/spark/sql/CarbonSqlParser.scala | 6 +- .../sql/SparkUnknownCarbonAggregator.scala | 179 ---- .../sql/agg/CarbonAggregationExpression.scala | 50 ++ .../apache/spark/sql/agg/CarbonAverage.scala | 89 ++ .../org/apache/spark/sql/agg/CarbonCount.scala | 77 ++ .../execution/command/carbonTableSchema.scala | 254 +++--- .../spark/sql/hive/CarbonMetastoreCatalog.scala | 134 ++- .../spark/sql/hive/CarbonRawStrategies.scala | 78 +- .../spark/sql/hive/CarbonStrategies.scala | 171 ++-- .../apache/spark/sql/hive/CarbonStrategy.scala | 2 +- .../spark/sql/optimizer/CarbonOptimizer.scala | 27 +- .../org/carbondata/spark/CarbonFilters.scala | 26 +- .../org/carbondata/spark/CarbonOption.scala | 2 +- .../scala/org/carbondata/spark/KeyVal.scala | 8 + .../carbondata/spark/agg/CarbonAggregates.scala | 807 ------------------- .../spark/agg/MeasureAggregatorUDT.scala | 55 -- .../spark/rdd/CarbonDataRDDFactory.scala | 56 +- .../spark/rdd/CarbonRawQueryRDD.scala | 12 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 20 +- .../spark/util/GlobalDictionaryUtil.scala | 2 +- .../src/test/resources/datawithmaxinteger.csv | 12 + .../test/resources/datawithmaxmininteger.csv | 13 + .../src/test/resources/datawithmininteger.csv | 12 + .../spark/sql/common/util/QueryTest.scala | 8 +- .../TestLoadDataWithMaxMinInteger.scala | 98 +++ .../AllDataTypesTestCaseAggregate.scala | 7 +- .../DataCompactionNoDictionaryTest.scala | 56 +- .../dataretention/DataRetentionTestCase.scala | 13 +- .../deleteTable/TestDeleteTableNewDDL.scala | 23 + ...estampDataTypeDirectDictionaryTestCase.scala | 6 + .../AutoHighCardinalityIdentifyTestCase.scala | 2 +- .../util/GlobalDictionaryUtilTestCase.scala | 8 +- pom.xml | 2 +- processing/pom.xml | 11 - .../store/CarbonDataFileAttributes.java | 4 +- 117 files changed, 5119 insertions(+), 6049 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 6140353..bfb13a6 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -60,39 +60,6 @@ <version>${kettle.version}</version> </dependency> <dependency> - <groupId>eigenbase</groupId> - <artifactId>eigenbase-xom</artifactId> - <version>1.3.4</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>eigenbase</groupId> - <artifactId>eigenbase-properties</artifactId> - <version>1.1.4</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>eigenbase</groupId> - <artifactId>eigenbase-resgen</artifactId> - <version>1.3.4</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>it.unimi.dsi</groupId> <artifactId>fastutil</artifactId> <version>6.5.0</version> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/core/carbon/metadata/CarbonMetadata.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/core/carbon/metadata/CarbonMetadata.java b/core/src/main/java/org/carbondata/core/carbon/metadata/CarbonMetadata.java index a473048..488f548 100644 --- a/core/src/main/java/org/carbondata/core/carbon/metadata/CarbonMetadata.java +++ b/core/src/main/java/org/carbondata/core/carbon/metadata/CarbonMetadata.java @@ -56,7 +56,7 @@ public final class CarbonMetadata { * @param tableUniquName */ public void removeTable(String tableUniquName) { - tableInfoMap.remove(tableUniquName); + tableInfoMap.remove(convertToLowerCase(tableUniquName)); } /** @@ -68,7 +68,7 @@ public final class CarbonMetadata { * @param carbonTable */ public void addCarbonTable(CarbonTable carbonTable) { - tableInfoMap.put(carbonTable.getTableUniqueName(), carbonTable); + tableInfoMap.put(convertToLowerCase(carbonTable.getTableUniqueName()), carbonTable); } /** @@ -77,12 +77,12 @@ public final class CarbonMetadata { * @param tableInfo */ public void loadTableMetadata(TableInfo tableInfo) { - CarbonTable carbonTable = tableInfoMap.get(tableInfo.getTableUniqueName()); + CarbonTable carbonTable = tableInfoMap.get(convertToLowerCase(tableInfo.getTableUniqueName())); if (null == carbonTable || carbonTable.getTableLastUpdatedTime() < tableInfo .getLastUpdatedTime()) { carbonTable = new CarbonTable(); carbonTable.loadCarbonTable(tableInfo); - tableInfoMap.put(tableInfo.getTableUniqueName(), carbonTable); + tableInfoMap.put(convertToLowerCase(tableInfo.getTableUniqueName()), carbonTable); } } @@ -93,7 +93,7 @@ public final class CarbonMetadata { * @return */ public CarbonTable getCarbonTable(String tableUniqueName) { - return tableInfoMap.get(tableUniqueName); + return tableInfoMap.get(convertToLowerCase(tableUniqueName)); } /** @@ -104,6 +104,15 @@ public final class CarbonMetadata { } /** + * returns the given string in lowercase + * @param table + * @return + */ + public String convertToLowerCase(String table) { + return table.toLowerCase(); + } + + /** * method will return dimension instance based on the column identifier * and table instance passed to it. * http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java b/core/src/main/java/org/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java index 64591d5..89bf334 100644 --- a/core/src/main/java/org/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java +++ b/core/src/main/java/org/carbondata/core/datastorage/store/compression/ValueCompressonHolder.java @@ -88,6 +88,7 @@ public final class ValueCompressonHolder { break; case DATA_LONG: + case DATA_BIGINT: value.setValue(longCompressor.unCompress(data)); break; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java index d37dcf1..3954e14 100644 --- a/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java +++ b/core/src/main/java/org/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java @@ -34,8 +34,6 @@ import static org.carbondata.core.keygenerator.directdictionary.timestamp.TimeSt import static org.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_MIN; import static org.carbondata.core.keygenerator.directdictionary.timestamp.TimeStampGranularityConstants.TIME_GRAN_SEC; -import org.apache.spark.sql.columnar.TIMESTAMP; - /** * The class provides the method to generate dictionary key and getting the actual value from * the dictionaryKey for direct dictionary column for TIMESTAMP type. @@ -119,7 +117,7 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener try { dateToStr = timeParser.parse(memberStr); } catch (ParseException e) { - LOGGER.error("Cannot convert" + TIMESTAMP.toString() + " to Time/Long type value" + LOGGER.error("Cannot convert" + memberStr + " to Time/Long type value" + e.getMessage()); } if (null == dateToStr) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java index fdea2c8..ac2281d 100644 --- a/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java +++ b/core/src/main/java/org/carbondata/core/util/ValueCompressionUtil.java @@ -30,9 +30,7 @@ import org.carbondata.core.datastorage.store.compression.ValueCompressonHolder; import org.carbondata.core.datastorage.store.compression.type.UnCompressByteArray; import org.carbondata.core.datastorage.store.compression.type.UnCompressDefaultLong; import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinByte; -import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinByteForLong; import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinDefault; -import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinDefaultLong; import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinFloat; import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinInt; import org.carbondata.core.datastorage.store.compression.type.UnCompressMaxMinLong; @@ -150,7 +148,8 @@ public final class ValueCompressionUtil { return new CompressionFinder(COMPRESSION_TYPE.CUSTOM_BIGDECIMAL, DataType.DATA_BYTE, DataType.DATA_BYTE); case 'l': - return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_LONG, DataType.DATA_LONG); + return new CompressionFinder(COMPRESSION_TYPE.NONE, + DataType.DATA_BIGINT, DataType.DATA_BIGINT); default: break; } @@ -160,6 +159,10 @@ public final class ValueCompressionUtil { getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected))) { return new CompressionFinder(COMPRESSION_TYPE.MAX_MIN, DataType.DATA_DOUBLE, getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected)); + } else if (getSize(getDataType((double) maxValue, decimal, dataTypeSelected)) < getSize( + getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected))) { + return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE, + getDataType((double) maxValue - (double) minValue, decimal, dataTypeSelected)); } else { return new CompressionFinder(COMPRESSION_TYPE.NONE, DataType.DATA_DOUBLE, getDataType((double) maxValue, decimal, dataTypeSelected)); @@ -311,6 +314,7 @@ public final class ValueCompressionUtil { return intResult; case DATA_LONG: + case DATA_BIGINT: long[] longResult = new long[value.length]; @@ -547,7 +551,7 @@ public final class ValueCompressionUtil { */ public static ValueCompressonHolder.UnCompressValue unCompressNone(DataType compDataType, DataType actualDataType) { - if (actualDataType == DataType.DATA_LONG) { + if (actualDataType == DataType.DATA_BIGINT) { return new UnCompressDefaultLong(); } else { switch (compDataType) { @@ -574,7 +578,6 @@ public final class ValueCompressionUtil { default: return new UnCompressNoneDefault(); - } } } @@ -584,42 +587,31 @@ public final class ValueCompressionUtil { */ public static ValueCompressonHolder.UnCompressValue unCompressMaxMin(DataType compDataType, DataType actualDataType) { - if (actualDataType == DataType.DATA_LONG) { - switch (compDataType) { - case DATA_BYTE: - return new UnCompressMaxMinByteForLong(); - case DATA_LONG: - return new UnCompressMaxMinDefaultLong(); - default: - return new UnCompressMaxMinDefaultLong(); - } - } else { - switch (compDataType) { - case DATA_BYTE: + switch (compDataType) { + case DATA_BYTE: - return new UnCompressMaxMinByte(); + return new UnCompressMaxMinByte(); - case DATA_SHORT: + case DATA_SHORT: - return new UnCompressMaxMinShort(); + return new UnCompressMaxMinShort(); - case DATA_INT: + case DATA_INT: - return new UnCompressMaxMinInt(); + return new UnCompressMaxMinInt(); - case DATA_LONG: + case DATA_LONG: - return new UnCompressMaxMinLong(); + return new UnCompressMaxMinLong(); - case DATA_FLOAT: + case DATA_FLOAT: - return new UnCompressMaxMinFloat(); + return new UnCompressMaxMinFloat(); - default: + default: - return new UnCompressMaxMinDefault(); + return new UnCompressMaxMinDefault(); - } } } @@ -881,6 +873,9 @@ public final class ValueCompressionUtil { DATA_LONG(), /** * */ + DATA_BIGINT(), /** + * + */ DATA_DOUBLE(); /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractDistinctCountAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractDistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractDistinctCountAggregatorObjectSet.java deleted file mode 100644 index a02b1eb..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractDistinctCountAggregatorObjectSet.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.util.HashSet; -import java.util.Set; - -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -public abstract class AbstractDistinctCountAggregatorObjectSet implements MeasureAggregator { - - private static final long serialVersionUID = 6313463368629960186L; - - protected Set<Object> valueSetForObj; - - public AbstractDistinctCountAggregatorObjectSet() { - valueSetForObj = new HashSet<Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - /** - * just need to add the unique values to agg set - */ - @Override public void agg(double newVal) { - } - - /** - * Distinct count Aggregate function which update the Distinct count - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - valueSetForObj.add(newVal); - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - return null; - } - - @Override public Double getDoubleValue() { - return (double) valueSetForObj.size(); - } - - @Override public Long getLongValue() { - return (long) valueSetForObj.size(); - } - - @Override public BigDecimal getBigDecimalValue() { - return new BigDecimal(valueSetForObj.size()); - } - - @Override public Object getValueObject() { - return valueSetForObj.size(); - } - - @Override public void setNewValue(Object newValue) { - valueSetForObj.add(newValue); - } - - @Override public boolean isFirstTime() { - return false; - } - - @Override public void writeData(DataOutput output) throws IOException { - - } - - @Override public void readData(DataInput inPut) throws IOException { - - } - - public String toString() { - return valueSetForObj.size() + ""; - } - - @Override public void merge(byte[] value) { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMaxAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMaxAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMaxAggregator.java deleted file mode 100644 index 13e6640..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMaxAggregator.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.ByteArrayInputStream; -import java.io.ObjectInput; -import java.io.ObjectInputStream; - -import org.carbondata.common.logging.LogService; -import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.core.util.CarbonUtil; - -public abstract class AbstractMaxAggregator extends AbstractMeasureAggregatorMaxMin { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(AbstractMaxAggregator.class.getName()); - - protected void internalAgg(Object value) { - if (value instanceof Comparable) { - @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value); - aggVal = (aggVal == null || aggVal.compareTo(newValue) < 0) ? newValue : aggVal; - } - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteArrayInputStream bytesInputStream = null; - ObjectInput in = null; - try { - bytesInputStream = new ByteArrayInputStream(value); - in = new ObjectInputStream(bytesInputStream); - Object newVal = (Comparable<Object>) in.readObject(); - internalAgg(newVal); - firstTime = false; - } catch (Exception e) { - LOGGER.error(e, "Problem while merging byte array in maxAggregator: " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(bytesInputStream); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorDummy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorDummy.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorDummy.java deleted file mode 100644 index 155687c..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorDummy.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.carbondata.query.aggregator.MeasureAggregator; - -/** - * AbstractMeasureAggregatorDummy - * Used for custom Carbon Aggregator dummy - */ -public abstract class AbstractMeasureAggregatorDummy extends AbstractMeasureAggregatorBasic { - private static final long serialVersionUID = 1L; - - @Override public int compareTo(MeasureAggregator o) { - if (equals(o)) { - return 0; - } - return -1; - } - - @Override public boolean equals(Object arg0) { - return super.equals(arg0); - } - - @Override public int hashCode() { - return super.hashCode(); - } - - @Override public byte[] getByteArray() { - return null; - } - - @Override public void merge(MeasureAggregator aggregator) { - } - - @Override public MeasureAggregator getCopy() { - return null; - } - - @Override public void writeData(DataOutput output) throws IOException { - } - - @Override public void readData(DataInput inPut) throws IOException { - } - - @Override public void merge(byte[] value) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java index 4f3163a..99a8ed9 100644 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMeasureAggregatorMaxMin.java @@ -34,6 +34,7 @@ import org.carbondata.common.logging.LogService; import org.carbondata.common.logging.LogServiceFactory; import org.carbondata.core.util.CarbonUtil; import org.carbondata.query.aggregator.MeasureAggregator; +import org.carbondata.query.aggregator.impl.max.MaxAggregator; /** * AbstractMeasureAggregatorMaxMin @@ -45,9 +46,9 @@ public abstract class AbstractMeasureAggregatorMaxMin implements MeasureAggregat private static final LogService LOGGER = LogServiceFactory.getLogService(MaxAggregator.class.getName()); - protected Comparable<Object> aggVal; + public Comparable<Object> aggVal; - protected boolean firstTime = true; + public boolean firstTime = true; protected abstract void internalAgg(Object value); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMinAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMinAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMinAggregator.java deleted file mode 100644 index 19fc69f..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AbstractMinAggregator.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.ByteArrayInputStream; -import java.io.ObjectInput; -import java.io.ObjectInputStream; - -import org.carbondata.common.logging.LogService; -import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.core.util.CarbonUtil; - -public abstract class AbstractMinAggregator extends AbstractMeasureAggregatorMaxMin { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(AbstractMinAggregator.class.getName()); - - protected void internalAgg(Object value) { - if (value instanceof Comparable) { - @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value); - aggVal = (aggVal == null || aggVal.compareTo(newValue) > 0) ? newValue : aggVal; - } - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteArrayInputStream bis = null; - ObjectInput objectInput = null; - try { - bis = new ByteArrayInputStream(value); - objectInput = new ObjectInputStream(bis); - Object newVal = (Comparable<Object>) objectInput.readObject(); - internalAgg(newVal); - firstTime = false; - } catch (Exception e) { - LOGGER.error(e, "Problem while merging byte array in minAggregator: " + e.getMessage()); - } finally { - CarbonUtil.closeStreams(bis); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java deleted file mode 100644 index 8c67cfc..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.core.util.DataTypeUtil; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class AvgBigDecimalAggregator extends AbstractMeasureAggregatorBasic { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 5463736686281089871L; - - /** - * total number of aggregate values - */ - protected double count; - - /** - * aggregate value - */ - protected BigDecimal aggVal; - - public AvgBigDecimalAggregator() { - aggVal = new BigDecimal(0); - } - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal); - buffer.rewind(); - while (buffer.hasRemaining()) { - byte[] valueByte = new byte[buffer.getInt()]; - buffer.get(valueByte); - BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte); - aggVal = aggVal.add(valueBigDecimal); - - count += buffer.getDouble(); - firstTime = false; - } - return; - } - - if (firstTime) { - aggVal = (BigDecimal) newVal; - firstTime = false; - } else { - aggVal = aggVal.add((BigDecimal) newVal); - } - count++; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - BigDecimal value = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index); - aggVal = aggVal.add(value); - firstTime = false; - count++; - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (firstTime) { - return new byte[0]; - } - byte[] bytes = DataTypeUtil.bigDecimalToByte(aggVal); - ByteBuffer allocate = - ByteBuffer.allocate(4 + bytes.length + CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - allocate.putInt(bytes.length); - allocate.put(bytes); - allocate.putDouble(count); - allocate.rewind(); - - return allocate.array(); - } - - /** - * Return the average of the aggregate values - * - * @return average aggregate value - */ - @Override public BigDecimal getBigDecimalValue() { - return aggVal.divide(new BigDecimal(count), 6); - } - - /** - * This method merge the aggregated value, in average aggregator it will add - * count and aggregate value - * - * @param aggregator Avg Aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - AvgBigDecimalAggregator avgAggregator = (AvgBigDecimalAggregator) aggregator; - if (!avgAggregator.isFirstTime()) { - aggVal = aggVal.add(avgAggregator.aggVal); - count += avgAggregator.count; - firstTime = false; - } - } - - /** - * This method return the average value as an object - * - * @return average value as an object - */ - @Override public Object getValueObject() { - return aggVal.divide(new BigDecimal(count)); - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal = (BigDecimal) newValue; - count = 1; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeBoolean(firstTime); - output.writeUTF(aggVal.toString()); - output.writeDouble(count); - - } - - @Override public void readData(DataInput inPut) throws IOException { - firstTime = inPut.readBoolean(); - aggVal = new BigDecimal(inPut.readUTF()); - count = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - AvgBigDecimalAggregator avg = new AvgBigDecimalAggregator(); - avg.aggVal = aggVal; - avg.count = count; - avg.firstTime = firstTime; - return avg; - } - - @Override public int compareTo(MeasureAggregator o) { - BigDecimal val = getBigDecimalValue(); - BigDecimal otherVal = o.getBigDecimalValue(); - - return val.compareTo(otherVal); - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof AvgBigDecimalAggregator)) { - return false; - } - AvgBigDecimalAggregator o = (AvgBigDecimalAggregator) obj; - return getBigDecimalValue().equals(o.getBigDecimalValue()); - } - - @Override public int hashCode() { - return getBigDecimalValue().hashCode(); - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - - byte[] valueByte = new byte[buffer.getInt()]; - buffer.get(valueByte); - BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte); - aggVal = aggVal.add(valueBigDecimal); - count += buffer.getDouble(); - firstTime = false; - } - - public String toString() { - return (aggVal.divide(new BigDecimal(count))) + ""; - } - - @Override public MeasureAggregator getNew() { - return new AvgBigDecimalAggregator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgDoubleAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgDoubleAggregator.java deleted file mode 100644 index aea7007..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgDoubleAggregator.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class AvgDoubleAggregator extends AbstractMeasureAggregatorBasic { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 5463736686281089871L; - - /** - * total number of aggregate values - */ - protected double count; - - /** - * aggregate value - */ - protected double aggVal; - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(double newVal) { - aggVal += newVal; - count++; - firstTime = false; - } - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal); - buffer.rewind(); - while (buffer.hasRemaining()) { - aggVal += buffer.getDouble(); - count += buffer.getDouble(); - firstTime = false; - } - return; - } - aggVal += ((Number) newVal).doubleValue(); - count++; - firstTime = false; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - aggVal += dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index); - count++; - firstTime = false; - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (firstTime) { - return new byte[0]; - } - ByteBuffer buffer = ByteBuffer.allocate(2 * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - buffer.putDouble(aggVal); - buffer.putDouble(count); - return buffer.array(); - } - - /** - * Return the average of the aggregate values - * - * @return average aggregate value - */ - @Override public Double getDoubleValue() { - return aggVal / count; - } - - /** - * This method merge the aggregated value, in average aggregator it will add - * count and aggregate value - * - * @param aggregator Avg Aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - AvgDoubleAggregator avgAggregator = (AvgDoubleAggregator) aggregator; - if (!avgAggregator.isFirstTime()) { - aggVal += avgAggregator.aggVal; - count += avgAggregator.count; - firstTime = false; - } - } - - /** - * This method return the average value as an object - * - * @return average value as an object - */ - @Override public Object getValueObject() { - return aggVal / count; - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal = (Double) newValue; - count = 1; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeBoolean(firstTime); - output.writeDouble(aggVal); - output.writeDouble(count); - - } - - @Override public void readData(DataInput inPut) throws IOException { - firstTime = inPut.readBoolean(); - aggVal = inPut.readDouble(); - count = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - AvgDoubleAggregator avg = new AvgDoubleAggregator(); - avg.aggVal = aggVal; - avg.count = count; - avg.firstTime = firstTime; - return avg; - } - - @Override public int compareTo(MeasureAggregator o) { - double val = getDoubleValue(); - double otherVal = o.getDoubleValue(); - if (val > otherVal) { - return 1; - } - if (val < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof AvgDoubleAggregator)) { - return false; - } - AvgDoubleAggregator o = (AvgDoubleAggregator)obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - aggVal += buffer.getDouble(); - count += buffer.getDouble(); - firstTime = false; - } - - public String toString() { - return (aggVal / count) + ""; - } - - @Override public MeasureAggregator getNew() { - return new AvgDoubleAggregator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgLongAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgLongAggregator.java deleted file mode 100644 index d608c90..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgLongAggregator.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class AvgLongAggregator extends AbstractMeasureAggregatorBasic { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 5463736686281089871L; - - /** - * total number of aggregate values - */ - protected double count; - - /** - * aggregate value - */ - protected long aggVal; - - /** - * Average Aggregate function which will add all the aggregate values and it - * will increment the total count every time, for average value - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal); - buffer.rewind(); - while (buffer.hasRemaining()) { - aggVal += buffer.getLong(); - count += buffer.getDouble(); - firstTime = false; - } - return; - } - aggVal += (Long) newVal; - count++; - firstTime = false; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - aggVal += dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index); - count++; - firstTime = false; - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (firstTime) { - return new byte[0]; - } - ByteBuffer buffer = ByteBuffer.allocate( - CarbonCommonConstants.LONG_SIZE_IN_BYTE + CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - buffer.putLong(aggVal); - buffer.putDouble(count); - return buffer.array(); - } - - @Override public Long getLongValue() { - return aggVal / (long) count; - } - - /** - * This method merge the aggregated value, in average aggregator it will add - * count and aggregate value - * - * @param aggregator Avg Aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - AvgLongAggregator avgAggregator = (AvgLongAggregator) aggregator; - if (!avgAggregator.isFirstTime()) { - aggVal += avgAggregator.aggVal; - count += avgAggregator.count; - firstTime = false; - } - } - - /** - * This method return the average value as an object - * - * @return average value as an object - */ - @Override public Object getValueObject() { - return aggVal / count; - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal = (Long) newValue; - count = 1; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeBoolean(firstTime); - output.writeLong(aggVal); - output.writeDouble(count); - - } - - @Override public void readData(DataInput inPut) throws IOException { - firstTime = inPut.readBoolean(); - aggVal = inPut.readLong(); - count = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - AvgLongAggregator avg = new AvgLongAggregator(); - avg.aggVal = aggVal; - avg.count = count; - avg.firstTime = firstTime; - return avg; - } - - @Override public int compareTo(MeasureAggregator o) { - long val = getLongValue(); - long otherVal = o.getLongValue(); - if (val > otherVal) { - return 1; - } else if (val < otherVal) { - return -1; - } else { - return 0; - } - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof AvgLongAggregator)) { - return false; - } - AvgLongAggregator o = (AvgLongAggregator)obj; - return getLongValue().equals(o.getLongValue()); - } - - @Override public int hashCode() { - return getLongValue().hashCode(); - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - aggVal += buffer.getLong(); - count += buffer.getDouble(); - firstTime = false; - } - - public String toString() { - return (aggVal / count) + ""; - } - - @Override public MeasureAggregator getNew() { - // TODO Auto-generated method stub - return new AvgLongAggregator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/CountAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/CountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/CountAggregator.java deleted file mode 100644 index 12c2061..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/CountAggregator.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -/** - * Class Description : It will return total count of values - */ -public class CountAggregator implements MeasureAggregator { - - /** - * serialVersionUID - */ - private static final long serialVersionUID = 2678878935295306313L; - - /** - * aggregate value - */ - private double aggVal; - - /** - * Count Aggregate function which update the total count - * - * @param newVal new value - */ - @Override public void agg(double newVal) { - aggVal++; - } - - /** - * Count Aggregate function which update the total count - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - aggVal++; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - aggVal++; - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); - buffer.putDouble(aggVal); - return buffer.array(); - } - - /** - * Returns the total count - * - * @return total count - */ - @Override public Double getDoubleValue() { - return aggVal; - } - - @Override public Long getLongValue() { - return (long) aggVal; - } - - @Override public BigDecimal getBigDecimalValue() { - return new BigDecimal(aggVal); - } - - /** - * Merge the total count with the aggregator - * - * @param aggregator count aggregator - */ - @Override public void merge(MeasureAggregator aggregator) { - CountAggregator countAggregator = (CountAggregator) aggregator; - aggVal += countAggregator.aggVal; - } - - /** - * Overloaded Aggregate function will be used for Aggregate tables because - * aggregate table will have fact_count as a measure. It will update the - * total count - * - * @param newVal - * new value - * @param factCount - * total fact count - * - */ - // @Override - // public void agg(double newVal, double factCount) - // { - // agg(newVal, null, 0, 0); - // } - - /** - * This method return the count value as an object - * - * @return count value as an object - */ - - @Override public Object getValueObject() { - return aggVal; - } - - /** - * @see MeasureAggregator#setNewValue(Object) - */ - @Override public void setNewValue(Object newValue) { - aggVal += Double.parseDouble(String.valueOf(newValue)); - } - - @Override public boolean isFirstTime() { - return false; - } - - @Override public void writeData(DataOutput output) throws IOException { - output.writeDouble(aggVal); - - } - - @Override public void readData(DataInput inPut) throws IOException { - aggVal = inPut.readDouble(); - } - - @Override public MeasureAggregator getCopy() { - CountAggregator aggregator = new CountAggregator(); - aggregator.aggVal = aggVal; - return aggregator; - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - aggVal += buffer.getDouble(); - } - - @Override public int compareTo(MeasureAggregator obj) { - double val = getDoubleValue(); - double otherVal = obj.getDoubleValue(); - if (val > otherVal) { - return 1; - } - if (val < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof CountAggregator)) { - return false; - } - CountAggregator o = (CountAggregator)obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public MeasureAggregator get() { - return this; - } - - public String toString() { - return aggVal + ""; - } - - @Override public MeasureAggregator getNew() { - return new CountAggregator(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregator.java deleted file mode 100644 index f634ab3..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregator.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; - -import org.carbondata.common.logging.LogService; -import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.query.aggregator.MeasureAggregator; - -import org.roaringbitmap.IntIterator; -import org.roaringbitmap.RoaringBitmap; - -/** - * * The distinct count aggregator - * Ex: - * ID NAME Sales - * <p>1 a 200 - * <p>2 a 100 - * <p>3 a 200 - * select count(distinct sales) # would result 2 - * select count(sales) # would result 3 - */ -public class DistinctCountAggregator implements MeasureAggregator { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(DistinctCountAggregator.class.getName()); - /** - * - */ - private static final long serialVersionUID = 6313463368629960186L; - /** - * For Spark CARBON to avoid heavy object transfer it better to flatten - * the Aggregators. There is no aggregation expected after setting this value. - */ - private Double computedFixedValue; - /** - * - */ - // private Set<Double> valueSet; - private RoaringBitmap valueSet; - - private byte[] data; - - private double minValue; - - public DistinctCountAggregator(Object minValue) { - valueSet = new RoaringBitmap(); - if (minValue instanceof BigDecimal) { - this.minValue = ((BigDecimal) minValue).doubleValue(); - } else if (minValue instanceof Long) { - this.minValue = ((Long) minValue).doubleValue(); - } else { - this.minValue = (Double) minValue; - } - } - - public DistinctCountAggregator() { - valueSet = new RoaringBitmap(); - } - - /** - * just need to add the unique values to agg set - */ - @Override public void agg(double newVal) { - valueSet.add((int) (newVal - minValue)); - } - - /** - * Distinct count Aggregate function which update the Distinct count - * - * @param newVal new value - */ - @Override public void agg(Object newVal) { - if (newVal instanceof byte[]) { - byte[] values = (byte[]) newVal; - ByteBuffer buffer = ByteBuffer.wrap(values); - buffer.rewind(); - while (buffer.hasRemaining()) { - valueSet.add(buffer.getInt()); - } - return; - } else { - double value = new Double(newVal.toString()); - agg(value); - } - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - valueSet.add((int) dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index)); - } - } - - /** - * Below method will be used to get the value byte array - */ - @Override public byte[] getByteArray() { - if (valueSet.getCardinality() == 0) { - return new byte[0]; - } - IntIterator iterator = valueSet.getIntIterator(); - ByteBuffer buffer = ByteBuffer.allocate(valueSet.getCardinality() * 4 + 8); - buffer.putDouble(minValue); - while (iterator.hasNext()) { - buffer.putInt(iterator.next()); - } - buffer.rewind(); - return buffer.array(); - } - - private void agg(RoaringBitmap set2, double minValue) { - if (this.minValue == minValue) { - valueSet.or(set2); - } else { - if (this.minValue > minValue) { - IntIterator intIterator = valueSet.getIntIterator(); - while (intIterator.hasNext()) { - set2.add((int) ((double) (intIterator.next() + this.minValue) - minValue)); - } - this.minValue = minValue; - this.valueSet = set2; - } else { - IntIterator intIterator = set2.getIntIterator(); - while (intIterator.hasNext()) { - valueSet.add((int) ((double) (intIterator.next() + minValue) - this.minValue)); - } - } - } - } - - /** - * merge the valueset so that we get the count of unique values - */ - @Override public void merge(MeasureAggregator aggregator) { - DistinctCountAggregator distinctCountAggregator = (DistinctCountAggregator) aggregator; - readData(); - distinctCountAggregator.readData(); - if (distinctCountAggregator.valueSet != null) { - agg(distinctCountAggregator.valueSet, distinctCountAggregator.minValue); - } - } - - @Override public Double getDoubleValue() { - if (computedFixedValue == null) { - readData(); - return (double) valueSet.getCardinality(); - } - return computedFixedValue; - } - - @Override public Long getLongValue() { - if (computedFixedValue == null) { - readData(); - return (long) valueSet.getCardinality(); - } - return computedFixedValue.longValue(); - } - - @Override public BigDecimal getBigDecimalValue() { - if (computedFixedValue == null) { - readData(); - return new BigDecimal(valueSet.getCardinality()); - } - return new BigDecimal(computedFixedValue); - } - - @Override public Object getValueObject() { - return valueSet.getCardinality(); - } - - @Override public void setNewValue(Object newValue) { - computedFixedValue = (Double) newValue; - valueSet = null; - } - - @Override public boolean isFirstTime() { - return false; - } - - @Override public void writeData(DataOutput output) throws IOException { - - if (computedFixedValue != null) { - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8); - byteBuffer.putInt(-1); - byteBuffer.putDouble(computedFixedValue); - byteBuffer.flip(); - output.write(byteBuffer.array()); - } else { - if (valueSet != null) { - valueSet.serialize(output); - } else { - output.write(data); - } - } - } - - @Override public void readData(DataInput inPut) throws IOException { - valueSet = new RoaringBitmap(); - valueSet.deserialize(inPut); - } - - private void readData() { - if (data != null && (valueSet == null || valueSet.isEmpty())) { - ByteArrayInputStream stream = new ByteArrayInputStream(data); - DataInputStream outputStream = new DataInputStream(stream); - try { - readData(outputStream); - outputStream.close(); - data = null; - } catch (IOException e) { - LOGGER.error(e, e.getMessage()); - } - } - } - - @Override public MeasureAggregator getCopy() { - DistinctCountAggregator aggr = new DistinctCountAggregator(minValue); - aggr.valueSet = valueSet.clone(); - return aggr; - } - - @Override public int compareTo(MeasureAggregator measureAggr) { - double compFixedVal = getDoubleValue(); - double otherVal = measureAggr.getDoubleValue(); - if (compFixedVal > otherVal) { - return 1; - } - if (compFixedVal < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof DistinctCountAggregator)) { - return false; - } - DistinctCountAggregator o = (DistinctCountAggregator) obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public MeasureAggregator get() { - ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); - DataOutputStream outputStream = new DataOutputStream(byteStream); - try { - writeData(outputStream); - } catch (IOException ex) { - LOGGER.error(ex, ex.getMessage()); - } - data = byteStream.toByteArray(); - valueSet = null; - return this; - } - - public String toString() { - if (computedFixedValue == null) { - readData(); - return valueSet.getCardinality() + ""; - } - return computedFixedValue + ""; - } - - public RoaringBitmap getBitMap() { - return valueSet; - } - - public double getMinValue() { - return minValue; - } - - @Override public void merge(byte[] value) { - if (0 == value.length) { - return; - } - ByteBuffer buffer = ByteBuffer.wrap(value); - buffer.rewind(); - double currentMinValue = buffer.getDouble(); - while (buffer.hasRemaining()) { - agg(buffer.getInt() + currentMinValue); - } - } - - @Override public MeasureAggregator getNew() { - // TODO Auto-generated method stub - return new DistinctCountAggregator(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregatorObjectSet.java deleted file mode 100644 index f6210dc..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountAggregatorObjectSet.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.util.HashSet; -import java.util.Set; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class DistinctCountAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet { - - private static final long serialVersionUID = 6313463368629960186L; - - /** - * just need to add the unique values to agg set - */ - @Override public void agg(double newVal) { - valueSetForObj.add(newVal); - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index)); - } - } - - private void agg(Set<Object> set2) { - valueSetForObj.addAll(set2); - } - - /** - * merge the valueset so that we get the count of unique values - */ - @Override public void merge(MeasureAggregator aggregator) { - DistinctCountAggregatorObjectSet distinctCountAggregator = - (DistinctCountAggregatorObjectSet) aggregator; - agg(distinctCountAggregator.valueSetForObj); - } - - @Override public MeasureAggregator getCopy() { - DistinctCountAggregatorObjectSet aggregator = new DistinctCountAggregatorObjectSet(); - aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj); - return aggregator; - } - - @Override public int compareTo(MeasureAggregator measureAggr) { - double valueSetForObjSize = getDoubleValue(); - double otherVal = measureAggr.getDoubleValue(); - if (valueSetForObjSize > otherVal) { - return 1; - } - if (valueSetForObjSize < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof DistinctCountAggregatorObjectSet)) { - return false; - } - DistinctCountAggregatorObjectSet o = (DistinctCountAggregatorObjectSet) obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public MeasureAggregator get() { - return this; - } - - @Override public MeasureAggregator getNew() { - return new DistinctCountAggregatorObjectSet(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountBigDecimalAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountBigDecimalAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountBigDecimalAggregatorObjectSet.java deleted file mode 100644 index 21dc142..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountBigDecimalAggregatorObjectSet.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.math.BigDecimal; -import java.util.HashSet; -import java.util.Set; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class DistinctCountBigDecimalAggregatorObjectSet - extends AbstractDistinctCountAggregatorObjectSet { - - private static final long serialVersionUID = 6313463368629960186L; - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index)); - } - } - - private void agg(Set<Object> set2) { - valueSetForObj.addAll(set2); - } - - /** - * merge the valueset so that we get the count of unique values - */ - @Override public void merge(MeasureAggregator aggregator) { - DistinctCountBigDecimalAggregatorObjectSet distinctCountBigDecimalAggregatorObjectSet = - (DistinctCountBigDecimalAggregatorObjectSet) aggregator; - agg(distinctCountBigDecimalAggregatorObjectSet.valueSetForObj); - } - - @Override public MeasureAggregator getCopy() { - DistinctCountBigDecimalAggregatorObjectSet aggregator = - new DistinctCountBigDecimalAggregatorObjectSet(); - aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj); - return aggregator; - } - - @Override public int compareTo(MeasureAggregator measureAggr) { - BigDecimal valueSetForObjSize = getBigDecimalValue(); - BigDecimal otherVal = measureAggr.getBigDecimalValue(); - return valueSetForObjSize.compareTo(otherVal); - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof DistinctCountBigDecimalAggregatorObjectSet)) { - return false; - } - DistinctCountBigDecimalAggregatorObjectSet o = (DistinctCountBigDecimalAggregatorObjectSet) obj; - return getBigDecimalValue().equals(o.getBigDecimalValue()); - } - - @Override public int hashCode() { - return getBigDecimalValue().hashCode(); - } - - @Override public MeasureAggregator get() { - return this; - } - - @Override public MeasureAggregator getNew() { - return new DistinctCountBigDecimalAggregatorObjectSet(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountLongAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountLongAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountLongAggregatorObjectSet.java deleted file mode 100644 index 7f8cd0a..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctCountLongAggregatorObjectSet.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.util.HashSet; -import java.util.Set; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class DistinctCountLongAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet { - - private static final long serialVersionUID = 6313463368629960186L; - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index)); - } - } - - private void agg(Set<Object> set2) { - valueSetForObj.addAll(set2); - } - - /** - * merge the valueset so that we get the count of unique values - */ - @Override public void merge(MeasureAggregator aggregator) { - DistinctCountLongAggregatorObjectSet distinctCountAggregator = - (DistinctCountLongAggregatorObjectSet) aggregator; - agg(distinctCountAggregator.valueSetForObj); - } - - @Override public MeasureAggregator getCopy() { - DistinctCountLongAggregatorObjectSet aggregator = new DistinctCountLongAggregatorObjectSet(); - aggregator.valueSetForObj = new HashSet<Object>(valueSetForObj); - return aggregator; - } - - @Override public int compareTo(MeasureAggregator measureAggr) { - long valueSetForObjSize = getLongValue(); - long otherVal = measureAggr.getLongValue(); - if (valueSetForObjSize > otherVal) { - return 1; - } - if (valueSetForObjSize < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if (!(obj instanceof DistinctCountLongAggregatorObjectSet)) { - return false; - } - DistinctCountLongAggregatorObjectSet o = (DistinctCountLongAggregatorObjectSet) obj; - return getLongValue().equals(o.getLongValue()); - } - - @Override public int hashCode() { - return getLongValue().hashCode(); - } - - @Override public MeasureAggregator get() { - return this; - } - - @Override public MeasureAggregator getNew() { - return new DistinctCountLongAggregatorObjectSet(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctStringCountAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctStringCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctStringCountAggregator.java deleted file mode 100644 index 89cdeeb..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/DistinctStringCountAggregator.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.HashSet; -import java.util.Set; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class DistinctStringCountAggregator implements MeasureAggregator { - private static final long serialVersionUID = 6313463368629960186L; - - private Set<String> valueSetForStr; - - public DistinctStringCountAggregator() { - this.valueSetForStr = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - } - - public void agg(double newVal) { - } - - public void agg(String newVal) { - this.valueSetForStr.add(newVal); - } - - private void agg(Set<String> set2) { - this.valueSetForStr.addAll(set2); - } - - public void merge(MeasureAggregator aggregator) { - DistinctStringCountAggregator distinctCountAggregator = - (DistinctStringCountAggregator) aggregator; - agg(distinctCountAggregator.valueSetForStr); - } - - public Double getDoubleValue() { - return (double) this.valueSetForStr.size(); - } - - public Long getLongValue() { - return (long) this.valueSetForStr.size(); - } - - public BigDecimal getBigDecimalValue() { - return new BigDecimal(this.valueSetForStr.size()); - } - - public Object getValueObject() { - return Integer.valueOf(this.valueSetForStr.size()); - } - - public void setNewValue(Object newValue) { - } - - public boolean isFirstTime() { - return false; - } - - public void writeData(DataOutput output) throws IOException { - int length = this.valueSetForStr.size() * 8; - ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4); - byteBuffer.putInt(length); - for (String val : this.valueSetForStr) { - byte[] b = val.getBytes(Charset.defaultCharset()); - byteBuffer.putInt(b.length); - byteBuffer.put(b); - } - byteBuffer.flip(); - output.write(byteBuffer.array()); - } - - public void readData(DataInput inPut) throws IOException { - int length = inPut.readInt(); - length /= 8; - this.valueSetForStr = new HashSet<String>(length + 1, 1.0F); - for (int i = 0; i < length; i++) { - byte[] b = new byte[inPut.readInt()]; - inPut.readFully(b); - this.valueSetForStr.add(new String(b, Charset.defaultCharset())); - } - } - - public MeasureAggregator getCopy() { - DistinctStringCountAggregator aggregator = new DistinctStringCountAggregator(); - aggregator.valueSetForStr = new HashSet<String>(this.valueSetForStr); - return aggregator; - } - - public int compareTo(MeasureAggregator o) { - double val = getDoubleValue(); - double otherVal = o.getDoubleValue(); - if (val > otherVal) { - return 1; - } - if (val < otherVal) { - return -1; - } - return 0; - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof DistinctStringCountAggregator)) { - return false; - } - DistinctStringCountAggregator o = (DistinctStringCountAggregator) obj; - return getDoubleValue().equals(o.getDoubleValue()); - } - - @Override public int hashCode() { - return getDoubleValue().hashCode(); - } - - @Override public void agg(Object newVal) { - this.valueSetForStr.add((String) newVal); - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - } - - @Override public byte[] getByteArray() { - return null; - } - - @Override public MeasureAggregator get() { - return this; - } - - public String toString() { - return valueSetForStr.size() + ""; - } - - @Override public void merge(byte[] value) { - } - - @Override public MeasureAggregator getNew() { - // TODO Auto-generated method stub - return new DistinctStringCountAggregator(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/DummyBigDecimalAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/DummyBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/DummyBigDecimalAggregator.java deleted file mode 100644 index 9437f14..0000000 --- a/core/src/main/java/org/carbondata/query/aggregator/impl/DummyBigDecimalAggregator.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.carbondata.query.aggregator.impl; - -import java.math.BigDecimal; - -import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; -import org.carbondata.query.aggregator.MeasureAggregator; - -public class DummyBigDecimalAggregator extends AbstractMeasureAggregatorDummy { - private static final long serialVersionUID = 1L; - - /** - * aggregate value - */ - private BigDecimal aggVal; - - @Override public void agg(Object newVal) { - aggVal = (BigDecimal) newVal; - firstTime = false; - } - - @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { - if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { - aggVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index); - firstTime = false; - } - } - - @Override public BigDecimal getBigDecimalValue() { - return aggVal; - } - - @Override public Object getValueObject() { - return aggVal; - } - - @Override public void setNewValue(Object newValue) { - aggVal = (BigDecimal) newValue; - } - - @Override public MeasureAggregator getNew() { - // TODO Auto-generated method stub - return new DummyBigDecimalAggregator(); - } -}