/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.carbondata.query.aggregator.impl.sum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;

import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.core.util.DataTypeUtil;
import org.carbondata.query.aggregator.MeasureAggregator;
import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;

/**
 * Measure aggregator that keeps a running {@link BigDecimal} sum.
 *
 * The inherited {@code firstTime} flag is cleared by the constructor, so a freshly created
 * instance already reports a sum of zero. The flag can still become {@code true} again through
 * {@link #readData(DataInput)} or {@link #getCopy()} of such an instance.
 */
public class SumBigDecimalAggregator extends AbstractMeasureAggregatorBasic {

  /**
   * serialVersionUID
   */
  private static final long serialVersionUID = 623750056131364540L;

  /**
   * running sum
   */
  private BigDecimal aggVal;

  /**
   * Creates an aggregator whose sum is zero and whose {@code firstTime} flag is already cleared.
   */
  public SumBigDecimalAggregator() {
    firstTime = false;
    aggVal = BigDecimal.ZERO;
  }

  /**
   * Adds one value to the sum. While {@code firstTime} is set, the value replaces the sum
   * instead of being added to it.
   *
   * @param newVal value to add; it is cast to {@link BigDecimal}
   */
  @Override public void agg(Object newVal) {
    BigDecimal operand = (BigDecimal) newVal;
    if (!firstTime) {
      aggVal = aggVal.add(operand);
      return;
    }
    aggVal = operand;
    firstTime = false;
  }

  /**
   * Adds the measure value stored at {@code index} of the chunk, unless the chunk's null bit set
   * marks that index.
   *
   * @param dataChunk measure chunk to read from
   * @param index     position of the value inside the chunk
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
    if (dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
      return;
    }
    aggVal = aggVal.add(dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(index));
    firstTime = false;
  }

  /**
   * Serializes the sum as a 4-byte length followed by the bytes produced by
   * {@link DataTypeUtil#bigDecimalToByte}.
   *
   * @return the encoded sum, or an empty array while {@code firstTime} is set
   */
  @Override public byte[] getByteArray() {
    if (firstTime) {
      return new byte[0];
    }
    byte[] serialized = DataTypeUtil.bigDecimalToByte(aggVal);
    ByteBuffer out = ByteBuffer.allocate(4 + serialized.length);
    out.putInt(serialized.length).put(serialized);
    out.rewind();
    return out.array();
  }

  /**
   * @return the current sum
   */
  @Override public BigDecimal getBigDecimalValue() {
    return aggVal;
  }

  /**
   * Folds another aggregator's sum into this one. Aggregators whose {@code isFirstTime()} is
   * still true are ignored.
   *
   * @param aggregator aggregator whose BigDecimal value is added
   */
  @Override public void merge(MeasureAggregator aggregator) {
    if (aggregator.isFirstTime()) {
      return;
    }
    agg(aggregator.getBigDecimalValue());
  }

  /**
   * @return the current sum as an object
   */
  @Override public Object getValueObject() {
    return aggVal;
  }

  /**
   * Overwrites the sum; the {@code firstTime} flag is left untouched.
   *
   * @param newValue replacement sum; it is cast to {@link BigDecimal}
   */
  @Override public void setNewValue(Object newValue) {
    aggVal = (BigDecimal) newValue;
  }

  /**
   * Restores the state written by {@link #writeData(DataOutput)}: the flag first, then the sum
   * parsed from its UTF string form.
   */
  @Override public void readData(DataInput inPut) throws IOException {
    firstTime = inPut.readBoolean();
    String text = inPut.readUTF();
    aggVal = new BigDecimal(text);
  }

  /**
   * Writes the {@code firstTime} flag followed by the sum's {@code toString()} form.
   */
  @Override public void writeData(DataOutput output) throws IOException {
    output.writeBoolean(firstTime);
    output.writeUTF(aggVal.toString());
  }

  /**
   * @return a new aggregator carrying the same sum and the same {@code firstTime} flag
   */
  @Override public MeasureAggregator getCopy() {
    SumBigDecimalAggregator copy = new SumBigDecimalAggregator();
    copy.aggVal = aggVal;
    copy.firstTime = firstTime;
    return copy;
  }

  /**
   * Adds a sum that was encoded by {@link #getByteArray()}. An empty array is ignored.
   *
   * @param value 4-byte length followed by that many payload bytes, or an empty array
   */
  @Override public void merge(byte[] value) {
    if (value.length == 0) {
      return;
    }
    ByteBuffer in = ByteBuffer.wrap(value);
    int payloadLength = in.getInt();
    byte[] payload = new byte[payloadLength];
    in.get(payload);
    aggVal = aggVal.add(DataTypeUtil.byteToBigDecimal(payload));
    firstTime = false;
  }

  @Override public String toString() {
    return String.valueOf(aggVal);
  }

  /**
   * Orders aggregators by {@link BigDecimal#compareTo} of their sums.
   */
  @Override public int compareTo(MeasureAggregator o) {
    return getBigDecimalValue().compareTo(o.getBigDecimalValue());
  }

  /**
   * Two aggregators of this class are equal when their sums are equal according to
   * {@link BigDecimal#equals}.
   */
  @Override public boolean equals(Object obj) {
    if (obj instanceof SumBigDecimalAggregator) {
      SumBigDecimalAggregator other = (SumBigDecimalAggregator) obj;
      return getBigDecimalValue().equals(other.getBigDecimalValue());
    }
    return false;
  }

  @Override public int hashCode() {
    return getBigDecimalValue().hashCode();
  }

