http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java new file mode 100644 index 0000000..0629007 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/AbstractDistinctCountAggregatorObjectSet.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.carbondata.query.aggregator.impl.distinct;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.query.aggregator.MeasureAggregator;

/**
 * Base class for distinct-count aggregators that keep every observed value in an in-memory
 * {@link Set}. The aggregate result is simply the number of elements in that set.
 */
public abstract class AbstractDistinctCountAggregatorObjectSet implements MeasureAggregator {

  private static final long serialVersionUID = 6313463368629960186L;

  /**
   * Distinct values collected so far.
   */
  protected Set<Object> valueSetForObj;

  public AbstractDistinctCountAggregatorObjectSet() {
    this.valueSetForObj = new HashSet<Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
  }

  /**
   * Number of distinct values currently held.
   */
  private int distinctCount() {
    return valueSetForObj.size();
  }

  /**
   * No-op in the base class.
   */
  @Override public void agg(double newVal) {
    // nothing to do here
  }

  /**
   * Records one more value; duplicates are absorbed by the set.
   *
   * @param newVal new value
   */
  @Override public void agg(Object newVal) {
    this.valueSetForObj.add(newVal);
  }

  /**
   * Byte-array form of the aggregate; this implementation always returns null.
   */
  @Override public byte[] getByteArray() {
    return null;
  }

  @Override public Double getDoubleValue() {
    final int count = distinctCount();
    return (double) count;
  }

  @Override public Long getLongValue() {
    final int count = distinctCount();
    return (long) count;
  }

  @Override public BigDecimal getBigDecimalValue() {
    final int count = distinctCount();
    return new BigDecimal(count);
  }

  /**
   * @return the distinct count as a boxed int
   */
  @Override public Object getValueObject() {
    return Integer.valueOf(distinctCount());
  }

  /**
   * Adds the given value to the distinct set (same effect as {@link #agg(Object)}).
   */
  @Override public void setNewValue(Object newValue) {
    this.valueSetForObj.add(newValue);
  }

  @Override public boolean isFirstTime() {
    return false;
  }

  /**
   * Writes nothing in this implementation.
   */
  @Override public void writeData(DataOutput output) throws IOException {
    // nothing is written
  }

  /**
   * Reads nothing in this implementation.
   */
  @Override public void readData(DataInput inPut) throws IOException {
    // nothing is read
  }

  /**
   * @return the distinct count rendered as text
   */
  @Override public String toString() {
    return String.valueOf(distinctCount());
  }

  /**
   * No-op: a serialized value is not merged by this implementation.
   */
  @Override public void merge(byte[] value) {
    // nothing to do here
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java new file mode 100644 index 0000000..1b2b33d --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregator.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.carbondata.query.aggregator.impl.distinct;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;

import org.carbondata.common.logging.LogService;
import org.carbondata.common.logging.LogServiceFactory;
import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.query.aggregator.MeasureAggregator;

import org.roaringbitmap.IntIterator;
import org.roaringbitmap.RoaringBitmap;

/**
 * The distinct count aggregator
 * Ex:
 * ID NAME Sales
 * <p>1 a 200
 * <p>2 a 100
 * <p>3 a 200
 * select count(distinct sales) # would result 2
 * select count(sales) # would result 3
 *
 * <p>Values are kept in a {@link RoaringBitmap} as int offsets from {@code minValue}
 * (see {@link #agg(double)}). The cast to int truncates, so two values with the same
 * integral offset count as one.
 */
public class DistinctCountAggregator implements MeasureAggregator {

  private static final LogService LOGGER =
      LogServiceFactory.getLogService(DistinctCountAggregator.class.getName());

  private static final long serialVersionUID = 6313463368629960186L;

  /**
   * For Spark CARBON to avoid heavy object transfer it better to flatten
   * the Aggregators. There is no aggregation expected after setting this value.
   */
  private Double computedFixedValue;

  /**
   * Offsets (value - minValue) of the distinct values seen so far. Null after
   * {@link #setNewValue(Object)} and after {@link #get()} until it is restored from
   * {@link #data}.
   */
  private RoaringBitmap valueSet;

  /**
   * Serialized form produced by {@link #get()}; cleared once it has been read back.
   */
  private byte[] data;

  /**
   * Base that is subtracted from every value before it is stored in the bitmap.
   */
  private double minValue;

  /**
   * @param minValue base value; any {@link Number} is accepted (the original accepted only
   *                 BigDecimal, Long and Double). Other types fail on the Double cast as before.
   */
  public DistinctCountAggregator(Object minValue) {
    valueSet = new RoaringBitmap();
    if (minValue instanceof Number) {
      this.minValue = ((Number) minValue).doubleValue();
    } else {
      this.minValue = (Double) minValue;
    }
  }

  public DistinctCountAggregator() {
    valueSet = new RoaringBitmap();
  }

  /**
   * just need to add the unique values to agg set
   */
  @Override public void agg(double newVal) {
    valueSet.add((int) (newVal - minValue));
  }

  /**
   * Distinct count Aggregate function which update the Distinct count
   *
   * @param newVal either a byte[] of consecutive 4-byte ints that are added to the bitmap
   *               as-is, or any object whose toString() parses as a double
   */
  @Override public void agg(Object newVal) {
    if (newVal instanceof byte[]) {
      // NOTE(review): the ints are stored without any minValue adjustment, and no 8-byte
      // minValue header (as written by getByteArray) is expected here - confirm with callers.
      ByteBuffer buffer = ByteBuffer.wrap((byte[]) newVal);
      while (buffer.hasRemaining()) {
        valueSet.add(buffer.getInt());
      }
      return;
    }
    agg(Double.parseDouble(newVal.toString()));
  }

  /**
   * Adds the measure value at the given index unless it is flagged as null.
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
      // Route through agg(double) so the entry is stored as an offset from minValue, like
      // every other entry. With the default minValue of 0 the stored int is unchanged.
      agg(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
    }
  }

  /**
   * Below method will be used to get the value byte array
   *
   * @return empty array when nothing is stored; otherwise 8 bytes of minValue followed by
   * one 4-byte int per stored offset
   */
  @Override public byte[] getByteArray() {
    readData();
    if (valueSet == null || valueSet.getCardinality() == 0) {
      return new byte[0];
    }
    IntIterator iterator = valueSet.getIntIterator();
    ByteBuffer buffer = ByteBuffer.allocate(valueSet.getCardinality() * 4 + 8);
    buffer.putDouble(minValue);
    while (iterator.hasNext()) {
      buffer.putInt(iterator.next());
    }
    return buffer.array();
  }

  /**
   * Folds another bitmap into this one, re-basing offsets onto the smaller of the two
   * minimum values. The bitmap passed in is never modified or retained.
   */
  private void agg(RoaringBitmap set2, double minValue) {
    if (this.minValue == minValue) {
      valueSet.or(set2);
      return;
    }
    if (this.minValue > minValue) {
      // The other base is smaller: shift our offsets onto it. Work on a copy so the source
      // aggregator keeps its own contents and the two never share one bitmap.
      RoaringBitmap rebased = set2.clone();
      IntIterator intIterator = valueSet.getIntIterator();
      while (intIterator.hasNext()) {
        rebased.add((int) ((double) (intIterator.next() + this.minValue) - minValue));
      }
      this.minValue = minValue;
      this.valueSet = rebased;
    } else {
      IntIterator intIterator = set2.getIntIterator();
      while (intIterator.hasNext()) {
        valueSet.add((int) ((double) (intIterator.next() + minValue) - this.minValue));
      }
    }
  }

  /**
   * merge the valueset so that we get the count of unique values
   */
  @Override public void merge(MeasureAggregator aggregator) {
    DistinctCountAggregator distinctCountAggregator = (DistinctCountAggregator) aggregator;
    readData();
    distinctCountAggregator.readData();
    if (distinctCountAggregator.valueSet != null) {
      agg(distinctCountAggregator.valueSet, distinctCountAggregator.minValue);
    }
  }

  @Override public Double getDoubleValue() {
    if (computedFixedValue == null) {
      readData();
      return (double) valueSet.getCardinality();
    }
    return computedFixedValue;
  }

  @Override public Long getLongValue() {
    if (computedFixedValue == null) {
      readData();
      return (long) valueSet.getCardinality();
    }
    return computedFixedValue.longValue();
  }

  @Override public BigDecimal getBigDecimalValue() {
    if (computedFixedValue == null) {
      readData();
      return new BigDecimal(valueSet.getCardinality());
    }
    return new BigDecimal(computedFixedValue);
  }

  /**
   * @return the distinct count as a boxed int. Unlike the original, this restores the bitmap
   * after get() and honours a fixed value instead of dereferencing a null bitmap.
   */
  @Override public Object getValueObject() {
    if (computedFixedValue != null) {
      return computedFixedValue.intValue();
    }
    readData();
    return valueSet.getCardinality();
  }

  /**
   * Replaces the collected values with a precomputed result; the bitmap is dropped.
   */
  @Override public void setNewValue(Object newValue) {
    computedFixedValue = (Double) newValue;
    valueSet = null;
  }

  @Override public boolean isFirstTime() {
    return false;
  }

  /**
   * Writes either the marker -1 followed by the fixed value, the serialized bitmap, or the
   * bytes previously captured by {@link #get()}.
   */
  @Override public void writeData(DataOutput output) throws IOException {
    if (computedFixedValue != null) {
      // NOTE(review): readData(DataInput) below always expects a serialized bitmap and does
      // not recognise this marker form - confirm who consumes it.
      ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8);
      byteBuffer.putInt(-1);
      byteBuffer.putDouble(computedFixedValue);
      output.write(byteBuffer.array());
    } else if (valueSet != null) {
      valueSet.serialize(output);
    } else if (data != null) {
      output.write(data);
    }
  }

  @Override public void readData(DataInput inPut) throws IOException {
    valueSet = new RoaringBitmap();
    valueSet.deserialize(inPut);
  }

  /**
   * Restores the bitmap from {@link #data} when it is missing or empty. Skipped when a
   * fixed value is set, because data then holds the marker form, not a bitmap.
   */
  private void readData() {
    if (computedFixedValue != null || data == null) {
      return;
    }
    if (valueSet != null && !valueSet.isEmpty()) {
      return;
    }
    DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(data));
    try {
      readData(inputStream);
      inputStream.close();
      data = null;
    } catch (IOException e) {
      LOGGER.error(e, e.getMessage());
    }
  }

  /**
   * @return an independent copy carrying the same minValue, fixed value and bitmap contents
   */
  @Override public MeasureAggregator getCopy() {
    readData();
    DistinctCountAggregator aggr = new DistinctCountAggregator(minValue);
    aggr.computedFixedValue = computedFixedValue;
    if (computedFixedValue != null && valueSet == null) {
      // Mirror the state produced by setNewValue: fixed value only, no bitmap.
      aggr.valueSet = null;
    } else {
      aggr.valueSet = valueSet.clone();
    }
    return aggr;
  }

  /**
   * Orders aggregators by their distinct count.
   */
  @Override public int compareTo(MeasureAggregator measureAggr) {
    double compFixedVal = getDoubleValue();
    double otherVal = measureAggr.getDoubleValue();
    if (compFixedVal > otherVal) {
      return 1;
    }
    if (compFixedVal < otherVal) {
      return -1;
    }
    return 0;
  }

  /**
   * Two aggregators are equal when their distinct counts are equal.
   */
  @Override public boolean equals(Object obj) {
    if (!(obj instanceof DistinctCountAggregator)) {
      return false;
    }
    DistinctCountAggregator o = (DistinctCountAggregator) obj;
    return getDoubleValue().equals(o.getDoubleValue());
  }

  @Override public int hashCode() {
    return getDoubleValue().hashCode();
  }

  /**
   * Switches this aggregator to its serialized form: the state is written to {@link #data}
   * and the bitmap is released. If serialization fails the in-memory state is kept.
   */
  @Override public MeasureAggregator get() {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    DataOutputStream outputStream = new DataOutputStream(byteStream);
    try {
      writeData(outputStream);
      // Only drop the bitmap once its bytes are safely captured.
      data = byteStream.toByteArray();
      valueSet = null;
    } catch (IOException ex) {
      LOGGER.error(ex, ex.getMessage());
    }
    return this;
  }

  @Override public String toString() {
    if (computedFixedValue == null) {
      readData();
      return valueSet.getCardinality() + "";
    }
    return computedFixedValue + "";
  }

  /**
   * @return the bitmap of stored offsets, restored from the serialized form if necessary;
   * null when a fixed value has been set
   */
  public RoaringBitmap getBitMap() {
    readData();
    return valueSet;
  }

  public double getMinValue() {
    return minValue;
  }

  /**
   * Merges a byte array in the layout produced by {@link #getByteArray()}: an 8-byte
   * minimum value followed by 4-byte offsets.
   */
  @Override public void merge(byte[] value) {
    if (0 == value.length) {
      return;
    }
    ByteBuffer buffer = ByteBuffer.wrap(value);
    double currentMinValue = buffer.getDouble();
    while (buffer.hasRemaining()) {
      agg(buffer.getInt() + currentMinValue);
    }
  }