  /**
   * @return a fresh aggregator of this class
   */
  @Override public MeasureAggregator getNew() {
    return new SumBigDecimalAggregator();
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.carbondata.query.aggregator.impl.sum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.query.aggregator.MeasureAggregator;
import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;

/**
 * Measure aggregator that keeps a running {@code double} sum. Every successful aggregation
 * clears the inherited {@code firstTime} flag.
 */
public class SumDoubleAggregator extends AbstractMeasureAggregatorBasic {

  /**
   * serialVersionUID
   */
  private static final long serialVersionUID = 623750056131364540L;

  /**
   * running sum
   */
  private double aggVal;

  /**
   * Adds a primitive value to the sum.
   *
   * @param newVal value to add
   */
  @Override public void agg(double newVal) {
    aggVal = aggVal + newVal;
    firstTime = false;
  }

  /**
   * Adds a boxed value to the sum.
   *
   * @param newVal value to add; it is cast to {@link Number} and read via {@code doubleValue()}
   */
  @Override public void agg(Object newVal) {
    Number boxed = (Number) newVal;
    aggVal = aggVal + boxed.doubleValue();
    firstTime = false;
  }

  /**
   * Adds the measure value stored at {@code index} of the chunk, unless the chunk's null bit set
   * marks that index.
   *
   * @param dataChunk measure chunk to read from
   * @param index     position of the value inside the chunk
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
    if (dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
      return;
    }
    aggVal = aggVal + dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(index);
    firstTime = false;
  }

  /**
   * Serializes the sum with {@link ByteBuffer#putDouble(double)}.
   *
   * @return the encoded sum, or an empty array while {@code firstTime} is set
   */
  @Override public byte[] getByteArray() {
    if (firstTime) {
      return new byte[0];
    }
    return ByteBuffer.allocate(CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE).putDouble(aggVal)
        .array();
  }

  /**
   * @return the current sum
   */
  @Override public Double getDoubleValue() {
    return aggVal;
  }

  /**
   * Folds another aggregator's sum into this one. Aggregators whose {@code isFirstTime()} is
   * still true are ignored.
   *
   * @param aggregator aggregator whose double value is added
   */
  @Override public void merge(MeasureAggregator aggregator) {
    if (aggregator.isFirstTime()) {
      return;
    }
    agg(aggregator.getDoubleValue());
  }

  /**
   * @return the current sum as an object
   */
  @Override public Object getValueObject() {
    return aggVal;
  }

  /**
   * Overwrites the sum; the {@code firstTime} flag is left untouched.
   *
   * @param newValue replacement sum; it is cast to {@link Double}
   */
  @Override public void setNewValue(Object newValue) {
    aggVal = ((Double) newValue).doubleValue();
  }

  /**
   * @return true until a value has been aggregated or merged
   */
  @Override public boolean isFirstTime() {
    return firstTime;
  }

  /**
   * Restores the state written by {@link #writeData(DataOutput)}: the flag first, then the sum.
   */
  @Override public void readData(DataInput inPut) throws IOException {
    firstTime = inPut.readBoolean();
    aggVal = inPut.readDouble();
  }

  /**
   * Writes the {@code firstTime} flag followed by the sum.
   */
  @Override public void writeData(DataOutput output) throws IOException {
    output.writeBoolean(firstTime);
    output.writeDouble(aggVal);
  }

  /**
   * @return a new aggregator carrying the same sum and the same {@code firstTime} flag
   */
  @Override public MeasureAggregator getCopy() {
    SumDoubleAggregator copy = new SumDoubleAggregator();
    copy.aggVal = aggVal;
    copy.firstTime = firstTime;
    return copy;
  }

  /**
   * Adds a sum that was encoded by {@link #getByteArray()}. An empty array is ignored.
   *
   * @param value encoded double, or an empty array
   */
  @Override public void merge(byte[] value) {
    if (value.length == 0) {
      return;
    }
    aggVal = aggVal + ByteBuffer.wrap(value).getDouble();
    firstTime = false;
  }

  @Override public String toString() {
    return Double.toString(aggVal);
  }

  /**
   * Orders aggregators by their sums using the primitive {@code <} and {@code >} operators, so
   * a NaN on either side compares as 0.
   */
  @Override public int compareTo(MeasureAggregator o) {
    double mine = getDoubleValue();
    double theirs = o.getDoubleValue();
    if (mine < theirs) {
      return -1;
    }
    return mine > theirs ? 1 : 0;
  }

  /**
   * Two aggregators of this class are equal when their sums are equal according to
   * {@link Double#equals}.
   */
  @Override public boolean equals(Object obj) {
    if (obj instanceof SumDoubleAggregator) {
      SumDoubleAggregator other = (SumDoubleAggregator) obj;
      return getDoubleValue().equals(other.getDoubleValue());
    }
    return false;
  }

  @Override public int hashCode() {
    return getDoubleValue().hashCode();
  }

  /**
   * @return a fresh aggregator of this class
   */
  @Override public MeasureAggregator getNew() {
    return new SumDoubleAggregator();
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.carbondata.query.aggregator.impl.sum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.query.aggregator.MeasureAggregator;
import org.carbondata.query.aggregator.impl.AbstractMeasureAggregatorBasic;

/**
 * Measure aggregator that keeps a running {@code long} sum. Every successful aggregation
 * clears the inherited {@code firstTime} flag.
 */
public class SumLongAggregator extends AbstractMeasureAggregatorBasic {

  /**
   * serialVersionUID
   */
  private static final long serialVersionUID = 623750056131364540L;

  /**
   * running sum
   */
  private long aggVal;

  /**
   * Adds a boxed value to the sum.
   *
   * Any {@link Number} is accepted and read through {@code longValue()}, matching what
   * SumDoubleAggregator does with {@code doubleValue()}. A direct {@code (long)} cast would only
   * accept a boxed {@code Long} and fail with a ClassCastException for e.g. an Integer.
   * Fractional inputs are truncated by {@code longValue()}.
   *
   * @param newVal value to add; must be a non-null {@link Number}
   */
  @Override public void agg(Object newVal) {
    aggVal += ((Number) newVal).longValue();
    firstTime = false;
  }

  /**
   * Adds the measure value stored at {@code index} of the chunk, unless the chunk's null bit set
   * marks that index.
   *
   * @param dataChunk measure chunk to read from
   * @param index     position of the value inside the chunk
   */
  @Override public void agg(MeasureColumnDataChunk dataChunk, int index) {
    if (!dataChunk.getNullValueIndexHolder().getBitSet().get(index)) {
      aggVal += dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(index);
      firstTime = false;
    }
  }

  /**
   * Serializes the sum with {@link ByteBuffer#putLong(long)}.
   *
   * @return the encoded sum, or an empty array while {@code firstTime} is set
   */
  @Override public byte[] getByteArray() {
    if (firstTime) {
      return new byte[0];
    }
    ByteBuffer buffer = ByteBuffer.allocate(CarbonCommonConstants.LONG_SIZE_IN_BYTE);
    buffer.putLong(aggVal);
    return buffer.array();
  }

  /**
   * @return the current sum
   */
  @Override public Long getLongValue() {
    return aggVal;
  }

  /**
   * Folds another aggregator's sum into this one. Aggregators whose {@code isFirstTime()} is
   * still true are ignored.
   *
   * @param aggregator aggregator whose long value is added
   */
  @Override public void merge(MeasureAggregator aggregator) {
    if (!aggregator.isFirstTime()) {
      agg(aggregator.getLongValue());
    }
  }

  /**
   * @return the current sum as an object
   */
  @Override public Object getValueObject() {
    return aggVal;
  }

  /**
   * Overwrites the sum; the {@code firstTime} flag is left untouched.
   *
   * @param newValue replacement sum; any non-null {@link Number}, read via {@code longValue()}
   */
  @Override public void setNewValue(Object newValue) {
    aggVal = ((Number) newValue).longValue();
  }

  /**
   * Restores the state written by {@link #writeData(DataOutput)}: the flag first, then the sum.
   */
  @Override public void readData(DataInput inPut) throws IOException {
    firstTime = inPut.readBoolean();
    aggVal = inPut.readLong();
  }

  /**
   * Writes the {@code firstTime} flag followed by the sum.
   */
  @Override public void writeData(DataOutput output) throws IOException {
    output.writeBoolean(firstTime);
    output.writeLong(aggVal);
  }

  /**
   * @return a new aggregator carrying the same sum and the same {@code firstTime} flag
   */
  @Override public MeasureAggregator getCopy() {
    SumLongAggregator aggr = new SumLongAggregator();
    aggr.aggVal = aggVal;
    aggr.firstTime = firstTime;
    return aggr;
  }

  /**
   * Adds a sum that was encoded by {@link #getByteArray()}. An empty array is ignored.
   *
   * @param value encoded long, or an empty array
   */
  @Override public void merge(byte[] value) {
    if (0 == value.length) {
      return;
    }
    aggVal += ByteBuffer.wrap(value).getLong();
    firstTime = false;
  }

  @Override public String toString() {
    return aggVal + "";
  }

  /**
   * Orders aggregators by their sums; the result is -1, 0 or 1.
   */
  @Override public int compareTo(MeasureAggregator o) {
    return Long.compare(getLongValue(), o.getLongValue());
  }

  /**
   * Two aggregators of this class are equal when their sums are equal.
   */
  @Override public boolean equals(Object obj) {
    if (!(obj instanceof SumLongAggregator)) {
      return false;
    }
    SumLongAggregator o = (SumLongAggregator) obj;
    return getLongValue().equals(o.getLongValue());
  }

  @Override public int hashCode() {
    return getLongValue().hashCode();
  }

  /**
   * @return a fresh aggregator of this class
   */
  @Override public MeasureAggregator getNew() {
    return new SumLongAggregator();
  }
}
a/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java +++ b/core/src/main/java/org/carbondata/query/aggregator/util/MeasureAggregatorFactory.java @@ -24,28 +24,28 @@ import org.carbondata.core.carbon.metadata.datatype.DataType; import org.carbondata.core.constants.CarbonCommonConstants; import org.carbondata.query.aggregator.CustomMeasureAggregator; import org.carbondata.query.aggregator.MeasureAggregator; -import org.carbondata.query.aggregator.impl.AvgBigDecimalAggregator; -import org.carbondata.query.aggregator.impl.AvgDoubleAggregator; -import org.carbondata.query.aggregator.impl.AvgLongAggregator; -import org.carbondata.query.aggregator.impl.CountAggregator; -import org.carbondata.query.aggregator.impl.DistinctCountAggregatorObjectSet; -import org.carbondata.query.aggregator.impl.DistinctCountBigDecimalAggregatorObjectSet; -import org.carbondata.query.aggregator.impl.DistinctCountLongAggregatorObjectSet; -import org.carbondata.query.aggregator.impl.DummyBigDecimalAggregator; -import org.carbondata.query.aggregator.impl.DummyDoubleAggregator; -import org.carbondata.query.aggregator.impl.DummyLongAggregator; -import org.carbondata.query.aggregator.impl.MaxAggregator; -import org.carbondata.query.aggregator.impl.MaxBigDecimalAggregator; -import org.carbondata.query.aggregator.impl.MaxLongAggregator; -import org.carbondata.query.aggregator.impl.MinAggregator; -import org.carbondata.query.aggregator.impl.MinBigDecimalAggregator; -import org.carbondata.query.aggregator.impl.MinLongAggregator; -import org.carbondata.query.aggregator.impl.SumBigDecimalAggregator; -import org.carbondata.query.aggregator.impl.SumDistinctBigDecimalAggregator; -import org.carbondata.query.aggregator.impl.SumDistinctDoubleAggregator; -import org.carbondata.query.aggregator.impl.SumDistinctLongAggregator; -import org.carbondata.query.aggregator.impl.SumDoubleAggregator; -import org.carbondata.query.aggregator.impl.SumLongAggregator; +import 
org.carbondata.query.aggregator.impl.avg.AvgBigDecimalAggregator; +import org.carbondata.query.aggregator.impl.avg.AvgDoubleAggregator; +import org.carbondata.query.aggregator.impl.avg.AvgLongAggregator; +import org.carbondata.query.aggregator.impl.count.CountAggregator; +import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregatorObjectSet; +import org.carbondata.query.aggregator.impl.distinct.DistinctCountBigDecimalAggregatorObjectSet; +import org.carbondata.query.aggregator.impl.distinct.DistinctCountLongAggregatorObjectSet; +import org.carbondata.query.aggregator.impl.distinct.SumDistinctBigDecimalAggregator; +import org.carbondata.query.aggregator.impl.distinct.SumDistinctDoubleAggregator; +import org.carbondata.query.aggregator.impl.distinct.SumDistinctLongAggregator; +import org.carbondata.query.aggregator.impl.dummy.DummyBigDecimalAggregator; +import org.carbondata.query.aggregator.impl.dummy.DummyDoubleAggregator; +import org.carbondata.query.aggregator.impl.dummy.DummyLongAggregator; +import org.carbondata.query.aggregator.impl.max.MaxAggregator; +import org.carbondata.query.aggregator.impl.max.MaxBigDecimalAggregator; +import org.carbondata.query.aggregator.impl.max.MaxLongAggregator; +import org.carbondata.query.aggregator.impl.min.MinAggregator; +import org.carbondata.query.aggregator.impl.min.MinBigDecimalAggregator; +import org.carbondata.query.aggregator.impl.min.MinLongAggregator; +import org.carbondata.query.aggregator.impl.sum.SumBigDecimalAggregator; +import org.carbondata.query.aggregator.impl.sum.SumDoubleAggregator; +import org.carbondata.query.aggregator.impl.sum.SumLongAggregator; import org.carbondata.query.carbon.model.CustomAggregateExpression; /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/aggregator/dimension/impl/DirectDictionaryDimensionAggregator.java ---------------------------------------------------------------------- diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.carbondata.query.carbon.aggregator.dimension.impl;

import java.nio.ByteBuffer;

import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.carbondata.core.util.CarbonUtil;
import org.carbondata.query.aggregator.MeasureAggregator;
import org.carbondata.query.carbon.aggregator.dimension.DimensionDataAggregator;
import org.carbondata.query.carbon.executor.util.QueryUtil;
import org.carbondata.query.carbon.model.DimensionAggregatorInfo;
import org.carbondata.query.carbon.result.AbstractScannedResult;

/**
 * Aggregates the data of a direct dictionary dimension: the dimension key is turned into a
 * surrogate key, the surrogate is resolved through a {@link DirectDictionaryGenerator}, and the
 * resulting value is handed to every aggregator configured for the dimension.
 */
public class DirectDictionaryDimensionAggregator implements DimensionDataAggregator {

  /**
   * describes the dimension to aggregate and the aggregations requested on it
   */
  private DimensionAggregatorInfo dimensionAggeragtorInfo;

  /**
   * position in the aggregator array where this dimension's aggregators begin
   */
  private int aggregatorStartIndex;

  /**
   * scratch buffer passed to CarbonUtil.getSurrogateKey when decoding the dimension key
   */
  private ByteBuffer buffer;

  /**
   * index used to fetch this dimension's key from the scanned result
   */
  private int blockIndex;

  /**
   * offsets of the aggregators that work on number type values like sum, avg
   */
  private int[] numberTypeAggregatorIndex;

  /**
   * resolves a surrogate key to the value of the dimension
   */
  private DirectDictionaryGenerator directDictionaryGenerator;

  /**
   * offsets of the aggregators that work on actual type values like max, min, distinct count
   */
  private int[] actualTypeAggregatorIndex;

  /**
   * @param dimensionAggeragtorInfo dimension and aggregation list to serve
   * @param aggregatorStartIndex    first slot of this dimension in the aggregator array
   * @param blockIndex              index used to read the dimension key from a scanned result
   */
  public DirectDictionaryDimensionAggregator(DimensionAggregatorInfo dimensionAggeragtorInfo,
      int aggregatorStartIndex, int blockIndex) {
    this.blockIndex = blockIndex;
    this.aggregatorStartIndex = aggregatorStartIndex;
    this.dimensionAggeragtorInfo = dimensionAggeragtorInfo;
    this.buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE);
    this.numberTypeAggregatorIndex =
        QueryUtil.getNumberTypeIndex(dimensionAggeragtorInfo.getAggList());
    this.actualTypeAggregatorIndex =
        QueryUtil.getActualTypeIndex(dimensionAggeragtorInfo.getAggList());
    this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
        .getDirectDictionaryGenerator(dimensionAggeragtorInfo.getDim().getDataType());
  }

  /**
   * Resolves the current row's dimension value and feeds it to the actual-type aggregators
   * first, then to the number-type aggregators.
   *
   * @param scannedResult scanned result to read the dimension key from
   * @param aggeragtor    aggregator array; this dimension's slots start at aggregatorStartIndex
   */
  @Override public void aggregateDimensionData(AbstractScannedResult scannedResult,
      MeasureAggregator[] aggeragtor) {
    int surrogateKey =
        CarbonUtil.getSurrogateKey(scannedResult.getDimensionKey(blockIndex), buffer);
    // NOTE(review): the division by 1000 looks like a time unit conversion - confirm against
    // the generator's contract
    long resolved = (long) directDictionaryGenerator.getValueFromSurrogate(surrogateKey);
    Object dataBasedOnDataType = resolved / 1000;

    for (int offset : actualTypeAggregatorIndex) {
      aggeragtor[aggregatorStartIndex + offset].agg(dataBasedOnDataType);
    }
    for (int offset : numberTypeAggregatorIndex) {
      aggeragtor[aggregatorStartIndex + offset].agg(dataBasedOnDataType);
    }
  }

}
a/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java +++ b/core/src/main/java/org/carbondata/query/carbon/executor/internal/impl/InternalCountStartQueryExecutor.java @@ -24,7 +24,7 @@ import java.util.List; import org.carbondata.core.carbon.datastore.block.AbstractIndex; import org.carbondata.core.iterator.CarbonIterator; import org.carbondata.query.aggregator.MeasureAggregator; -import org.carbondata.query.aggregator.impl.CountAggregator; +import org.carbondata.query.aggregator.impl.count.CountAggregator; import org.carbondata.query.carbon.executor.exception.QueryExecutionException; import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo; import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java index ebd90f9..789f77e 100644 --- a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java +++ b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java @@ -53,6 +53,7 @@ import org.carbondata.core.util.CarbonUtil; import org.carbondata.core.util.CarbonUtilException; import org.carbondata.query.carbon.aggregator.dimension.DimensionDataAggregator; import org.carbondata.query.carbon.aggregator.dimension.impl.ColumnGroupDimensionsAggregator; +import org.carbondata.query.carbon.aggregator.dimension.impl.DirectDictionaryDimensionAggregator; import org.carbondata.query.carbon.aggregator.dimension.impl.FixedLengthDimensionAggregator; import org.carbondata.query.carbon.aggregator.dimension.impl.VariableLengthDimensionAggregator; import 
org.carbondata.query.carbon.executor.exception.QueryExecutionException; @@ -724,9 +725,15 @@ public class QueryUtil { aggregatorStartIndex += numberOfAggregatorForColumnGroup; continue; } else { + if(CarbonUtil.hasEncoding(dim.getEncoder(), Encoding.DIRECT_DICTIONARY)){ + dimensionDataAggregators.add( + new DirectDictionaryDimensionAggregator(entry.getValue().get(0), + aggregatorStartIndex, + dimensionToBlockIndexMapping.get(dim.getOrdinal()))); + } // if it is a dictionary column than create a fixed length // aggeragtor - if (CarbonUtil + else if (CarbonUtil .hasEncoding(dim.getEncoder(), Encoding.DICTIONARY)) { dimensionDataAggregators.add( new FixedLengthDimensionAggregator(entry.getValue().get(0), null, http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java index 0957e7a..5604ecd 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java @@ -32,9 +32,9 @@ import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerat import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.carbondata.core.util.CarbonUtil; import org.carbondata.query.aggregator.MeasureAggregator; -import org.carbondata.query.aggregator.impl.CountAggregator; -import org.carbondata.query.aggregator.impl.DistinctCountAggregator; -import org.carbondata.query.aggregator.impl.DistinctStringCountAggregator; +import org.carbondata.query.aggregator.impl.count.CountAggregator; 
+import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregator; +import org.carbondata.query.aggregator.impl.distinct.DistinctStringCountAggregator; import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; import org.carbondata.query.carbon.model.DimensionAggregatorInfo; import org.carbondata.query.carbon.model.QueryDimension; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala index 69e1d2f..a549409 100644 --- a/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala +++ b/examples/src/main/scala/org/carbondata/examples/GenerateDictionaryExample.scala @@ -63,7 +63,7 @@ object GenerateDictionaryExample { val tableName = carbonTableIdentifier.getTableName val carbonRelation = CarbonEnv.getInstance(carbonContext).carbonCatalog. 
lookupRelation1(Option(dataBaseName), - tableName, None) (carbonContext).asInstanceOf[CarbonRelation] + tableName) (carbonContext).asInstanceOf[CarbonRelation] val carbonTable = carbonRelation.cubeMeta.carbonTable val dimensions = carbonTable.getDimensionByTableName(tableName) .toArray.map(_.asInstanceOf[CarbonDimension]) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala index 1cd4be4..c4f09cc 100644 --- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala +++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -19,12 +19,13 @@ package org.apache.spark.sql.common.util import java.util.{Locale, TimeZone} -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import scala.collection.JavaConversions._ + import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import scala.collection.JavaConversions._ class QueryTest extends PlanTest { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala ---------------------------------------------------------------------- diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala index 
ec23b80..bd7b596 100644 --- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala +++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase1.scala @@ -81,6 +81,34 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll { "Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber," + "Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId," + "gamePointId,gamePointDescription')") + + sql( + "create table if not exists Carbon_automation_hive (imei string,deviceInformationId int," + + "MAC string,deviceColor string,device_backColor string,modelId string,marketName " + + "string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string," + + "productionDate timestamp,bomCode string,internalModels string, deliveryTime string, " + + "channelsId string, channelsName string , deliveryAreaId string, deliveryCountry " + + "string, deliveryProvince string, deliveryCity string,deliveryDistrict string, " + + "deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId " + + "string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict" + + " string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, " + + "Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, " + + "Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string," + + "Active_webTypeDataVerNumber string, Active_operatorsVersion string, " + + "Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, " + + "Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, " + + "Latest_province string, Latest_city string, Latest_district string, Latest_street " + + "string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion " + + "string, 
Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion " + + "string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, " + + "Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, " + + "Latest_operatorId string, gamePointId int, gamePointDescription string" + + ") row format delimited fields terminated by ','" + ) + + sql("LOAD DATA LOCAL INPATH '" + currentDirectory + "/src/test/resources/100_olap.csv' INTO " + + "table Carbon_automation_hive ") + } catch { case e: Exception => print("ERROR: DROP Carbon_automation_test ") } @@ -88,6 +116,7 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll { override def afterAll { sql("drop cube Carbon_automation_test") + sql("drop table Carbon_automation_hive") } @@ -853,7 +882,7 @@ class AllDataTypesTestCase1 extends QueryTest with BeforeAndAfterAll { test("select variance(deviceInformationId) as a from Carbon_automation_test")({ checkAnswer( sql("select variance(deviceInformationId) as a from Carbon_automation_test"), - Seq(Row(9.31041555963636E9)) + sql("select variance(deviceInformationId) as a from Carbon_automation_hive") ) } ) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala ---------------------------------------------------------------------- diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala index ab9121a..88ba722 100644 --- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala +++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase2.scala @@ -81,6 +81,32 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll { 
"Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber," + "Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId," + "gamePointId,gamePointDescription')") + + sql( + "create table if not exists Carbon_automation_hive2(imei string,deviceInformationId int," + + "MAC string,deviceColor string,device_backColor string,modelId string,marketName " + + "string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string," + + "productionDate timestamp,bomCode string,internalModels string, deliveryTime string, " + + "channelsId string, channelsName string , deliveryAreaId string, deliveryCountry " + + "string, deliveryProvince string, deliveryCity string,deliveryDistrict string, " + + "deliveryStreet string, oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId " + + "string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict" + + " string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, " + + "Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, " + + "Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string," + + "Active_webTypeDataVerNumber string, Active_operatorsVersion string, " + + "Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, " + + "Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, " + + "Latest_province string, Latest_city string, Latest_district string, Latest_street " + + "string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion " + + "string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion " + + "string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, " + + "Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, " + + "Latest_operatorId string, gamePointId int,gamePointDescription string" + + ") row format delimited 
fields terminated by ','" + ) + sql("LOAD DATA LOCAL INPATH '" + currentDirectory + "/src/test/resources/100_olap.csv' INTO " + + "table Carbon_automation_hive2 ") } catch { case e: Exception => print("ERROR: DROP Carbon_automation_test2 ") } @@ -89,6 +115,7 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll { override def afterAll { try { sql("drop cube Carbon_automation_test2") + sql("drop table Carbon_automation_hive2") } catch { case e: Exception => print("ERROR: DROP Carbon_automation_test2 ") } @@ -7902,7 +7929,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll { sql( "select variance(deviceInformationId), var_pop(imei) from Carbon_automation_test2 where activeareaid>3" ), - Seq(Row(1.477644655616972E10, null)) + sql( + "select variance(deviceInformationId), var_pop(imei) from Carbon_automation_hive2 where activeareaid>3" + ) ) } ) @@ -7915,7 +7944,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll { sql( "select variance(contractNumber), var_pop(contractNumber) from Carbon_automation_test2 where deliveryareaid>5" ), - Seq(Row(8.508651970169495E12, 8.508651970169495E12)) + sql( + "select variance(contractNumber), var_pop(contractNumber) from Carbon_automation_hive2 where deliveryareaid>5" + ) ) } ) @@ -7928,7 +7959,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll { sql( "select variance(AMSize), var_pop(channelsid) from Carbon_automation_test2 where channelsid>2" ), - Seq(Row(null, 2.148423005565863)) + sql( + "select variance(AMSize), var_pop(channelsid) from Carbon_automation_hive2 where channelsid>2" + ) ) } ) @@ -7941,7 +7974,9 @@ class AllDataTypesTestCase2 extends QueryTest with BeforeAndAfterAll { sql( "select variance(deviceInformationId), var_pop(deviceInformationId) from Carbon_automation_test2 where activeareaid>3" ), - Seq(Row(1.477644655616972E10, 1.477644655616972E10)) + sql( + "select variance(deviceInformationId), var_pop(deviceInformationId) from 
Carbon_automation_hive2 where activeareaid>3" + ) ) } ) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala ---------------------------------------------------------------------- diff --git a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala index 1884e5f..12f55b7 100644 --- a/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala +++ b/integration-testcases/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase6.scala @@ -53,7 +53,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll { "bomCode string,internalModels string, deliveryTime string, channelsId string, " + "channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince " + "string, deliveryCity string,deliveryDistrict string, deliveryStreet string, " + - "oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry " + + "oxSingleNumber string,contractNumber int, ActiveCheckTime string, ActiveAreaId string, ActiveCountry " + "string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet " + "string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, " + "Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, " + @@ -65,8 +65,8 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll { "Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, " + "Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string," + " Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, " + - "Latest_phonePADPartitionedVersions string, 
Latest_operatorId string, " + - "gamePointDescription string, gamePointId int,contractNumber int) row format " + + "Latest_phonePADPartitionedVersions string, Latest_operatorId string,gamePointId int," + + "gamePointDescription string) row format " + "delimited fields terminated by ','" ) @@ -211,7 +211,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll { test("select sum( DISTINCT channelsId) a from Carbon_automation_test6")({ checkAnswer( sql("select sum( DISTINCT channelsId) a from Carbon_automation_test6"), - Seq(Row(428.0))) + sql("select sum( DISTINCT channelsId) a from hivetable")) }) //TC_083 @@ -263,7 +263,7 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll { test("select variance(gamePointId) as a from Carbon_automation_test6")({ checkAnswer( sql("select variance(gamePointId) as a from Carbon_automation_test6"), - Seq(Row(654787.843930927))) + sql("select variance(gamePointId) as a from hivetable")) }) //TC_120 @@ -732,14 +732,14 @@ class AllDataTypesTestCase6 extends QueryTest with BeforeAndAfterAll { test("select variance(gamepointid), var_pop(gamepointid) from Carbon_automation_test6 where channelsid>2")({ checkAnswer( sql("select variance(gamepointid), var_pop(gamepointid) from Carbon_automation_test6 where channelsid>2"), - Seq(Row(622630.4599570761, 622630.4599570761))) + sql("select variance(gamepointid), var_pop(gamepointid) from hivetable where channelsid>2")) }) //TC_445 test("select variance(bomcode), var_pop(gamepointid) from Carbon_automation_test6 where activeareaid>3")({ checkAnswer( sql("select variance(bomcode), var_pop(gamepointid) from Carbon_automation_test6 where activeareaid>3"), - Seq(Row(1.4776446556169722E10, 663683.3954750763))) + sql("select variance(bomcode), var_pop(gamepointid) from hivetable where activeareaid>3")) }) //TC_447 http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/pom.xml 
---------------------------------------------------------------------- diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml index 0682a42..c98c9fb 100644 --- a/integration/spark/pom.xml +++ b/integration/spark/pom.xml @@ -65,17 +65,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>eigenbase</groupId> - <artifactId>eigenbase-xom</artifactId> - <version>1.3.4</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>it.unimi.dsi</groupId> <artifactId>fastutil</artifactId> <version>6.5.0</version> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala deleted file mode 100644 index 93cf675..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonAggregate.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import java.util.HashMap - -import scala.Array.{canBuildFrom, fallbackCanBuildFrom} - -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.errors.attachTree -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} - -/** - * :: DeveloperApi :: - * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each - * group. - * - * @param partial if true then aggregation is done partially on local data without - * shuffling to - * ensure all values where `groupingExpressions` are equal are present. - * @param groupingExpressions expressions that are evaluated to determine grouping. - * @param aggregateExpressions expressions that are computed for each group. - * @param child the input data source. - */ -@DeveloperApi -case class CarbonAggregate( - partial: Boolean, - groupingExpressions: Seq[Expression], - aggregateExpressions: Seq[NamedExpression], - child: SparkPlan)(@transient sqlContext: SQLContext) - extends UnaryNode { - - override def requiredChildDistribution: Seq[Distribution] = { - if (partial) { - UnspecifiedDistribution :: Nil - } else { - if (groupingExpressions == Nil) { - AllTuples :: Nil - } else { - ClusteredDistribution(groupingExpressions) :: Nil - } - } - } - - override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil - - // HACK: Generators don't correctly preserve their output through serializations so we grab - // out child's output attributes statically here. 
- private[this] val childOutput = child.output - - override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) - - /** - * An aggregate that needs to be computed for each row in a group. - * - * @param unbound Unbound version of this aggregate, used for result substitution. - * @param aggregate A bound copy of this aggregate used to create a new aggregation buffer. - * @param resultAttribute An attribute used to refer to the result of this aggregate in the final - * output. - */ - case class ComputedAggregate(unbound: AggregateExpression1, - aggregate: AggregateExpression1, - resultAttribute: AttributeReference) - - /** A list of aggregates that need to be computed for each group. */ - private[this] val computedAggregates = aggregateExpressions.flatMap { agg => - agg.collect { - case a: AggregateExpression1 => - ComputedAggregate( - a, - BindReferences.bindReference(a, childOutput), - AttributeReference(s"aggResult:$a", a.dataType, a.nullable)()) - } - }.toArray - - /** The schema of the result of all aggregate evaluations */ - private[this] val computedSchema = computedAggregates.map(_.resultAttribute) - - /** Creates a new aggregate buffer for a group. */ - private[this] def newAggregateBuffer(): Array[AggregateFunction1] = { - val buffer = new Array[AggregateFunction1](computedAggregates.length) - var i = 0 - while (i < computedAggregates.length) { - buffer(i) = computedAggregates(i).aggregate.newInstance() - i += 1 - } - buffer - } - - /** Named attributes used to substitute grouping attributes into the final result. */ - private[this] val namedGroups = groupingExpressions.map { - case ne: NamedExpression => ne -> ne.toAttribute - case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute - } - - /** - * A map of substitutions that are used to insert the aggregate expressions and grouping - * expression into the final result expression. 
- */ - private[this] val resultMap = - (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap - - /** - * Substituted version of aggregateExpressions expressions which are used to compute final - * output rows given a group and the result of all aggregate computations. - */ - private[this] val resultExpressions = aggregateExpressions.map { agg => - agg.transform { - case e: Expression if resultMap.contains(e) => resultMap(e) - } - } - - override def doExecute(): RDD[InternalRow] = { - attachTree(this, "execute") { - if (groupingExpressions.isEmpty) { - child.execute().mapPartitions { iter => - val buffer = newAggregateBuffer() - var currentRow: InternalRow = null - while (iter.hasNext) { - currentRow = iter.next() - var i = 0 - while (i < buffer.length) { - buffer(i).update(currentRow) - i += 1 - } - } - val resultProjection = new InterpretedProjection(resultExpressions, computedSchema) - val aggregateResults = new GenericMutableRow(computedAggregates.length) - - var i = 0 - while (i < buffer.length) { - aggregateResults(i) = buffer(i).eval(EmptyRow) - i += 1 - } - - Iterator(resultProjection(aggregateResults)) - } - } else { - child.execute().mapPartitions { iter => - val hashTable = new HashMap[InternalRow, Array[AggregateFunction1]] - val groupingProjection = new InterpretedMutableProjection(groupingExpressions, - childOutput) - - var currentRow: InternalRow = null - while (iter.hasNext) { - currentRow = iter.next() - val currentGroup = groupingProjection(currentRow) - var currentBuffer = hashTable.get(currentGroup) - if (currentBuffer == null) { - currentBuffer = newAggregateBuffer() - hashTable.put(currentGroup.copy(), currentBuffer) - } - - var i = 0 - while (i < currentBuffer.length) { - currentBuffer(i).update(currentRow) - i += 1 - } - } - - new Iterator[InternalRow] { - private[this] val hashTableIter = hashTable.entrySet().iterator() - private[this] val aggregateResults = new 
GenericMutableRow(computedAggregates.length) - private[this] val resultProjection = - new InterpretedMutableProjection(resultExpressions, - computedSchema ++ namedGroups.map(_._2)) - private[this] val joinedRow = new JoinedRow - - override final def hasNext: Boolean = hashTableIter.hasNext - - override final def next(): InternalRow = { - val currentEntry = hashTableIter.next() - val currentGroup = currentEntry.getKey - val currentBuffer = currentEntry.getValue - - var i = 0 - while (i < currentBuffer.length) { - // Evaluating an aggregate buffer returns the result. No row is required since we - // already added all rows in the group using update. - aggregateResults(i) = currentBuffer(i).eval(EmptyRow) - i += 1 - } - resultProjection(joinedRow(aggregateResults, currentGroup)) - } - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala index e574348..f728a32 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala @@ -19,18 +19,18 @@ package org.apache.spark.sql import scala.collection.mutable.MutableList -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import 
org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _} -import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.execution.command.tableModel +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation} -import org.apache.spark.sql.types.{BooleanType, DataType, StringType, TimestampType} - -import org.carbondata.spark.agg._ +import org.apache.spark.sql.types._ /** * Top command @@ -94,7 +94,7 @@ case class ShowCubeCommand(schemaNameOp: Option[String]) extends LogicalPlan wit override def children: Seq[LogicalPlan] = Seq.empty override def output: Seq[Attribute] = { - Seq(AttributeReference("cubeName", StringType, nullable = false)(), + Seq(AttributeReference("tableName", StringType, nullable = false)(), AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)()) } } @@ -107,8 +107,8 @@ case class ShowAllCubeCommand() extends LogicalPlan with Command { override def children: Seq[LogicalPlan] = Seq.empty override def output: Seq[Attribute] = { - Seq(AttributeReference("schemaName", StringType, nullable = false)(), - AttributeReference("cubeName", StringType, nullable = false)(), + Seq(AttributeReference("dbName", StringType, nullable = false)(), + AttributeReference("tableName", StringType, nullable = false)(), AttributeReference("isRegisteredWithSpark", BooleanType, nullable = false)()) } } @@ -161,7 +161,7 @@ case class ShowLoadsCommand(schemaNameOp: Option[String], cube: String, limit: O /** * Describe formatted for hive table */ -case class DescribeFormattedCommand(sql: String, tblIdentifier: Seq[String]) +case class DescribeFormattedCommand(sql: String, tblIdentifier: TableIdentifier) extends LogicalPlan with Command { override def children: Seq[LogicalPlan] = Seq.empty @@ -181,16 +181,18 @@ case class CarbonDictionaryCatalystDecoder( override def output: Seq[Attribute] = 
child.output } -abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable{ +abstract class CarbonProfile(attributes: Seq[Attribute]) extends Serializable { def isEmpty: Boolean = attributes.isEmpty } + case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes) + case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes) case class FakeCarbonCast(child: Literal, dataType: DataType) extends LeafExpression with CodegenFallback { - override def toString: String = s"FakeCarbonCast($child as ${dataType.simpleString})" + override def toString: String = s"FakeCarbonCast($child as ${ dataType.simpleString })" override def checkInputDataTypes(): TypeCheckResult = { TypeCheckResult.TypeCheckSuccess @@ -255,7 +257,7 @@ object PhysicalOperation1 extends PredicateHelper { val (fields, filters, other, aliases, _, sortOrder, limit) = collectProjectsAndFilters( child) - var aggExps: Seq[AggregateExpression1] = Nil + var aggExps: Seq[AggregateExpression] = Nil aggregateExpressions.foreach(v => { val list = findAggreagateExpression(v) aggExps = aggExps ++ list @@ -276,12 +278,12 @@ object PhysicalOperation1 extends PredicateHelper { } } - def findAggreagateExpression(expr: Expression): Seq[AggregateExpression1] = { + def findAggreagateExpression(expr: Expression): Seq[AggregateExpression] = { val exprList = expr match { - case d: AggregateExpression1 => d :: Nil + case d: AggregateExpression => d :: Nil case Alias(ref, name) => findAggreagateExpression(ref) case other => - var listout: Seq[AggregateExpression1] = Nil + var listout: Seq[AggregateExpression] = Nil other.children.foreach(v => { val list = findAggreagateExpression(v) @@ -317,7 +319,7 @@ object PhysicalOperation1 extends PredicateHelper { case Alias(ref, name) => ref case others => others }.filter { - case d: AggregateExpression1 => true + case d: AggregateExpression => true case _ => false } (fields, filters, other, aliases ++ 
collectAliases(aggregateExpressions), Some( @@ -352,6 +354,28 @@ object PhysicalOperation1 extends PredicateHelper { } } +case class PositionLiteral(expr: Expression, intermediateDataType: DataType) + extends LeafExpression with CodegenFallback { + override def dataType: DataType = expr.dataType + + override def nullable: Boolean = false + + type EvaluatedType = Any + var position = -1 + + def setPosition(pos: Int): Unit = position = pos + + override def toString: String = s"PositionLiteral($position : $expr)" + + override def eval(input: InternalRow): Any = { + if (position != -1) { + input.get(position, intermediateDataType) + } else { + expr.eval(input) + } + } +} + /** * Matches a logical aggregation that can be performed on distributed data in two steps. The first * operates on the data in each partition performing partial aggregation for each group. The second @@ -367,85 +391,98 @@ object PhysicalOperation1 extends PredicateHelper { * - Partial aggregate expressions. * - Input to the aggregation. 
*/ -object PartialAggregation { - type ReturnType = - (Seq[Attribute], Seq[NamedExpression], Seq[Expression], Seq[NamedExpression], LogicalPlan) +object CarbonAggregation { + type ReturnType = (Seq[Expression], Seq[NamedExpression], LogicalPlan) private def convertAggregatesForPushdown(convertUnknown: Boolean, - rewrittenAggregateExpressions: Seq[Expression]) = { - var counter: Int = 0 - var updatedExpressions = MutableList[Expression]() - rewrittenAggregateExpressions.foreach(v => { - val updated = convertAggregate(v, counter, convertUnknown) - updatedExpressions += updated - counter = counter + 1 - }) - updatedExpressions + rewrittenAggregateExpressions: Seq[Expression], + oneAttr: AttributeReference) = { + if (canBeConvertedToCarbonAggregate(rewrittenAggregateExpressions)) { + var counter: Int = 0 + var updatedExpressions = MutableList[Expression]() + rewrittenAggregateExpressions.foreach(v => { + val updated = convertAggregate(v, counter, convertUnknown, oneAttr) + updatedExpressions += updated + counter = counter + 1 + }) + updatedExpressions + } else { + rewrittenAggregateExpressions + } } - def makePositionLiteral(expr: Expression, index: Int): PositionLiteral = { - val posLiteral = PositionLiteral(expr, MeasureAggregatorUDT) + def makePositionLiteral(expr: Expression, index: Int, dataType: DataType): PositionLiteral = { + val posLiteral = PositionLiteral(expr, dataType) posLiteral.setPosition(index) posLiteral } - def convertAggregate(current: Expression, index: Int, convertUnknown: Boolean): Expression = { - if (convertUnknown) { + def convertAggregate(current: Expression, + index: Int, + convertUnknown: Boolean, + oneAttr: AttributeReference): Expression = { + if (!convertUnknown && canBeConverted(current)) { current.transform { - case a@SumCarbon(_, _) => a - case a@AverageCarbon(_, _) => a - case a@MinCarbon(_, _) => a - case a@MaxCarbon(_, _) => a - case a@SumDistinctCarbon(_, _) => a - case a@CountDistinctCarbon(_) => a - case a@CountCarbon(_) => a - 
case anyAggr: AggregateExpression1 => anyAggr + case Average(attr: AttributeReference) => + val convertedDataType = transformArrayType(attr) + CarbonAverage(makePositionLiteral(convertedDataType, index, convertedDataType.dataType)) + case Average(Cast(attr: AttributeReference, dataType)) => + val convertedDataType = transformArrayType(attr) + CarbonAverage( + makePositionLiteral(convertedDataType, index, convertedDataType.dataType)) + case Count(Seq(s: Literal)) => + CarbonCount(s, Some(makePositionLiteral(transformLongType(oneAttr), index, LongType))) + case Count(Seq(attr: AttributeReference)) => + CarbonCount(makePositionLiteral(transformLongType(attr), index, LongType)) + case Sum(attr: AttributeReference) => + Sum(makePositionLiteral(attr, index, attr.dataType)) + case Sum(Cast(attr: AttributeReference, dataType)) => + Sum(Cast(makePositionLiteral(attr, index, attr.dataType), dataType)) + case Min(attr: AttributeReference) => Min(makePositionLiteral(attr, index, attr.dataType)) + case Min(Cast(attr: AttributeReference, dataType)) => + Min(Cast(makePositionLiteral(attr, index, attr.dataType), dataType)) + case Max(attr: AttributeReference) => + Max(makePositionLiteral(attr, index, attr.dataType)) + case Max(Cast(attr: AttributeReference, dataType)) => + Max(Cast(makePositionLiteral(attr, index, attr.dataType), dataType)) } } else { - current.transform { - case a@Sum(attr: AttributeReference) => SumCarbon(makePositionLiteral(attr, index)) - case a@Sum(cast@Cast(attr: AttributeReference, _)) => SumCarbon( - makePositionLiteral(attr, index), cast.dataType) - case a@Average(attr: AttributeReference) => AverageCarbon(makePositionLiteral(attr, index)) - case a@Average(cast@Cast(attr: AttributeReference, _)) => AverageCarbon( - makePositionLiteral(attr, index), cast.dataType) - case a@Min(attr: AttributeReference) => MinCarbon(makePositionLiteral(attr, index)) - case a@Min(cast@Cast(attr: AttributeReference, _)) => MinCarbon( - makePositionLiteral(attr, index), 
cast.dataType) - case a@Max(attr: AttributeReference) => MaxCarbon(makePositionLiteral(attr, index)) - case a@Max(cast@Cast(attr: AttributeReference, _)) => MaxCarbon( - makePositionLiteral(attr, index), cast.dataType) - case a@SumDistinct(attr: AttributeReference) => SumDistinctCarbon( - makePositionLiteral(attr, index)) - case a@SumDistinct(cast@Cast(attr: AttributeReference, _)) => SumDistinctCarbon( - makePositionLiteral(attr, index), cast.dataType) - case a@CountDistinct(attr: AttributeReference) => CountDistinctCarbon( - makePositionLiteral(attr, index)) - case a@CountDistinct(childSeq) if childSeq.size == 1 => - childSeq.head match { - case attr: AttributeReference => CountDistinctCarbon(makePositionLiteral(attr, index)) - case _ => a - } - case a@Count(s@Literal(_, _)) => - CountCarbon(makePositionLiteral(s, index)) - case a@Count(attr: AttributeReference) => - if (attr.name.equals("*")) { - CountCarbon(makePositionLiteral(Literal("*"), index)) - } else { - CountCarbon(makePositionLiteral(attr, index)) - } - } + current } } + def canBeConverted(current: Expression): Boolean = current match { + case Alias(AggregateExpression(Average(attr: AttributeReference), _, false), _) => true + case Alias(AggregateExpression(Average(Cast(attr: AttributeReference, _)), _, false), _) => true + case Alias(AggregateExpression(Count(Seq(s: Literal)), _, false), _) => true + case Alias(AggregateExpression(Count(Seq(attr: AttributeReference)), _, false), _) => true + case Alias(AggregateExpression(Sum(attr: AttributeReference), _, false), _) => true + case Alias(AggregateExpression(Sum(Cast(attr: AttributeReference, _)), _, false), _) => true + case Alias(AggregateExpression(Min(attr: AttributeReference), _, false), _) => true + case Alias(AggregateExpression(Min(Cast(attr: AttributeReference, _)), _, false), _) => true + case Alias(AggregateExpression(Max(attr: AttributeReference), _, false), _) => true + case Alias(AggregateExpression(Max(Cast(attr: AttributeReference, _)), 
_, false), _) => true + case _ => false + } + + def transformArrayType(attr: AttributeReference): AttributeReference = { + AttributeReference(attr.name, ArrayType(DoubleType), attr.nullable, attr.metadata)(attr.exprId, + attr.qualifiers) + } + + def transformLongType(attr: AttributeReference): AttributeReference = { + AttributeReference(attr.name, LongType, attr.nullable, attr.metadata)(attr.exprId, + attr.qualifiers) + } + /** * There should be sync between carbonOperators validation and here. we should not convert to * carbon aggregates if the validation does not satisfy. */ - private def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = { + def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = { val detailQuery = expressions.map { case attr@AttributeReference(_, _, _, _) => true - case par: Alias if par.children.head.isInstanceOf[AggregateExpression1] => true + case Alias(agg: AggregateExpression, name) => true case _ => false }.exists(!_) !detailQuery @@ -454,6 +491,7 @@ object PartialAggregation { def unapply(plan: LogicalPlan): Option[ReturnType] = unapply((plan, false)) def unapply(combinedPlan: (LogicalPlan, Boolean)): Option[ReturnType] = { + val oneAttr = getOneAttribute(combinedPlan._1) combinedPlan._1 match { case Aggregate(groupingExpressions, aggregateExpressionsOrig, child) => @@ -463,99 +501,28 @@ object PartialAggregation { aggregateExpressionsOrig } else { - // First calculate partialComputation before converting and then check whether it could - // be converted or not. This type of checks are necessary for queries like - // select sum(col)+10 from table. Here the aggregates are different for - // partialComputation and aggregateExpressionsOrig. 
So first check on partialComputation - val preCheckEval = getPartialEvaluation(groupingExpressions, aggregateExpressionsOrig) - preCheckEval match { - case Some(allExprs) => - if (canBeConvertedToCarbonAggregate(allExprs._1)) { - convertAggregatesForPushdown(false, aggregateExpressionsOrig) - } else { - aggregateExpressionsOrig - } - case _ => aggregateExpressionsOrig - } + convertAggregatesForPushdown(false, aggregateExpressionsOrig, oneAttr) } - val evaluation = getPartialEvaluation(groupingExpressions, aggregateExpressions) - - evaluation match { - case(Some((partialComputation, - rewrittenAggregateExpressions, - namedGroupingAttributes))) => - // Convert the other aggregations for push down to Carbon layer. - // Here don't touch earlier converted native carbon aggregators. - val convertedPartialComputation = - if (combinedPlan._2) { - partialComputation - } - else { - convertAggregatesForPushdown(true, partialComputation) - .asInstanceOf[Seq[NamedExpression]] - } - - Some( - (namedGroupingAttributes, - rewrittenAggregateExpressions, - groupingExpressions, - convertedPartialComputation, - child)) - case _ => None - } - + Some((groupingExpressions, aggregateExpressions.asInstanceOf[Seq[NamedExpression]], child)) case _ => None } } - def getPartialEvaluation(groupingExpressions: Seq[Expression], - aggregateExpressions: Seq[Expression]): - Option[(Seq[NamedExpression], Seq[NamedExpression], Seq[Attribute])] = { - // Collect all aggregate expressions. - val allAggregates = - aggregateExpressions.flatMap(_ collect { case a: AggregateExpression1 => a }) - // Collect all aggregate expressions that can be computed partially. - val partialAggregates = - aggregateExpressions.flatMap(_ collect { case p: PartialAggregate1 => p }) - - // Only do partial aggregation if supported by all aggregate expressions. - if (allAggregates.size == partialAggregates.size) { - // Create a map of expressions to their partial evaluations for all aggregate expressions. 
- val partialEvaluations: Map[TreeNodeRef, SplitEvaluation] = - partialAggregates.map(a => (new TreeNodeRef(a), a.asPartial)).toMap - - // We need to pass all grouping expressions though so the grouping can happen a second - // time. However some of them might be unnamed so we alias them allowing them to be - // referenced in the second aggregation. - val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map { - case n: NamedExpression => (n, n) - case other => (other, Alias(other, "PartialGroup")()) - }.toMap - - // Replace aggregations with a new expression that computes the result from the already - // computed partial evaluations and grouping values. - val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp { - case e: Expression if partialEvaluations.contains(new TreeNodeRef(e)) => - partialEvaluations(new TreeNodeRef(e)).finalEvaluation - - case e: Expression => - // Should trim aliases around `GetField`s. These aliases are introduced while - // resolving struct field accesses, because `GetField` is not a `NamedExpression`. - // (Should we just turn `GetField` into a `NamedExpression`?) 
- namedGroupingExpressions.collectFirst { - case (expr, ne) if expr semanticEquals e => ne.toAttribute - }.getOrElse(e) - }).asInstanceOf[Seq[NamedExpression]] - - val partialComputation = - (namedGroupingExpressions.values ++ - partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq - val namedGroupingAttributes = namedGroupingExpressions.values.map(_.toAttribute).toSeq - Some(partialComputation, rewrittenAggregateExpressions, namedGroupingAttributes) + def getOneAttribute(plan: LogicalPlan): AttributeReference = { + var relation: LogicalRelation = null + plan collect { + case l: LogicalRelation => relation = l + } + if (relation != null) { + relation.output.find { p => + p.dataType match { + case n: NumericType => true + case _ => false + } + }.getOrElse(relation.output.head) } else { - None + null } - } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala index 79d8ffa..2bf50da 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import scala.language.implicitConversions import org.apache.spark.SparkContext +import org.apache.spark.sql.catalyst.ParserDialect import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -52,7 +53,7 @@ class CarbonContext(val sc: SparkContext, val storePath: String) extends HiveCon override protected[sql] lazy val optimizer: Optimizer = new CarbonOptimizer(DefaultOptimizer, conf) - 
override protected[sql] def dialectClassName = classOf[CarbonSQLDialect].getCanonicalName + protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this) experimental.extraStrategies = CarbonStrategy.getStrategy(self) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala index f95acf4..94b38a4 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala @@ -24,6 +24,7 @@ import scala.language.implicitConversions import org.apache.hadoop.fs.Path import org.apache.spark._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical._ @@ -56,7 +57,11 @@ class CarbonSource case _ => val options = new CarbonOption(parameters) val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq - CarbonDatasourceRelation(tableIdentifier, None)(sqlContext) + val ident = tableIdentifier match { + case Seq(name) => TableIdentifier(name) + case Seq(db, name) => TableIdentifier(name, Some(db)) + } + CarbonDatasourceRelation(ident, None)(sqlContext) } } @@ -120,14 +125,14 @@ class CarbonSource * This relation is stored to hive metastore */ private[sql] case class CarbonDatasourceRelation( - tableIdentifier: Seq[String], + tableIdentifier: TableIdentifier, alias: Option[String]) (@transient context: SQLContext) extends BaseRelation with Serializable with Logging { def carbonRelation: 
CarbonRelation = { CarbonEnv.getInstance(context) - .carbonCatalog.lookupRelation2(tableIdentifier, None)(sqlContext) + .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext) .asInstanceOf[CarbonRelation] } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index a4ac246..600519f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -32,6 +32,7 @@ import org.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueI import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.carbondata.core.carbon.metadata.datatype.DataType import org.carbondata.core.carbon.metadata.encoder.Encoding +import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension import org.carbondata.query.carbon.util.DataTypeUtil /** @@ -62,7 +63,7 @@ case class CarbonDictionaryDecoder( !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) && canBeDecoded(attr)) { val newAttr = AttributeReference(a.name, - convertCarbonToSparkDataType(carbonDimension.getDataType), + convertCarbonToSparkDataType(carbonDimension), a.nullable, a.metadata)(a.exprId, a.qualifiers).asInstanceOf[Attribute] @@ -88,8 +89,8 @@ case class CarbonDictionaryDecoder( } } - def convertCarbonToSparkDataType(dataType: DataType): types.DataType = { - dataType match { + def convertCarbonToSparkDataType(carbonDimension: CarbonDimension): types.DataType = { + carbonDimension.getDataType match { case DataType.STRING => StringType case DataType.INT => IntegerType case 
DataType.LONG => LongType @@ -125,6 +126,9 @@ case class CarbonDictionaryDecoder( dictIds } + + override def outputsUnsafeRows: Boolean = true + override def doExecute(): RDD[InternalRow] = { attachTree(this, "execute") { val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog].storePath @@ -143,20 +147,21 @@ case class CarbonDictionaryDecoder( val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers, forwardDictionaryCache) new Iterator[InternalRow] { + val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray) override final def hasNext: Boolean = iter.hasNext override final def next(): InternalRow = { val row: InternalRow = iter.next() val data = row.toSeq(dataTypes).toArray for (i <- data.indices) { - if (dicts(i) != null) { + if (dicts(i) != null && data(i) != null) { data(i) = toType(DataTypeUtil .getDataBasedOnDataType(dicts(i) .getDictionaryValueForKey(data(i).asInstanceOf[Integer]), getDictionaryColumnIds(i)._3)) } } - new GenericMutableRow(data) + unsafeProjection(new GenericMutableRow(data)) } } }