  @Override public MeasureAggregator getNew() {
    return new DistinctCountAggregator();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java new file mode 100644 index 0000000..3b26e53 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountAggregatorObjectSet.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.carbondata.query.aggregator.impl.distinct;

import java.util.HashSet;
import java.util.Set;

import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.query.aggregator.MeasureAggregator;

/**
 * Set-backed distinct-count aggregator that reads measure values as doubles.
 */
public class DistinctCountAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet {

  private static final long serialVersionUID = 6313463368629960186L;

  /**
   * Adds the boxed double to the distinct set.
   */
  @Override public void agg(double newVal) {
    valueSetForObj.add(Double.valueOf(newVal));
  }

  /**
   * Adds the double measure value at the given index, skipping positions flagged as null.
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
    boolean isNull = dataChunk.getNullValueIndexHolder().getBitSet().get(index);
    if (isNull) {
      return;
    }
    valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index));
  }

  /**
   * Unions the other aggregator's values into this one so the size reflects the distinct
   * count over both.
   */
  @Override public void merge(MeasureAggregator aggregator) {
    DistinctCountAggregatorObjectSet other = (DistinctCountAggregatorObjectSet) aggregator;
    Set<Object> incoming = other.valueSetForObj;
    valueSetForObj.addAll(incoming);
  }

  /**
   * @return a new aggregator holding its own copy of the current value set
   */
  @Override public MeasureAggregator getCopy() {
    DistinctCountAggregatorObjectSet duplicate = new DistinctCountAggregatorObjectSet();
    duplicate.valueSetForObj = new HashSet<Object>(valueSetForObj);
    return duplicate;
  }

  /**
   * Orders aggregators by their distinct count, compared as doubles.
   */
  @Override public int compareTo(MeasureAggregator measureAggr) {
    double mine = getDoubleValue();
    double theirs = measureAggr.getDoubleValue();
    return mine > theirs ? 1 : (mine < theirs ? -1 : 0);
  }

  /**
   * Equal when the other object is the same kind of aggregator with the same count.
   */
  @Override public boolean equals(Object obj) {
    if (obj instanceof DistinctCountAggregatorObjectSet) {
      DistinctCountAggregatorObjectSet that = (DistinctCountAggregatorObjectSet) obj;
      return getDoubleValue().equals(that.getDoubleValue());
    }
    return false;
  }

  @Override public int hashCode() {
    Double count = getDoubleValue();
    return count.hashCode();
  }

  @Override public MeasureAggregator get() {
    return this;
  }

  @Override public MeasureAggregator getNew() {
    return new DistinctCountAggregatorObjectSet();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java new file mode 100644 index 0000000..2f44bca --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountBigDecimalAggregatorObjectSet.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.carbondata.query.aggregator.impl.distinct;

import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.query.aggregator.MeasureAggregator;

/**
 * Set-backed distinct-count aggregator that reads measure values as BigDecimal.
 */
public class DistinctCountBigDecimalAggregatorObjectSet
    extends AbstractDistinctCountAggregatorObjectSet {

  private static final long serialVersionUID = 6313463368629960186L;

  /**
   * Adds the BigDecimal measure value at the given index, skipping positions flagged as null.
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
    boolean isNull = dataChunk.getNullValueIndexHolder().getBitSet().get(index);
    if (isNull) {
      return;
    }
    valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
  }

  /**
   * Unions the other aggregator's values into this one so the size reflects the distinct
   * count over both.
   */
  @Override public void merge(MeasureAggregator aggregator) {
    DistinctCountBigDecimalAggregatorObjectSet other =
        (DistinctCountBigDecimalAggregatorObjectSet) aggregator;
    Set<Object> incoming = other.valueSetForObj;
    valueSetForObj.addAll(incoming);
  }

  /**
   * @return a new aggregator holding its own copy of the current value set
   */
  @Override public MeasureAggregator getCopy() {
    DistinctCountBigDecimalAggregatorObjectSet duplicate =
        new DistinctCountBigDecimalAggregatorObjectSet();
    duplicate.valueSetForObj = new HashSet<Object>(valueSetForObj);
    return duplicate;
  }

  /**
   * Orders aggregators by their distinct count, compared as BigDecimal.
   */
  @Override public int compareTo(MeasureAggregator measureAggr) {
    BigDecimal mine = getBigDecimalValue();
    return mine.compareTo(measureAggr.getBigDecimalValue());
  }

  /**
   * Equal when the other object is the same kind of aggregator with the same count.
   */
  @Override public boolean equals(Object obj) {
    if (obj instanceof DistinctCountBigDecimalAggregatorObjectSet) {
      DistinctCountBigDecimalAggregatorObjectSet that =
          (DistinctCountBigDecimalAggregatorObjectSet) obj;
      return getBigDecimalValue().equals(that.getBigDecimalValue());
    }
    return false;
  }

  @Override public int hashCode() {
    BigDecimal count = getBigDecimalValue();
    return count.hashCode();
  }

  @Override public MeasureAggregator get() {
    return this;
  }

  @Override public MeasureAggregator getNew() {
    return new DistinctCountBigDecimalAggregatorObjectSet();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java new file mode 100644 index 0000000..c4f7216 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctCountLongAggregatorObjectSet.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.carbondata.query.aggregator.impl.distinct;

import java.util.HashSet;
import java.util.Set;

import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.query.aggregator.MeasureAggregator;

/**
 * Set-backed distinct-count aggregator that reads measure values as longs.
 */
public class DistinctCountLongAggregatorObjectSet extends AbstractDistinctCountAggregatorObjectSet {

  private static final long serialVersionUID = 6313463368629960186L;

  /**
   * Adds the long measure value at the given index, skipping positions flagged as null.
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
    boolean isNull = dataChunk.getNullValueIndexHolder().getBitSet().get(index);
    if (isNull) {
      return;
    }
    valueSetForObj.add(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index));
  }

  /**
   * Unions the other aggregator's values into this one so the size reflects the distinct
   * count over both.
   */
  @Override public void merge(MeasureAggregator aggregator) {
    DistinctCountLongAggregatorObjectSet other = (DistinctCountLongAggregatorObjectSet) aggregator;
    Set<Object> incoming = other.valueSetForObj;
    valueSetForObj.addAll(incoming);
  }

  /**
   * @return a new aggregator holding its own copy of the current value set
   */
  @Override public MeasureAggregator getCopy() {
    DistinctCountLongAggregatorObjectSet duplicate = new DistinctCountLongAggregatorObjectSet();
    duplicate.valueSetForObj = new HashSet<Object>(valueSetForObj);
    return duplicate;
  }

  /**
   * Orders aggregators by their distinct count, compared as longs.
   */
  @Override public int compareTo(MeasureAggregator measureAggr) {
    long mine = getLongValue();
    long theirs = measureAggr.getLongValue();
    return mine > theirs ? 1 : (mine < theirs ? -1 : 0);
  }

  /**
   * Equal when the other object is the same kind of aggregator with the same count.
   */
  @Override public boolean equals(Object obj) {
    if (obj instanceof DistinctCountLongAggregatorObjectSet) {
      DistinctCountLongAggregatorObjectSet that = (DistinctCountLongAggregatorObjectSet) obj;
      return getLongValue().equals(that.getLongValue());
    }
    return false;
  }

  @Override public int hashCode() {
    Long count = getLongValue();
    return count.hashCode();
  }

  @Override public MeasureAggregator get() {
    return this;
  }

  @Override public MeasureAggregator getNew() {
    return new DistinctCountLongAggregatorObjectSet();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java new file mode 100644 index 0000000..e3d4623 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/DistinctStringCountAggregator.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */

package org.carbondata.query.aggregator.impl.distinct;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Set;

import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.query.aggregator.MeasureAggregator;

/**
 * Distinct-count aggregator for string values: every value is kept in a {@link Set} and the
 * aggregate result is the size of that set.
 */
public class DistinctStringCountAggregator implements MeasureAggregator {
  private static final long serialVersionUID = 6313463368629960186L;

  /**
   * Fixed charset for (de)serialization so the bytes do not depend on the platform default
   * of whichever JVM happens to write or read them.
   */
  private static final Charset SERIALIZATION_CHARSET =
      Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET);

  private Set<String> valueSetForStr;

  public DistinctStringCountAggregator() {
    this.valueSetForStr = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
  }

  /**
   * No-op: numeric values are not collected by this aggregator.
   */
  @Override public void agg(double newVal) {
  }

  /**
   * Adds one string to the distinct set.
   */
  public void agg(String newVal) {
    this.valueSetForStr.add(newVal);
  }

  private void agg(Set<String> set2) {
    this.valueSetForStr.addAll(set2);
  }

  /**
   * Unions the other aggregator's strings into this one.
   *
   * @param aggregator must be a DistinctStringCountAggregator
   */
  @Override public void merge(MeasureAggregator aggregator) {
    DistinctStringCountAggregator distinctCountAggregator =
        (DistinctStringCountAggregator) aggregator;
    agg(distinctCountAggregator.valueSetForStr);
  }

  @Override public Double getDoubleValue() {
    return (double) this.valueSetForStr.size();
  }

  @Override public Long getLongValue() {
    return (long) this.valueSetForStr.size();
  }

  @Override public BigDecimal getBigDecimalValue() {
    return new BigDecimal(this.valueSetForStr.size());
  }

  @Override public Object getValueObject() {
    return Integer.valueOf(this.valueSetForStr.size());
  }

  /**
   * No-op in this aggregator.
   */
  @Override public void setNewValue(Object newValue) {
  }

  @Override public boolean isFirstTime() {
    return false;
  }

  /**
   * Serializes the set as: one int holding (count * 8), then for each string its byte
   * length as an int followed by the encoded bytes. This is the layout
   * {@link #readData(DataInput)} expects.
   *
   * <p>The values are streamed straight to the output. The previous implementation staged
   * them in a buffer sized as if every entry took 8 bytes, which overflowed for longer
   * strings and emitted unread trailing padding for shorter ones.
   */
  @Override public void writeData(DataOutput output) throws IOException {
    output.writeInt(this.valueSetForStr.size() * 8);
    for (String val : this.valueSetForStr) {
      byte[] b = val.getBytes(SERIALIZATION_CHARSET);
      output.writeInt(b.length);
      output.write(b);
    }
  }

  /**
   * Restores the set from the layout written by {@link #writeData(DataOutput)}.
   */
  @Override public void readData(DataInput inPut) throws IOException {
    // The header stores count * 8, so divide to recover the number of entries.
    int count = inPut.readInt() / 8;
    this.valueSetForStr = new HashSet<String>(count + 1, 1.0F);
    for (int i = 0; i < count; i++) {
      byte[] b = new byte[inPut.readInt()];
      inPut.readFully(b);
      this.valueSetForStr.add(new String(b, SERIALIZATION_CHARSET));
    }
  }

  /**
   * @return a new aggregator holding its own copy of the current value set
   */
  @Override public MeasureAggregator getCopy() {
    DistinctStringCountAggregator aggregator = new DistinctStringCountAggregator();
    aggregator.valueSetForStr = new HashSet<String>(this.valueSetForStr);
    return aggregator;
  }

  /**
   * Orders aggregators by their distinct count.
   */
  @Override public int compareTo(MeasureAggregator o) {
    double val = getDoubleValue();
    double otherVal = o.getDoubleValue();
    if (val > otherVal) {
      return 1;
    }
    if (val < otherVal) {
      return -1;
    }
    return 0;
  }

  /**
   * Two aggregators are equal when their distinct counts are equal.
   */
  @Override public boolean equals(Object obj) {
    if (!(obj instanceof DistinctStringCountAggregator)) {
      return false;
    }
    DistinctStringCountAggregator o = (DistinctStringCountAggregator) obj;
    return getDoubleValue().equals(o.getDoubleValue());
  }

  @Override public int hashCode() {
    return getDoubleValue().hashCode();
  }

  /**
   * Adds one value to the distinct set.
   *
   * @param newVal must be a String; any other type fails on the cast
   */
  @Override public void agg(Object newVal) {
    this.valueSetForStr.add((String) newVal);
  }

  /**
   * No-op: chunk-based aggregation is not implemented for strings.
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
  }

  /**
   * Byte-array form of the aggregate; this implementation always returns null.
   */
  @Override public byte[] getByteArray() {
    return null;
  }

  @Override public MeasureAggregator get() {
    return this;
  }

  @Override public String toString() {
    return valueSetForStr.size() + "";
  }

  /**
   * No-op: a serialized value is not merged by this implementation.
   */
  @Override public void merge(byte[] value) {
  }

  @Override public MeasureAggregator getNew() {
    return new DistinctStringCountAggregator();
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java new file mode 100644 index 0000000..6a59ec9 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctBigDecimalAggregator.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.distinct; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.core.util.DataTypeUtil; +import org.carbondata.query.aggregator.MeasureAggregator; +import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic; + +/** + * The sum distinct aggregator + * Ex: + * ID NAME Sales + * 1 a 200 + * 2 a 100 + * 3 a 200 + * select sum(distinct sales) # would result 300 + */ +public class SumDistinctBigDecimalAggregator extends AbstractMeasureAggregatorBasic { + + /** + * + */ + private static final long serialVersionUID = 6313463368629960155L; + + /** + * For Spark CARBON to avoid heavy object transfer it better to flatten the + * Aggregators. There is no aggregation expected after setting this value. + */ + private BigDecimal computedFixedValue; + + /** + * + */ + private Set<BigDecimal> valueSet; + + public SumDistinctBigDecimalAggregator() { + valueSet = new HashSet<BigDecimal>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + } + + /** + * Distinct Aggregate function which update the Distinct set + * + * @param newVal new value + */ + @Override public void agg(Object newVal) { + valueSet.add( + newVal instanceof BigDecimal ? 
(BigDecimal) newVal : new BigDecimal(newVal.toString())); + firstTime = false; + } + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + valueSet.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index)); + firstTime = false; + } + } + + /** + * Below method will be used to get the value byte array + */ + @Override public byte[] getByteArray() { + Iterator<BigDecimal> iterator = valueSet.iterator(); + ByteBuffer buffer = + ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); + while (iterator.hasNext()) { + byte[] bytes = DataTypeUtil.bigDecimalToByte(iterator.next()); + buffer.putInt(bytes.length); + buffer.put(bytes); + } + buffer.rewind(); + return buffer.array(); + } + + private void agg(Set<BigDecimal> set2) { + valueSet.addAll(set2); + } + + /** + * merge the valueset so that we get the count of unique values + */ + @Override public void merge(MeasureAggregator aggregator) { + SumDistinctBigDecimalAggregator distinctAggregator = + (SumDistinctBigDecimalAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(distinctAggregator.valueSet); + firstTime = false; + } + } + + @Override public BigDecimal getBigDecimalValue() { + if (computedFixedValue == null) { + BigDecimal result = new BigDecimal(0); + for (BigDecimal aValue : valueSet) { + result = result.add(aValue); + } + return result; + } + return computedFixedValue; + } + + @Override public Object getValueObject() { + return getBigDecimalValue(); + } + + @Override public void setNewValue(Object newValue) { + computedFixedValue = (BigDecimal) newValue; + valueSet = null; + } + + @Override public boolean isFirstTime() { + return firstTime; + } + + @Override public void writeData(DataOutput dataOutput) throws IOException { + if (computedFixedValue != null) { + byte[] bytes = DataTypeUtil.bigDecimalToByte(computedFixedValue); + ByteBuffer byteBuffer = 
ByteBuffer.allocate(4 + 4 + bytes.length); + byteBuffer.putInt(-1); + byteBuffer.putInt(bytes.length); + byteBuffer.put(bytes); + byteBuffer.flip(); + dataOutput.write(byteBuffer.array()); + } else { + int length = valueSet.size() * 8 + valueSet.size() * 4; + ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 1); + byteBuffer.putInt(length); + for (BigDecimal val : valueSet) { + byte[] bytes = + val.toString().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); + byteBuffer.putInt(-1); + byteBuffer.putInt(bytes.length); + byteBuffer.put(bytes); + } + byteBuffer.flip(); + dataOutput.write(byteBuffer.array()); + } + } + + @Override public void readData(DataInput inPut) throws IOException { + int length = inPut.readInt(); + + if (length == -1) { + computedFixedValue = new BigDecimal(inPut.readUTF()); + valueSet = null; + } else { + length = length / 8; + valueSet = new HashSet<BigDecimal>(length + 1, 1.0f); + for (int i = 0; i < length; i++) { + valueSet.add(new BigDecimal(inPut.readUTF())); + } + } + } + + @Override public void merge(byte[] value) { + if (0 == value.length) { + return; + } + ByteBuffer buffer = ByteBuffer.wrap(value); + buffer.rewind(); + while (buffer.hasRemaining()) { + byte[] valueByte = new byte[buffer.getInt()]; + buffer.get(valueByte); + BigDecimal valueBigDecimal = DataTypeUtil.byteToBigDecimal(valueByte); + agg(valueBigDecimal); + } + } + + public String toString() { + if (computedFixedValue == null) { + return valueSet.size() + ""; + } + return computedFixedValue + ""; + } + + @Override public MeasureAggregator getCopy() { + SumDistinctBigDecimalAggregator aggregator = new SumDistinctBigDecimalAggregator(); + aggregator.valueSet = new HashSet<BigDecimal>(valueSet); + return aggregator; + } + + @Override public int compareTo(MeasureAggregator msr) { + BigDecimal msrValObj = getBigDecimalValue(); + BigDecimal otherVal = msr.getBigDecimalValue(); + + return msrValObj.compareTo(otherVal); + } + + @Override public boolean 
equals(Object obj) { + if (!(obj instanceof SumDistinctBigDecimalAggregator)) { + return false; + } + SumDistinctBigDecimalAggregator o = (SumDistinctBigDecimalAggregator) obj; + return getBigDecimalValue().equals(o.getBigDecimalValue()); + } + + @Override public int hashCode() { + return getBigDecimalValue().hashCode(); + } + + @Override public MeasureAggregator getNew() { + // TODO Auto-generated method stub + return new SumDistinctBigDecimalAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java new file mode 100644 index 0000000..0229f24 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctDoubleAggregator.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.distinct; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.query.aggregator.MeasureAggregator; +import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic; + +/** + * The sum distinct aggregator + * Ex: + * ID NAME Sales + * 1 a 200 + * 2 a 100 + * 3 a 200 + * select sum(distinct sales) # would result 300 + */ + +public class SumDistinctDoubleAggregator extends AbstractMeasureAggregatorBasic { + + /** + * + */ + private static final long serialVersionUID = 6313463368629960155L; + + /** + * For Spark CARBON to avoid heavy object transfer it better to flatten the + * Aggregators. There is no aggregation expected after setting this value. + */ + private Double computedFixedValue; + + /** + * + */ + private Set<Double> valueSet; + + public SumDistinctDoubleAggregator() { + valueSet = new HashSet<Double>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + } + + /** + * just need to add the unique values to agg set + */ + @Override public void agg(double newVal) { + valueSet.add(newVal); + firstTime = false; + } + + /** + * Distinct Aggregate function which update the Distinct set + * + * @param newVal new value + */ + @Override public void agg(Object newVal) { + valueSet.add(newVal instanceof Double ? 
(Double) newVal : new Double(newVal.toString())); + firstTime = false; + } + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + valueSet.add(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index)); + firstTime = false; + } + } + + /** + * Below method will be used to get the value byte array + */ + @Override public byte[] getByteArray() { + Iterator<Double> iterator = valueSet.iterator(); + ByteBuffer buffer = + ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); + while (iterator.hasNext()) { + buffer.putDouble(iterator.next()); + } + buffer.rewind(); + return buffer.array(); + } + + private void agg(Set<Double> set2) { + valueSet.addAll(set2); + } + + /** + * merge the valueset so that we get the count of unique values + */ + @Override public void merge(MeasureAggregator aggregator) { + SumDistinctDoubleAggregator distinctAggregator = (SumDistinctDoubleAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(distinctAggregator.valueSet); + firstTime = false; + } + } + + @Override public Double getDoubleValue() { + if (computedFixedValue == null) { + double result = 0; + for (Double aValue : valueSet) { + result += aValue; + } + return result; + } + return computedFixedValue; + } + + @Override public Object getValueObject() { + return getDoubleValue(); + } + + @Override public void setNewValue(Object newValue) { + computedFixedValue = (Double) newValue; + valueSet = null; + } + + @Override public boolean isFirstTime() { + return firstTime; + } + + @Override public void writeData(DataOutput dataOutput) throws IOException { + if (computedFixedValue != null) { + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8); + byteBuffer.putInt(-1); + byteBuffer.putDouble(computedFixedValue); + byteBuffer.flip(); + dataOutput.write(byteBuffer.array()); + } else { + int length = valueSet.size() * 8; + ByteBuffer byteBuffer = 
ByteBuffer.allocate(length + 4 + 1); + byteBuffer.putInt(length); + for (double val : valueSet) { + byteBuffer.putDouble(val); + } + byteBuffer.flip(); + dataOutput.write(byteBuffer.array()); + } + } + + @Override public void readData(DataInput inPut) throws IOException { + int length = inPut.readInt(); + + if (length == -1) { + computedFixedValue = inPut.readDouble(); + valueSet = null; + } else { + length = length / 8; + valueSet = new HashSet<Double>(length + 1, 1.0f); + for (int i = 0; i < length; i++) { + valueSet.add(inPut.readDouble()); + } + } + } + + @Override public void merge(byte[] value) { + if (0 == value.length) { + return; + } + ByteBuffer buffer = ByteBuffer.wrap(value); + buffer.rewind(); + while (buffer.hasRemaining()) { + agg(buffer.getDouble()); + } + } + + public String toString() { + if (computedFixedValue == null) { + return valueSet.size() + ""; + } + return computedFixedValue + ""; + } + + @Override public MeasureAggregator getCopy() { + SumDistinctDoubleAggregator aggregator = new SumDistinctDoubleAggregator(); + aggregator.valueSet = new HashSet<Double>(valueSet); + return aggregator; + } + + @Override public int compareTo(MeasureAggregator msr) { + double msrValObj = getDoubleValue(); + double otherVal = msr.getDoubleValue(); + if (msrValObj > otherVal) { + return 1; + } + if (msrValObj < otherVal) { + return -1; + } + return 0; + } + + @Override public boolean equals(Object obj) { + if (!(obj instanceof SumDistinctDoubleAggregator)) { + return false; + } + SumDistinctDoubleAggregator o = (SumDistinctDoubleAggregator) obj; + return getDoubleValue().equals(o.getDoubleValue()); + } + + @Override public int hashCode() { + return getDoubleValue().hashCode(); + } + + @Override public MeasureAggregator getNew() { + // TODO Auto-generated method stub + return new SumDistinctDoubleAggregator(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java new file mode 100644 index 0000000..d57c34b --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/distinct/SumDistinctLongAggregator.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.distinct; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.query.aggregator.MeasureAggregator; +import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic; + +/** + * The sum distinct aggregator + * Ex: + * ID NAME Sales + * 1 a 200 + * 2 a 100 + * 3 a 200 + * select sum(distinct sales) # would result 300 + */ + +public class SumDistinctLongAggregator extends AbstractMeasureAggregatorBasic { + + private static final long serialVersionUID = 6313463368629960155L; + + /** + * For Spark CARBON to avoid heavy object transfer it better to flatten the + * Aggregators. There is no aggregation expected after setting this value. + */ + private Long computedFixedValue; + + /** + * + */ + private Set<Long> valueSet; + + public SumDistinctLongAggregator() { + valueSet = new HashSet<Long>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + } + + /** + * Distinct Aggregate function which update the Distinct set + * + * @param newVal new value + */ + @Override public void agg(Object newVal) { + valueSet.add(newVal instanceof Long ? 
(Long) newVal : Long.valueOf(newVal.toString())); + firstTime = false; + } + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + valueSet.add(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index)); + firstTime = false; + } + } + + /** + * Below method will be used to get the value byte array + */ + @Override public byte[] getByteArray() { + Iterator<Long> iterator = valueSet.iterator(); + ByteBuffer buffer = + ByteBuffer.allocate(valueSet.size() * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE); + while (iterator.hasNext()) { + buffer.putLong(iterator.next()); + } + buffer.rewind(); + return buffer.array(); + } + + private void agg(Set<Long> set2) { + valueSet.addAll(set2); + } + + /** + * merge the valueset so that we get the count of unique values + */ + @Override public void merge(MeasureAggregator aggregator) { + SumDistinctLongAggregator distinctAggregator = (SumDistinctLongAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(distinctAggregator.valueSet); + firstTime = false; + } + } + + @Override public Long getLongValue() { + if (computedFixedValue == null) { + long result = 0; + for (Long aValue : valueSet) { + result += aValue; + } + return result; + } + return computedFixedValue; + } + + @Override public Object getValueObject() { + return getLongValue(); + } + + @Override public void setNewValue(Object newValue) { + computedFixedValue = (Long) newValue; + valueSet = null; + } + + @Override public boolean isFirstTime() { + return firstTime; + } + + @Override public void writeData(DataOutput dataOutput) throws IOException { + if (computedFixedValue != null) { + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 8); + byteBuffer.putInt(-1); + byteBuffer.putLong(computedFixedValue); + byteBuffer.flip(); + dataOutput.write(byteBuffer.array()); + } else { + int length = valueSet.size() * 8; + ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4 + 
1); + byteBuffer.putInt(length); + for (long val : valueSet) { + byteBuffer.putLong(val); + } + byteBuffer.flip(); + dataOutput.write(byteBuffer.array()); + } + } + + @Override public void readData(DataInput inPut) throws IOException { + int length = inPut.readInt(); + + if (length == -1) { + computedFixedValue = inPut.readLong(); + valueSet = null; + } else { + length = length / 8; + valueSet = new HashSet<Long>(length + 1, 1.0f); + for (int i = 0; i < length; i++) { + valueSet.add(inPut.readLong()); + } + } + + } + + @Override public void merge(byte[] value) { + if (0 == value.length) { + return; + } + ByteBuffer buffer = ByteBuffer.wrap(value); + buffer.rewind(); + while (buffer.hasRemaining()) { + agg(buffer.getLong()); + } + } + + public String toString() { + if (computedFixedValue == null) { + return valueSet.size() + ""; + } + return computedFixedValue + ""; + } + + @Override public MeasureAggregator getCopy() { + SumDistinctLongAggregator aggregator = new SumDistinctLongAggregator(); + aggregator.valueSet = new HashSet<Long>(valueSet); + return aggregator; + } + + @Override public int compareTo(MeasureAggregator msr) { + long msrValObj = getLongValue(); + long otherVal = msr.getLongValue(); + if (msrValObj > otherVal) { + return 1; + } + if (msrValObj < otherVal) { + return -1; + } + return 0; + } + + @Override public boolean equals(Object obj) { + if (!(obj instanceof SumDistinctLongAggregator)) { + return false; + } + SumDistinctLongAggregator o = (SumDistinctLongAggregator) obj; + return getLongValue().equals(o.getLongValue()); + } + + @Override public int hashCode() { + return getLongValue().hashCode(); + } + + @Override public MeasureAggregator getNew() { + // TODO Auto-generated method stub + return new SumDistinctLongAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java new file mode 100644 index 0000000..f423085 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/AbstractMeasureAggregatorDummy.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.dummy; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.carbondata.query.aggregator.MeasureAggregator; +import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic; + +/** + * AbstractMeasureAggregatorDummy + * Used for custom Carbon Aggregator dummy + */ +public abstract class AbstractMeasureAggregatorDummy extends AbstractMeasureAggregatorBasic { + private static final long serialVersionUID = 1L; + + @Override public int compareTo(MeasureAggregator o) { + if (equals(o)) { + return 0; + } + return -1; + } + + @Override public boolean equals(Object arg0) { + return super.equals(arg0); + } + + @Override public int hashCode() { + return super.hashCode(); + } + + @Override public byte[] getByteArray() { + return null; + } + + @Override public void merge(MeasureAggregator aggregator) { + } + + @Override public MeasureAggregator getCopy() { + return null; + } + + @Override public void writeData(DataOutput output) throws IOException { + } + + @Override public void readData(DataInput inPut) throws IOException { + } + + @Override public void merge(byte[] value) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java new file mode 100644 index 0000000..8a33fe1 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyBigDecimalAggregator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.query.aggregator.impl.dummy; + +import java.math.BigDecimal; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +public class DummyBigDecimalAggregator extends AbstractMeasureAggregatorDummy { + private static final long serialVersionUID = 1L; + + /** + * aggregate value + */ + private BigDecimal aggVal; + + @Override public void agg(Object newVal) { + aggVal = (BigDecimal) newVal; + firstTime = false; + } + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + aggVal = dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index); + firstTime = false; + } + } + + @Override public BigDecimal getBigDecimalValue() { + return aggVal; + } + + @Override public Object getValueObject() { + return aggVal; + } + + @Override public void setNewValue(Object newValue) { + aggVal = (BigDecimal) newValue; + } + + @Override public MeasureAggregator getNew() { + // TODO Auto-generated method stub + return new DummyBigDecimalAggregator(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java new file mode 100644 index 0000000..cd4fe56 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyDoubleAggregator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.carbondata.query.aggregator.impl.dummy; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +public class DummyDoubleAggregator extends AbstractMeasureAggregatorDummy { + private static final long serialVersionUID = 1L; + + /** + * aggregate value + */ + private double aggVal; + + @Override public void agg(double newVal) { + aggVal = newVal; + firstTime = false; + } + + @Override public void agg(Object newVal) { + aggVal = (Double) newVal; + firstTime = false; + } + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + aggVal = dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index); + firstTime = false; + } + } + + @Override public Double getDoubleValue() { + return aggVal; + } + + @Override public Object getValueObject() { + return aggVal; + } + + @Override public void setNewValue(Object newValue) { + aggVal = (Double) newValue; + } + + @Override public MeasureAggregator getNew() { + return new DummyDoubleAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java new file mode 100644 index 0000000..4131895 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/dummy/DummyLongAggregator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.carbondata.query.aggregator.impl.dummy; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +public class DummyLongAggregator extends AbstractMeasureAggregatorDummy { + private static final long serialVersionUID = 1L; + + /** + * aggregate value + */ + private long aggVal; + + @Override public void agg(Object newVal) { + aggVal = (Long) newVal; + firstTime = false; + } + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + aggVal = dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index); + firstTime = false; + } + } + + @Override public Long getLongValue() { + return aggVal; + } + + @Override public Object getValueObject() { + return aggVal; + } + + @Override public void setNewValue(Object newValue) { + aggVal = (Long) newValue; + } + + @Override public MeasureAggregator getNew() { + return new DummyLongAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java 
b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java new file mode 100644 index 0000000..27b1876 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/AbstractMaxAggregator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.query.aggregator.impl.max; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInput; +import java.io.ObjectInputStream; + +import org.carbondata.common.logging.LogService; +import org.carbondata.common.logging.LogServiceFactory; +import org.carbondata.core.util.CarbonUtil; +import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorMaxMin; + +public abstract class AbstractMaxAggregator extends AbstractMeasureAggregatorMaxMin { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractMaxAggregator.class.getName()); + + protected void internalAgg(Object value) { + if (value instanceof Comparable) { + @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value); + aggVal = (aggVal == null || aggVal.compareTo(newValue) < 0) ? 
newValue : aggVal; + } + } + + @Override public void merge(byte[] value) { + if (0 == value.length) { + return; + } + ByteArrayInputStream bytesInputStream = null; + ObjectInput in = null; + try { + bytesInputStream = new ByteArrayInputStream(value); + in = new ObjectInputStream(bytesInputStream); + Object newVal = (Comparable<Object>) in.readObject(); + internalAgg(newVal); + firstTime = false; + } catch (Exception e) { + LOGGER.error(e, "Problem while merging byte array in maxAggregator: " + e.getMessage()); + } finally { + CarbonUtil.closeStreams(bytesInputStream); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java new file mode 100644 index 0000000..d0d8af0 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxAggregator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.max; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +/** + * Class Description : + * It will return max of values + */ +public class MaxAggregator extends AbstractMaxAggregator { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -5850218739083899419L; + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + internalAgg(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index)); + firstTime = false; + } + } + + /** + * Merge the value, it will update the max aggregate value if aggregator + * passed as an argument will have value greater than aggVal + * + * @param aggregator MaxAggregator + */ + @Override public void merge(MeasureAggregator aggregator) { + MaxAggregator maxAggregator = (MaxAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(maxAggregator.aggVal); + firstTime = false; + } + + } + + @Override public MeasureAggregator getCopy() { + MaxAggregator aggregator = new MaxAggregator(); + aggregator.aggVal = aggVal; + aggregator.firstTime = firstTime; + return aggregator; + } + + @Override public MeasureAggregator getNew() { + return new MaxAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java new file mode 100644 index 0000000..c4149c6 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxBigDecimalAggregator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.query.aggregator.impl.max; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +/** + * Class Description : + * It will return max of values + */ +public class MaxBigDecimalAggregator extends AbstractMaxAggregator { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -5850218739083899419L; + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + internalAgg(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index)); + firstTime = false; + } + } + + /** + * Merge the value, it will update the max aggregate value if aggregator + * passed as an argument will have value greater than aggVal + * + * @param aggregator MaxAggregator + */ + @Override public void merge(MeasureAggregator aggregator) { + MaxAggregator maxAggregator = (MaxAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(maxAggregator.aggVal); + firstTime = false; + } + } + + @Override public MeasureAggregator getCopy() { + MaxAggregator aggregator = new MaxAggregator(); + 
aggregator.aggVal = aggVal; + aggregator.firstTime = firstTime; + return aggregator; + } + + @Override public MeasureAggregator getNew() { + return new MaxBigDecimalAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java new file mode 100644 index 0000000..55b3be5 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/max/MaxLongAggregator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.max; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +/** + * Class Description : + * It will return max of values + */ +public class MaxLongAggregator extends AbstractMaxAggregator { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -5850218739083899419L; + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + internalAgg(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index)); + firstTime = false; + } + } + + /** + * Merge the value, it will update the max aggregate value if aggregator + * passed as an argument will have value greater than aggVal + * + * @param aggregator MaxAggregator + */ + @Override public void merge(MeasureAggregator aggregator) { + MaxAggregator maxAggregator = (MaxAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(maxAggregator.aggVal); + firstTime = false; + } + } + + @Override public MeasureAggregator getCopy() { + MaxAggregator aggregator = new MaxAggregator(); + aggregator.aggVal = aggVal; + aggregator.firstTime = firstTime; + return aggregator; + } + + @Override public MeasureAggregator getNew() { + return new MaxLongAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java new file mode 100644 index 0000000..77aa163 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/AbstractMinAggregator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.query.aggregator.impl.min; + +import java.io.ByteArrayInputStream; +import java.io.ObjectInput; +import java.io.ObjectInputStream; + +import org.carbondata.common.logging.LogService; +import org.carbondata.common.logging.LogServiceFactory; +import org.carbondata.core.util.CarbonUtil; +import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorMaxMin; + +public abstract class AbstractMinAggregator extends AbstractMeasureAggregatorMaxMin { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractMinAggregator.class.getName()); + + protected void internalAgg(Object value) { + if (value instanceof Comparable) { + @SuppressWarnings("unchecked") Comparable<Object> newValue = ((Comparable<Object>) value); + aggVal = (aggVal == null || aggVal.compareTo(newValue) > 0) ? 
newValue : aggVal; + } + } + + @Override public void merge(byte[] value) { + if (0 == value.length) { + return; + } + ByteArrayInputStream bis = null; + ObjectInput objectInput = null; + try { + bis = new ByteArrayInputStream(value); + objectInput = new ObjectInputStream(bis); + Object newVal = (Comparable<Object>) objectInput.readObject(); + internalAgg(newVal); + firstTime = false; + } catch (Exception e) { + LOGGER.error(e, "Problem while merging byte array in minAggregator: " + e.getMessage()); + } finally { + CarbonUtil.closeStreams(bis); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java new file mode 100644 index 0000000..4bb3d73 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.min; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +/** + * Class Description : It will return min of values + */ +public class MinAggregator extends AbstractMinAggregator { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -8077547753784906280L; + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + internalAgg(dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index)); + firstTime = false; + } + } + + /** + * Merge the value, it will update the min aggregate value if aggregator + * passed as an argument will have value less than aggVal + * + * @param aggregator MinAggregator + */ + @Override public void merge(MeasureAggregator aggregator) { + MinAggregator minAggregator = (MinAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(minAggregator.aggVal); + firstTime = false; + } + } + + @Override public MeasureAggregator getCopy() { + MinAggregator aggregator = new MinAggregator(); + aggregator.aggVal = aggVal; + aggregator.firstTime = firstTime; + return aggregator; + } + + @Override public MeasureAggregator getNew() { + return new MinAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java new file mode 100644 index 0000000..7347dc1 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinBigDecimalAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.query.aggregator.impl.min; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +/** + * Class Description : It will return min of values + */ +public class MinBigDecimalAggregator extends AbstractMinAggregator { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -8077547753784906280L; + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + internalAgg(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index)); + firstTime = false; + } + } + + /** + * Merge the value, it will update the min aggregate value if aggregator + * passed as an argument will have value less than aggVal + * + * @param aggregator MinAggregator + */ + @Override public void merge(MeasureAggregator aggregator) { + MinAggregator minAggregator = (MinAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(minAggregator.aggVal); + firstTime = false; + } + } + + @Override public MeasureAggregator getCopy() { + MinAggregator aggregator = new MinAggregator(); + 
aggregator.aggVal = aggVal; + aggregator.firstTime = firstTime; + return aggregator; + } + + @Override public MeasureAggregator getNew() { + return new MinBigDecimalAggregator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java new file mode 100644 index 0000000..af614c9 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/aggregator/impl/min/MinLongAggregator.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.carbondata.query.aggregator.impl.min; + +import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk; +import org.carbondata.query.aggregator.MeasureAggregator; + +/** + * Class Description : It will return min of values + */ +public class MinLongAggregator extends AbstractMinAggregator { + + /** + * serialVersionUID + */ + private static final long serialVersionUID = -8077547753784906280L; + + @Override public void agg(MeasureColumnDataChunk dataChunk, int index) { + if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) { + internalAgg(dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index)); + firstTime = false; + } + } + + /** + * Merge the value, it will update the min aggregate value if aggregator + * passed as an argument will have value less than aggVal + * + * @param aggregator MinAggregator + */ + @Override public void merge(MeasureAggregator aggregator) { + MinAggregator minAggregator = (MinAggregator) aggregator; + if (!aggregator.isFirstTime()) { + agg(minAggregator.aggVal); + firstTime = false; + } + } + + @Override public MeasureAggregator getCopy() { + MinAggregator aggregator = new MinAggregator(); + aggregator.aggVal = aggVal; + aggregator.firstTime = firstTime; + return aggregator; + } + + @Override public MeasureAggregator getNew() { + return new MinLongAggregator(); + } +}