http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java b/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java deleted file mode 100644 index 42c3027..0000000 --- a/core/src/main/java/org/carbondata/query/carbon/result/impl/MapBasedResult.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.carbondata.query.carbon.result.impl; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.query.aggregator.MeasureAggregator; -import org.carbondata.query.carbon.result.Result; -import org.carbondata.query.carbon.wrappers.ByteArrayWrapper; - -/** - * To store aggregated result - */ -public class MapBasedResult implements Result<Map<ByteArrayWrapper, MeasureAggregator[]>> { - /** - * iterator over result - */ - private Iterator<Entry<ByteArrayWrapper, MeasureAggregator[]>> resultIterator; - - /** - * result entry - */ - private Entry<ByteArrayWrapper, MeasureAggregator[]> resultEntry; - - /** - * scanned result - */ - private Map<ByteArrayWrapper, MeasureAggregator[]> scannerResult; - - /** - * total number of result - */ - private int resulSize; - - public MapBasedResult() { - scannerResult = new HashMap<ByteArrayWrapper, MeasureAggregator[]>( - CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - this.resultIterator = scannerResult.entrySet().iterator(); - } - - /** - * @return the key - */ - @Override public ByteArrayWrapper getKey() { - resultEntry = this.resultIterator.next(); - return resultEntry.getKey(); - } - - /** - * return the value - */ - @Override public MeasureAggregator[] getValue() { - return resultEntry.getValue(); - } - - /** - * Method to check more result is present - * or not - */ - @Override public boolean hasNext() { - return this.resultIterator.hasNext(); - } - - /*** - * below method will be used to merge the - * scanned result - * - * @param otherResult return to be merged - */ - @Override public void addScannedResult(Map<ByteArrayWrapper, MeasureAggregator[]> scannerResult) { - this.scannerResult = scannerResult; - resulSize = scannerResult.size(); - this.resultIterator = scannerResult.entrySet().iterator(); - } - - /*** - * below method will be used to merge the - * scanned result, 
in case of map based the - * result we need to aggregate the result - * - * @param otherResult return to be merged - */ - @Override public void merge(Result<Map<ByteArrayWrapper, MeasureAggregator[]>> result) { - ByteArrayWrapper key = null; - MeasureAggregator[] value = null; - Map<ByteArrayWrapper, MeasureAggregator[]> otherResult = result.getResult(); - if (otherResult != null) { - while (resultIterator.hasNext()) { - Entry<ByteArrayWrapper, MeasureAggregator[]> entry = resultIterator.next(); - key = entry.getKey(); - value = entry.getValue(); - MeasureAggregator[] agg = otherResult.get(key); - if (agg != null) { - for (int j = 0; j < agg.length; j++) { - agg[j].merge(value[j]); - } - } else { - otherResult.put(key, value); - } - } - resulSize = otherResult.size(); - this.resultIterator = otherResult.entrySet().iterator(); - this.scannerResult = otherResult; - } - } - - /** - * Return the size of the result - */ - @Override public int size() { - return resulSize; - } - - /** - * @return the complete result - */ - @Override public Map<ByteArrayWrapper, MeasureAggregator[]> getResult() { - return this.scannerResult; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java index bd56ec2..09fa50c 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/AbstractDetailQueryResultIterator.java @@ -39,7 +39,7 @@ import org.carbondata.query.carbon.model.QueryModel; * executing that query are returning a iterator over block and every time next * call will come it will execute the block and return the result */ -public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterator<E> { +public abstract class AbstractDetailQueryResultIterator extends CarbonIterator { /** * LOGGER. 
@@ -75,7 +75,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato /** * current counter to check how blocklet has been executed */ - private long currentCounter; + protected long currentCounter; /** * keep the track of number of blocklet of a block has been executed @@ -138,7 +138,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato return currentCounter < totalNumberOfNode; } - protected void updateSliceIndexToBeExecuted() { + protected int updateSliceIndexToBeExecuted() { Arrays.fill(blockIndexToBeExecuted, -1); int currentSliceIndex = 0; int i = 0; @@ -154,7 +154,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato break; } } - currentCounter += i; + return i; } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java new file mode 100644 index 0000000..826f816 --- /dev/null +++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedDetailResultIterator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.carbondata.query.carbon.result.iterator; + +import java.util.List; + +import org.carbondata.core.iterator.CarbonIterator; +import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; +import org.carbondata.query.carbon.model.QueryModel; +import org.carbondata.query.carbon.result.BatchResult; +import org.carbondata.query.carbon.result.ListBasedResultWrapper; +import org.carbondata.query.carbon.result.Result; +import org.carbondata.query.carbon.result.preparator.QueryResultPreparator; +import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl; + +/** + * Iterator over chunk result + */ +public class ChunkBasedDetailResultIterator extends CarbonIterator<BatchResult> { + + /** + * query result preparator which will be used to create a query result + */ + private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator; + + /** + * iterator over result + */ + private CarbonIterator<Result> queryResultIterator; + + public ChunkBasedDetailResultIterator(CarbonIterator<Result> queryResultIterator, + QueryExecutorProperties executerProperties, QueryModel queryModel) { + this.queryResultIterator = queryResultIterator; + this.queryResultPreparator = + new DetailQueryResultPreparatorImpl(executerProperties, queryModel); + + } + + /** + * Returns {@code true} if the iteration has more elements. 
(In other words, + * returns {@code true} if {@link #next} would return an element.) + * + * @return {@code true} if the iteration has more elements + */ + @Override public boolean hasNext() { + return queryResultIterator.hasNext(); + } + + /** + * Returns the next element in the iteration. + * + * @return the next element in the iteration + */ + @Override public BatchResult next() { + return queryResultPreparator.prepareQueryResult(queryResultIterator.next()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java deleted file mode 100644 index 71b311f..0000000 --- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkBasedResultIterator.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.carbondata.query.carbon.result.iterator; - -import org.carbondata.core.iterator.CarbonIterator; -import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; -import org.carbondata.query.carbon.model.QueryModel; -import org.carbondata.query.carbon.result.BatchResult; -import org.carbondata.query.carbon.result.Result; -import org.carbondata.query.carbon.result.preparator.QueryResultPreparator; -import org.carbondata.query.carbon.result.preparator.impl.QueryResultPreparatorImpl; - -/** - * Iterator over chunk result - */ -public class ChunkBasedResultIterator extends CarbonIterator<BatchResult> { - - /** - * query result prepartor which will be used to create a query result - */ - private QueryResultPreparator<BatchResult> queryResultPreparator; - - /** - * iterator over result - */ - private CarbonIterator<Result> queryResultIterator; - - public ChunkBasedResultIterator(CarbonIterator<Result> queryResultIterator, - QueryExecutorProperties executerProperties, QueryModel queryModel) { - this.queryResultIterator = queryResultIterator; - this.queryResultPreparator = new QueryResultPreparatorImpl(executerProperties, queryModel); - - } - - /** - * Returns {@code true} if the iteration has more elements. (In other words, - * returns {@code true} - * - * @return {@code true} if the iteration has more elements - */ - @Override public boolean hasNext() { - return queryResultIterator.hasNext(); - } - - /** - * Returns the next element in the iteration. 
- * - * @return the next element in the iteration - */ - @Override public BatchResult next() { - return queryResultPreparator.prepareQueryResult(queryResultIterator.next()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java deleted file mode 100644 index ea4d65c..0000000 --- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRawRowIterartor.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.carbondata.query.carbon.result.iterator; - -import org.carbondata.core.iterator.CarbonIterator; -import org.carbondata.query.carbon.result.BatchRawResult; - -public class ChunkRawRowIterartor extends CarbonIterator<Object[]> { - - /** - * iterator over chunk result - */ - private CarbonIterator<BatchRawResult> iterator; - - /** - * currect chunk - */ - private BatchRawResult currentchunk; - - public ChunkRawRowIterartor(CarbonIterator<BatchRawResult> iterator) { - this.iterator = iterator; - if (iterator.hasNext()) { - currentchunk = iterator.next(); - } - } - - /** - * Returns {@code true} if the iteration has more elements. (In other words, - * returns {@code true} if {@link #next} would return an element rather than - * throwing an exception.) 
- * - * @return {@code true} if the iteration has more elements - */ - @Override public boolean hasNext() { - if (null != currentchunk) { - if ((currentchunk.hasNext())) { - return true; - } else if (!currentchunk.hasNext()) { - while (iterator.hasNext()) { - currentchunk = iterator.next(); - if (currentchunk != null && currentchunk.hasNext()) { - return true; - } - } - } - } - return false; - } - - /** - * Returns the next element in the iteration. - * - * @return the next element in the iteration - */ - @Override public Object[] next() { - return currentchunk.next(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java index 6ba54cd..3db3404 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/ChunkRowIterator.java @@ -21,12 +21,11 @@ package org.carbondata.query.carbon.result.iterator; import org.carbondata.core.iterator.CarbonIterator; import org.carbondata.query.carbon.result.BatchResult; -import org.carbondata.query.carbon.result.RowResult; /** * Iterator over row result */ -public class ChunkRowIterator extends CarbonIterator<RowResult> { +public class ChunkRowIterator extends CarbonIterator<Object[]> { /** * iterator over chunk result @@ -73,7 +72,7 @@ public class ChunkRowIterator extends CarbonIterator<RowResult> { * * @return the next element in the iteration */ - @Override public RowResult next() { + @Override public Object[] next() { return currentchunk.next(); } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java index f07eb20..3641e75 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailQueryResultIterator.java @@ -29,16 +29,17 @@ import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo; import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor; import org.carbondata.query.carbon.model.QueryModel; import org.carbondata.query.carbon.result.BatchResult; +import org.carbondata.query.carbon.result.ListBasedResultWrapper; import org.carbondata.query.carbon.result.Result; import org.carbondata.query.carbon.result.preparator.QueryResultPreparator; -import org.carbondata.query.carbon.result.preparator.impl.QueryResultPreparatorImpl; +import org.carbondata.query.carbon.result.preparator.impl.DetailQueryResultPreparatorImpl; /** * In case of detail query we cannot keep all the records in memory so for * executing that query are returning a iterator over block and every time next * call will come it will execute the block and return the result */ -public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator<BatchResult> { +public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator { /** * LOGGER. 
@@ -49,17 +50,18 @@ public class DetailQueryResultIterator extends AbstractDetailQueryResultIterator /** * to prepare the result */ - private QueryResultPreparator<BatchResult> queryResultPreparator; + private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator; public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryExecutorProperties executerProperties, QueryModel queryModel, InternalQueryExecutor queryExecutor) { super(infos, executerProperties, queryModel, queryExecutor); - this.queryResultPreparator = new QueryResultPreparatorImpl(executerProperties, queryModel); + this.queryResultPreparator = + new DetailQueryResultPreparatorImpl(executerProperties, queryModel); } @Override public BatchResult next() { - updateSliceIndexToBeExecuted(); + currentCounter += updateSliceIndexToBeExecuted(); CarbonIterator<Result> result = null; try { result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java index 4f9dbe2..2b14793 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/iterator/DetailRawQueryResultIterator.java @@ -19,6 +19,10 @@ package org.carbondata.query.carbon.result.iterator; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.carbondata.core.iterator.CarbonIterator; import 
org.carbondata.query.carbon.executor.exception.QueryExecutionException; @@ -26,7 +30,8 @@ import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; import org.carbondata.query.carbon.executor.infos.BlockExecutionInfo; import org.carbondata.query.carbon.executor.internal.InternalQueryExecutor; import org.carbondata.query.carbon.model.QueryModel; -import org.carbondata.query.carbon.result.BatchRawResult; +import org.carbondata.query.carbon.result.BatchResult; +import org.carbondata.query.carbon.result.ListBasedResultWrapper; import org.carbondata.query.carbon.result.Result; import org.carbondata.query.carbon.result.preparator.QueryResultPreparator; import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparatorImpl; @@ -36,10 +41,13 @@ import org.carbondata.query.carbon.result.preparator.impl.RawQueryResultPreparat * executing that query are returning a iterator over block and every time next * call will come it will execute the block and return the result */ -public class DetailRawQueryResultIterator - extends AbstractDetailQueryResultIterator<BatchRawResult> { +public class DetailRawQueryResultIterator extends AbstractDetailQueryResultIterator { - private QueryResultPreparator<BatchRawResult> queryResultPreparator; + private ExecutorService execService = Executors.newFixedThreadPool(1); + + private Future<ResultInfo> future; + + private QueryResultPreparator<List<ListBasedResultWrapper>, Object> queryResultPreparator; public DetailRawQueryResultIterator(List<BlockExecutionInfo> infos, QueryExecutorProperties executerProperties, QueryModel queryModel, @@ -48,26 +56,63 @@ public class DetailRawQueryResultIterator this.queryResultPreparator = new RawQueryResultPreparatorImpl(executerProperties, queryModel); } - @Override public BatchRawResult next() { - updateSliceIndexToBeExecuted(); - CarbonIterator<Result> result = null; - try { - result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted); - } catch 
(QueryExecutionException ex) { - throw new RuntimeException(ex.getCause()); + @Override public BatchResult next() { + BatchResult result; + if (future == null) { + future = execute(); } - for (int i = 0; i < blockIndexToBeExecuted.length; i++) { - if (blockIndexToBeExecuted[i] != -1) { - blockExecutionInfos.get(blockIndexToBeExecuted[i]).setFirstDataBlock( - blockExecutionInfos.get(blockIndexToBeExecuted[i]).getFirstDataBlock() - .getNextDataRefNode()); - } + ResultInfo resultFromFuture = getResultFromFuture(future); + result = resultFromFuture.result; + currentCounter += resultFromFuture.counter; + if (hasNext()) { + future = execute(); } - if (null != result) { - Result next = result.next(); - return queryResultPreparator.prepareQueryResult(next); - } else { - return queryResultPreparator.prepareQueryResult(null); + return result; + } + + private ResultInfo getResultFromFuture(Future<ResultInfo> future) { + try { + return future.get(); + } catch (Exception e) { + e.printStackTrace(); } + return new ResultInfo(); + } + + private Future<ResultInfo> execute() { + return execService.submit(new Callable<ResultInfo>() { + @Override public ResultInfo call() { + CarbonIterator<Result> result = null; + int counter = updateSliceIndexToBeExecuted(); + try { + result = executor.executeQuery(blockExecutionInfos, blockIndexToBeExecuted); + } catch (QueryExecutionException ex) { + throw new RuntimeException(ex.getCause()); + } + for (int i = 0; i < blockIndexToBeExecuted.length; i++) { + if (blockIndexToBeExecuted[i] != -1) { + blockExecutionInfos.get(blockIndexToBeExecuted[i]).setFirstDataBlock( + blockExecutionInfos.get(blockIndexToBeExecuted[i]).getFirstDataBlock() + .getNextDataRefNode()); + } + } + BatchResult batchResult; + if (null != result) { + Result next = result.next(); + batchResult = queryResultPreparator.prepareQueryResult(next); + } else { + batchResult = queryResultPreparator.prepareQueryResult(null); + } + ResultInfo resultInfo = new ResultInfo(); + 
resultInfo.counter = counter; + resultInfo.result = batchResult; + return resultInfo; + } + }); + } + + private static class ResultInfo { + private int counter; + private BatchResult result; } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java index 431e163..fbf3074 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/QueryResultPreparator.java @@ -1,9 +1,10 @@ package org.carbondata.query.carbon.result.preparator; +import org.carbondata.query.carbon.result.BatchResult; import org.carbondata.query.carbon.result.Result; -public interface QueryResultPreparator<E> { +public interface QueryResultPreparator<K, V> { - public E prepareQueryResult(Result scannedResult); + public BatchResult prepareQueryResult(Result<K, V> scannedResult); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java index 1890baf..ad5b4c5 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/AbstractQueryResultPreparator.java @@ -1,17 +1,19 @@ 
package org.carbondata.query.carbon.result.preparator.impl; -import java.util.ArrayList; import java.util.List; -import org.carbondata.query.aggregator.MeasureAggregator; +import org.carbondata.core.carbon.metadata.encoder.Encoding; +import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.carbondata.core.util.CarbonUtil; import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; +import org.carbondata.query.carbon.model.QueryDimension; import org.carbondata.query.carbon.model.QueryModel; import org.carbondata.query.carbon.result.BatchResult; import org.carbondata.query.carbon.result.preparator.QueryResultPreparator; -import org.carbondata.query.scanner.impl.CarbonKey; -import org.carbondata.query.scanner.impl.CarbonValue; +import org.carbondata.query.carbon.util.DataTypeUtil; -public abstract class AbstractQueryResultPreparator<E> implements QueryResultPreparator<E> { +public abstract class AbstractQueryResultPreparator<K, V> implements QueryResultPreparator<K, V> { /** * query properties @@ -29,13 +31,35 @@ public abstract class AbstractQueryResultPreparator<E> implements QueryResultPre this.queryModel = queryModel; } - protected void fillMeasureValueForAggGroupByQuery(QueryModel queryModel, - Object[][] surrogateResult, int dimensionCount, int columnIndex, MeasureAggregator[] v) { - int msrCount = queryModel.getQueryMeasures().size(); - for (int i = 0; i < msrCount; i++) { - v[queryExecuterProperties.measureStartIndex + i] = - ((MeasureAggregator) surrogateResult[dimensionCount - + queryExecuterProperties.measureStartIndex + i][columnIndex]); + protected void fillDimensionData(Object[][] convertedResult, List<QueryDimension> queryDimensions, + int dimensionCount, Object[] row, int rowIndex) { + QueryDimension queryDimension; + for (int i = 0; i < dimensionCount; i++) { + queryDimension = queryDimensions.get(i); + if 
(!CarbonUtil + .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) { + row[queryDimension.getQueryOrder()] = convertedResult[i][rowIndex]; + } else if (CarbonUtil + .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) { + DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType()); + row[queryDimension.getQueryOrder()] = directDictionaryGenerator + .getValueFromSurrogate((Integer) convertedResult[i][rowIndex]); + } else { + if (queryExecuterProperties.sortDimIndexes[i] == 1) { + row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType( + queryExecuterProperties.columnToDictionayMapping + .get(queryDimension.getDimension().getColumnId()) + .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][rowIndex]), + queryDimension.getDimension().getDataType()); + } else { + row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType( + queryExecuterProperties.columnToDictionayMapping + .get(queryDimension.getDimension().getColumnId()) + .getDictionaryValueForKey((Integer) convertedResult[i][rowIndex]), + queryDimension.getDimension().getDataType()); + } + } } } @@ -54,18 +78,9 @@ public abstract class AbstractQueryResultPreparator<E> implements QueryResultPre } protected BatchResult getEmptyChunkResult(int size) { - List<CarbonKey> keys = new ArrayList<CarbonKey>(size); - List<CarbonValue> values = new ArrayList<CarbonValue>(size); - Object[] row = new Object[1]; - for (int i = 0; i < size; i++) - - { - values.add(new CarbonValue(new MeasureAggregator[0])); - keys.add(new CarbonKey(row)); - } + Object[][] row = new Object[size][1]; BatchResult chunkResult = new BatchResult(); - chunkResult.setKeys(keys); - chunkResult.setValues(values); + chunkResult.setRows(row); return chunkResult; } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java new file mode 100644 index 0000000..712894a --- /dev/null +++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/DetailQueryResultPreparatorImpl.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.carbondata.query.carbon.result.preparator.impl; + +import java.nio.charset.Charset; +import java.util.List; + +import org.carbondata.common.logging.LogService; +import org.carbondata.common.logging.LogServiceFactory; +import org.carbondata.core.carbon.metadata.encoder.Encoding; +import org.carbondata.core.constants.CarbonCommonConstants; +import org.carbondata.core.util.CarbonUtil; +import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; +import org.carbondata.query.carbon.model.QueryDimension; +import org.carbondata.query.carbon.model.QueryMeasure; +import org.carbondata.query.carbon.model.QueryModel; +import org.carbondata.query.carbon.result.BatchResult; +import org.carbondata.query.carbon.result.ListBasedResultWrapper; +import org.carbondata.query.carbon.result.Result; +import org.carbondata.query.carbon.util.DataTypeUtil; +import org.carbondata.query.carbon.wrappers.ByteArrayWrapper; + +/** + * Below class will be used to get the result by converting to actual data + * Actual data conversion can be converting the surrogate key to actual data + * + * @TODO many things in this class are confusing; we need to check + * why they were handled this way and how they can be handled in a better + * way. Need to revisit this class. If aggregation is pushed down to the Spark + * layer and we can process the data in byte array format, then this + * class won't be useful, so in future we can delete this class. 
+ * @TODO need to expose one interface which will return the result based on required type + * for example its implementation case return converted result or directly result with out + * converting to actual value + */ +public class DetailQueryResultPreparatorImpl + extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DetailQueryResultPreparatorImpl.class.getName()); + + public DetailQueryResultPreparatorImpl(QueryExecutorProperties executerProperties, + QueryModel queryModel) { + super(executerProperties, queryModel); + } + + @Override public BatchResult prepareQueryResult( + Result<List<ListBasedResultWrapper>, Object> scannedResult) { + if ((null == scannedResult || scannedResult.size() < 1)) { + return new BatchResult(); + } + List<QueryDimension> queryDimension = queryModel.getQueryDimension(); + int dimensionCount = queryDimension.size(); + int totalNumberOfColumn = dimensionCount + queryExecuterProperties.measureDataTypes.length; + Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn]; + if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0 + && scannedResult.size() > 0) { + return getEmptyChunkResult(scannedResult.size()); + } + int currentRow = 0; + long[] surrogateResult = null; + int noDictionaryColumnIndex = 0; + ByteArrayWrapper key = null; + Object[] value = null; + while (scannedResult.hasNext()) { + key = scannedResult.getKey(); + value = scannedResult.getValue(); + if (key != null) { + surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator() + .getKeyArray(key.getDictionaryKey(), + queryExecuterProperties.keyStructureInfo.getMaskedBytes()); + for (int i = 0; i < dimensionCount; i++) { + if (!CarbonUtil.hasEncoding(queryDimension.get(i).getDimension().getEncoder(), + Encoding.DICTIONARY)) { + resultData[currentRow][i] = DataTypeUtil.getDataBasedOnDataType( + new 
String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++), + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), + queryDimension.get(i).getDimension().getDataType()); + } else { + resultData[currentRow][i] = + (int) surrogateResult[queryDimension.get(i).getDimension().getKeyOrdinal()]; + } + } + } + if (value != null) { + System.arraycopy(value, 0, resultData[currentRow], dimensionCount, + queryExecuterProperties.measureDataTypes.length); + } + currentRow++; + noDictionaryColumnIndex = 0; + } + if (resultData.length > 0) { + resultData = encodeToRows(resultData); + } + return getResult(queryModel, resultData); + } + + private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) { + + int rowSize = convertedResult[0].length; + Object[][] rows = new Object[rowSize][]; + List<QueryDimension> queryDimensions = queryModel.getQueryDimension(); + int dimensionCount = queryDimensions.size(); + int msrCount = queryExecuterProperties.measureDataTypes.length; + Object[] row; + for (int rowIndex = 0; rowIndex < rowSize; rowIndex++) { + row = new Object[dimensionCount + msrCount]; + fillDimensionData(convertedResult, queryDimensions, dimensionCount, row, rowIndex); + + QueryMeasure msr; + for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) { + msr = queryModel.getQueryMeasures().get(i); + row[msr.getQueryOrder()] = convertedResult[dimensionCount + i][rowIndex]; + } + rows[rowIndex] = row; + } + LOGGER.info( + "###########################################------ Total Number of records" + rowSize); + BatchResult chunkResult = new BatchResult(); + chunkResult.setRows(rows); + return chunkResult; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java deleted file mode 100644 index 5604ecd..0000000 --- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/QueryResultPreparatorImpl.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.carbondata.query.carbon.result.preparator.impl; - -import java.math.BigDecimal; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.carbondata.common.logging.LogService; -import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.core.carbon.metadata.encoder.Encoding; -import org.carbondata.core.constants.CarbonCommonConstants; -import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; -import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; -import org.carbondata.core.util.CarbonUtil; -import org.carbondata.query.aggregator.MeasureAggregator; -import org.carbondata.query.aggregator.impl.count.CountAggregator; -import org.carbondata.query.aggregator.impl.distinct.DistinctCountAggregator; -import org.carbondata.query.aggregator.impl.distinct.DistinctStringCountAggregator; -import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; -import org.carbondata.query.carbon.model.DimensionAggregatorInfo; -import org.carbondata.query.carbon.model.QueryDimension; -import org.carbondata.query.carbon.model.QueryMeasure; -import org.carbondata.query.carbon.model.QueryModel; -import org.carbondata.query.carbon.result.BatchResult; -import org.carbondata.query.carbon.result.Result; -import org.carbondata.query.carbon.util.DataTypeUtil; -import org.carbondata.query.carbon.wrappers.ByteArrayWrapper; -import org.carbondata.query.scanner.impl.CarbonKey; -import org.carbondata.query.scanner.impl.CarbonValue; - -/** - * Below class will be used to get the result by converting to actual data - * Actual data conversion can be converting the surrogate key to actual data - * - * @TODO there are many things in class which is very confusing, need to check - * why it was handled like that and how we can handle that in a better - * way.Need to revisit this class. 
IF aggregation is push down to spark - * layer and if we can process the data in byte array format then this - * class wont be useful so in future we can delete this class. - * @TODO need to expose one interface which will return the result based on required type - * for example its implementation case return converted result or directly result with out - * converting to actual value - */ -public class QueryResultPreparatorImpl extends AbstractQueryResultPreparator<BatchResult> { - - private static final LogService LOGGER = - LogServiceFactory.getLogService(QueryResultPreparatorImpl.class.getName()); - - public QueryResultPreparatorImpl(QueryExecutorProperties executerProperties, - QueryModel queryModel) { - super(executerProperties, queryModel); - } - - @Override public BatchResult prepareQueryResult(Result scannedResult) { - if ((null == scannedResult || scannedResult.size() < 1)) { - return new BatchResult(); - } - List<QueryDimension> queryDimension = queryModel.getQueryDimension(); - int dimensionCount = queryDimension.size(); - int totalNumberOfColumn = dimensionCount + queryExecuterProperties.measureAggregators.length; - Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn]; - if (!queryExecuterProperties.isFunctionQuery && totalNumberOfColumn == 0 - && scannedResult.size() > 0) { - return getEmptyChunkResult(scannedResult.size()); - } - int currentRow = 0; - long[] surrogateResult = null; - int noDictionaryColumnIndex = 0; - ByteArrayWrapper key = null; - MeasureAggregator[] value = null; - while (scannedResult.hasNext()) { - key = scannedResult.getKey(); - value = scannedResult.getValue(); - surrogateResult = queryExecuterProperties.keyStructureInfo.getKeyGenerator() - .getKeyArray(key.getDictionaryKey(), - queryExecuterProperties.keyStructureInfo.getMaskedBytes()); - for (int i = 0; i < dimensionCount; i++) { - if (!CarbonUtil - .hasEncoding(queryDimension.get(i).getDimension().getEncoder(), Encoding.DICTIONARY)) { - 
resultData[currentRow][i] = DataTypeUtil.getDataBasedOnDataType( - new String(key.getNoDictionaryKeyByIndex(noDictionaryColumnIndex++), - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), - queryDimension.get(i).getDimension().getDataType()); - } else { - resultData[currentRow][i] = - (int) surrogateResult[queryDimension.get(i).getDimension().getKeyOrdinal()]; - } - } - - // @TODO need to check why it was handled like this - if (queryExecuterProperties.isFunctionQuery) { - if (value[0].toString().contains("Long")) { - Long sizeOfListL = value[0].getLongValue(); - return getEmptyChunkResult(sizeOfListL.intValue()); - } else if (value[0].toString().contains("Decimal")) { - BigDecimal sizeOfListD = value[0].getBigDecimalValue(); - return getEmptyChunkResult(sizeOfListD.intValue()); - } else { - Double sizeOfList = value[0].getDoubleValue(); - return getEmptyChunkResult(sizeOfList.intValue()); - } - - } - for (int i = 0; i < queryExecuterProperties.measureAggregators.length; i++) { - resultData[currentRow][dimensionCount + i] = value[i]; - } - currentRow++; - noDictionaryColumnIndex = 0; - } - if (resultData.length > 0) { - resultData = encodeToRows(resultData); - } - return getResult(queryModel, resultData); - } - - private BatchResult getResult(QueryModel queryModel, Object[][] convertedResult) { - - List<CarbonKey> keys = new ArrayList<CarbonKey>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<CarbonValue> values = - new ArrayList<CarbonValue>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List<QueryDimension> queryDimensions = queryModel.getQueryDimension(); - int dimensionCount = queryDimensions.size(); - int msrCount = queryExecuterProperties.measureAggregators.length; - Object[][] resultDataA = null; - // @TODO no sure why this check is here as in the caller of this method - // is returning in case of - // function query. 
Need to confirm with other developer who handled this - // scneario - if (queryExecuterProperties.isFunctionQuery) { - msrCount = 1; - resultDataA = new Object[dimensionCount + msrCount][msrCount]; - } else { - resultDataA = new Object[dimensionCount + msrCount][convertedResult[0].length]; - } - Object[] row = null; - QueryDimension queryDimension = null; - for (int columnIndex = 0; columnIndex < resultDataA[0].length; columnIndex++) { - row = new Object[dimensionCount + msrCount]; - for (int i = 0; i < dimensionCount; i++) { - queryDimension = queryDimensions.get(i); - if (!CarbonUtil - .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)) { - row[queryDimension.getQueryOrder()] = convertedResult[i][columnIndex]; - } else if (CarbonUtil - .hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DIRECT_DICTIONARY)) { - DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory - .getDirectDictionaryGenerator(queryDimension.getDimension().getDataType()); - row[queryDimension.getQueryOrder()] = directDictionaryGenerator - .getValueFromSurrogate((Integer) convertedResult[i][columnIndex]); - } else { - if (queryExecuterProperties.sortDimIndexes[i] == 1) { - row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType( - queryExecuterProperties.columnToDictionayMapping - .get(queryDimension.getDimension().getColumnId()) - .getDictionaryValueFromSortedIndex((Integer) convertedResult[i][columnIndex]), - queryDimension.getDimension().getDataType()); - } else { - row[queryDimension.getQueryOrder()] = DataTypeUtil.getDataBasedOnDataType( - queryExecuterProperties.columnToDictionayMapping - .get(queryDimension.getDimension().getColumnId()) - .getDictionaryValueForKey((Integer) convertedResult[i][columnIndex]), - queryDimension.getDimension().getDataType()); - } - } - } - MeasureAggregator[] msrAgg = - new MeasureAggregator[queryExecuterProperties.measureAggregators.length]; - - 
fillMeasureValueForAggGroupByQuery(queryModel, convertedResult, dimensionCount, columnIndex, - msrAgg); - fillDimensionAggValue(queryModel, convertedResult, dimensionCount, columnIndex, msrAgg); - - if (!queryModel.isDetailQuery()) { - for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) { - row[queryModel.getQueryMeasures().get(i).getQueryOrder()] = - msrAgg[queryExecuterProperties.measureStartIndex + i].get(); - } - int index = 0; - for (int i = 0; i < queryModel.getDimAggregationInfo().size(); i++) { - DimensionAggregatorInfo dimensionAggregatorInfo = - queryModel.getDimAggregationInfo().get(i); - for (int j = 0; j < dimensionAggregatorInfo.getOrderList().size(); j++) { - row[dimensionAggregatorInfo.getOrderList().get(j)] = msrAgg[index++].get(); - } - } - for (int i = 0; i < queryModel.getExpressions().size(); i++) { - row[queryModel.getExpressions().get(i).getQueryOrder()] = - ((MeasureAggregator) convertedResult[dimensionCount - + queryExecuterProperties.aggExpressionStartIndex + i][columnIndex]).get(); - } - } else { - QueryMeasure msr = null; - for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) { - msr = queryModel.getQueryMeasures().get(i); - if (msrAgg[queryExecuterProperties.measureStartIndex + i].isFirstTime()) { - row[msr.getQueryOrder()] = null; - } else { - Object msrVal; - switch (msr.getMeasure().getDataType()) { - case LONG: - msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getLongValue(); - break; - case DECIMAL: - msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getBigDecimalValue(); - break; - default: - msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getDoubleValue(); - } - row[msr.getQueryOrder()] = DataTypeUtil - .getMeasureDataBasedOnDataType(msrVal,msr.getMeasure().getDataType()); - } - } - } - values.add(new CarbonValue(new MeasureAggregator[0])); - keys.add(new CarbonKey(row)); - } - LOGGER.info("###########################################------ Total Number of records" - 
+ resultDataA[0].length); - BatchResult chunkResult = new BatchResult(); - chunkResult.setKeys(keys); - chunkResult.setValues(values); - return chunkResult; - } - - private void fillDimensionAggValue(QueryModel queryModel, Object[][] surrogateResult, - int dimensionCount, int columnIndex, MeasureAggregator[] v) { - Iterator<DimensionAggregatorInfo> dimAggInfoIterator = - queryModel.getDimAggregationInfo().iterator(); - DimensionAggregatorInfo dimensionAggregatorInfo = null; - List<String> partitionColumns = queryModel.getParitionColumns(); - int rowIndex = -1; - int index = 0; - while (dimAggInfoIterator.hasNext()) { - dimensionAggregatorInfo = dimAggInfoIterator.next(); - for (int j = 0; j < dimensionAggregatorInfo.getAggList().size(); j++) { - ++rowIndex; - if (!dimensionAggregatorInfo.getAggList().get(j) - .equals(CarbonCommonConstants.DISTINCT_COUNT)) { - v[index++] = - ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]); - } else if (partitionColumns.size() == 1 && partitionColumns - .contains(dimensionAggregatorInfo.getColumnName()) && dimensionAggregatorInfo - .getAggList().get(j).equals(CarbonCommonConstants.DISTINCT_COUNT)) { - double value = - ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]) - .getDoubleValue(); - - MeasureAggregator countAggregator = new CountAggregator(); - countAggregator.setNewValue(value); - v[index++] = countAggregator; - } else { - if (surrogateResult[dimensionCount - + rowIndex][columnIndex] instanceof DistinctCountAggregator) { - - Iterator<Integer> iterator = - ((DistinctCountAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]) - .getBitMap().iterator(); - - MeasureAggregator distinctCountAggregatorObjct = new DistinctStringCountAggregator(); - while (iterator.hasNext()) { - String member = queryExecuterProperties.columnToDictionayMapping - .get(dimensionAggregatorInfo.getDim().getColumnId()) - .getDictionaryValueForKey(iterator.next()); - if 
(!member.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) { - distinctCountAggregatorObjct.agg(member); - } - } - v[index++] = distinctCountAggregatorObjct; - } else { - v[index++] = - ((MeasureAggregator) surrogateResult[dimensionCount + rowIndex][columnIndex]); - } - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java index 0eb60ff..0ae6651 100644 --- a/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java +++ b/core/src/main/java/org/carbondata/query/carbon/result/preparator/impl/RawQueryResultPreparatorImpl.java @@ -1,22 +1,24 @@ package org.carbondata.query.carbon.result.preparator.impl; +import java.util.List; + import org.carbondata.common.logging.LogService; import org.carbondata.common.logging.LogServiceFactory; -import org.carbondata.query.aggregator.MeasureAggregator; import org.carbondata.query.carbon.executor.impl.QueryExecutorProperties; import org.carbondata.query.carbon.model.QueryDimension; import org.carbondata.query.carbon.model.QueryMeasure; import org.carbondata.query.carbon.model.QueryModel; import org.carbondata.query.carbon.model.QuerySchemaInfo; import org.carbondata.query.carbon.result.BatchRawResult; +import org.carbondata.query.carbon.result.BatchResult; +import org.carbondata.query.carbon.result.ListBasedResultWrapper; import org.carbondata.query.carbon.result.Result; -import org.carbondata.query.carbon.util.DataTypeUtil; -import org.carbondata.query.carbon.wrappers.ByteArrayWrapper; /** * It does not decode the dictionary. 
*/ -public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator<BatchRawResult> { +public class RawQueryResultPreparatorImpl + extends AbstractQueryResultPreparator<List<ListBasedResultWrapper>, Object> { private static final LogService LOGGER = LogServiceFactory.getLogService(RawQueryResultPreparatorImpl.class.getName()); @@ -33,7 +35,7 @@ public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator< .toArray(new QueryDimension[queryModel.getQueryDimension().size()])); querySchemaInfo.setQueryMeasures(queryModel.getQueryMeasures() .toArray(new QueryMeasure[queryModel.getQueryMeasures().size()])); - int msrSize = queryExecuterProperties.measureAggregators.length; + int msrSize = queryExecuterProperties.measureDataTypes.length; int dimensionCount = queryModel.getQueryDimension().size(); int[] queryOrder = new int[dimensionCount + msrSize]; int[] queryReverseOrder = new int[dimensionCount + msrSize]; @@ -49,75 +51,34 @@ public class RawQueryResultPreparatorImpl extends AbstractQueryResultPreparator< querySchemaInfo.setQueryReverseOrder(queryReverseOrder); } - @Override public BatchRawResult prepareQueryResult(Result scannedResult) { + @Override public BatchResult prepareQueryResult( + Result<List<ListBasedResultWrapper>, Object> scannedResult) { if ((null == scannedResult || scannedResult.size() < 1)) { - BatchRawResult batchRawResult = new BatchRawResult(new Object[0][0]); + BatchRawResult batchRawResult = new BatchRawResult(); batchRawResult.setQuerySchemaInfo(querySchemaInfo); return batchRawResult; } - int msrSize = queryExecuterProperties.measureAggregators.length; - int totalNumberOfColumn = msrSize + 1; - Object[][] resultData = new Object[scannedResult.size()][totalNumberOfColumn]; - int currentRow = 0; - ByteArrayWrapper key = null; - MeasureAggregator[] value = null; + int msrSize = queryExecuterProperties.measureDataTypes.length; + Object[][] resultData = new Object[scannedResult.size()][]; + Object[] value; + 
Object[] row; + int counter = 0; while (scannedResult.hasNext()) { - key = scannedResult.getKey(); value = scannedResult.getValue(); - resultData[currentRow][0] = key; - for (int i = 0; i < msrSize; i++) { - resultData[currentRow][1 + i] = value[i]; + row = new Object[msrSize + 1]; + row[0] = scannedResult.getKey(); + if(value != null) { + System.arraycopy(value, 0, row, 1, msrSize); } - currentRow++; - } - - if (resultData.length > 0) { - resultData = encodeToRows(resultData); + resultData[counter] = row; + counter ++; } - BatchRawResult result = getResult(queryModel, resultData); + LOGGER.info("###########################---- Total Number of records" + scannedResult.size()); + BatchRawResult result = new BatchRawResult(); + result.setRows(resultData); result.setQuerySchemaInfo(querySchemaInfo); return result; } - - private BatchRawResult getResult(QueryModel queryModel, Object[][] convertedResult) { - - int msrCount = queryExecuterProperties.measureAggregators.length; - Object[][] resultDataA = new Object[1 + msrCount][convertedResult[0].length]; - - for (int columnIndex = 0; columnIndex < resultDataA[0].length; columnIndex++) { - resultDataA[0][columnIndex] = convertedResult[0][columnIndex]; - MeasureAggregator[] msrAgg = - new MeasureAggregator[queryExecuterProperties.measureAggregators.length]; - - fillMeasureValueForAggGroupByQuery(queryModel, convertedResult, 1, columnIndex, msrAgg); - - QueryMeasure msr = null; - for (int i = 0; i < queryModel.getQueryMeasures().size(); i++) { - msr = queryModel.getQueryMeasures().get(i); - if (msrAgg[queryExecuterProperties.measureStartIndex + i].isFirstTime()) { - resultDataA[i + 1][columnIndex] = null; - } else { - Object msrVal; - switch (msr.getMeasure().getDataType()) { - case LONG: - msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getLongValue(); - break; - case DECIMAL: - msrVal = msrAgg[queryExecuterProperties.measureStartIndex + i].getBigDecimalValue(); - break; - default: - msrVal = 
msrAgg[queryExecuterProperties.measureStartIndex + i].getDoubleValue(); - } - resultDataA[i + 1][columnIndex] = DataTypeUtil - .getMeasureDataBasedOnDataType(msrVal, - msr.getMeasure().getDataType()); - } - } - } - LOGGER.info("###########################################------ Total Number of records" - + resultDataA[0].length); - return new BatchRawResult(resultDataA); - } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java index ca7c77a..0377580 100644 --- a/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java +++ b/core/src/main/java/org/carbondata/query/filter/executer/RowLevelFilterExecuterImpl.java @@ -35,8 +35,6 @@ import org.carbondata.core.constants.CarbonCommonConstants; import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.carbondata.core.util.CarbonUtil; -import org.carbondata.query.aggregator.MeasureAggregator; -import org.carbondata.query.aggregator.util.MeasureAggregatorFactory; import org.carbondata.query.carbon.executor.exception.QueryExecutionException; import org.carbondata.query.carbon.processor.BlocksChunkHolder; import org.carbondata.query.carbon.util.DataTypeUtil; @@ -209,44 +207,25 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter { if (!msrColumnEvalutorInfo.isMeasureExistsInCurrentSlice()) { record[msrColumnEvalutorInfo.getRowIndex()] = msrColumnEvalutorInfo.getDefaultValue(); } else { - if (msrColumnEvalutorInfo.isCustomMeasureValue()) { - MeasureAggregator aggregator = 
MeasureAggregatorFactory - .getAggregator(msrColumnEvalutorInfo.getAggregator(), - msrColumnEvalutorInfo.getType()); - aggregator.merge( - blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] - .getMeasureDataHolder().getReadableByteArrayValueByIndex(index)); - switch (msrType) { - case LONG: - record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getLongValue(); - break; - case DECIMAL: - record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getBigDecimalValue(); - break; - default: - record[msrColumnEvalutorInfo.getRowIndex()] = aggregator.getDoubleValue(); - } - } else { - Object msrValue; - switch (msrType) { - case LONG: - msrValue = - blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] - .getMeasureDataHolder().getReadableLongValueByIndex(index); - break; - case DECIMAL: - msrValue = - blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] - .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index); - break; - default: - msrValue = - blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] - .getMeasureDataHolder().getReadableDoubleValueByIndex(index); - } - record[msrColumnEvalutorInfo.getRowIndex()] = msrValue; - + Object msrValue; + switch (msrType) { + case LONG: + msrValue = + blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] + .getMeasureDataHolder().getReadableLongValueByIndex(index); + break; + case DECIMAL: + msrValue = + blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] + .getMeasureDataHolder().getReadableBigDecimalValueByIndex(index); + break; + default: + msrValue = + blockChunkHolder.getMeasureDataChunk()[msrColumnEvalutorInfo.getColumnIndex()] + .getMeasureDataHolder().getReadableDoubleValueByIndex(index); } + record[msrColumnEvalutorInfo.getRowIndex()] = msrValue; + } } row.setValues(record); @@ -275,7 +254,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter 
{ * Read the actual filter member by passing the dictionary value from * the forward dictionary cache which which holds column wise cache * - * @param dimColumnEvaluaatorInfo + * @param dimColumnEvaluatorInfo * @param dictionaryValue * @param forwardDictionary * @return http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java b/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java index 9cefbdb..e6877d5 100644 --- a/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java +++ b/core/src/main/java/org/carbondata/query/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java @@ -31,8 +31,6 @@ public class MeasureColumnResolvedFilterInfo implements Serializable { private int rowIndex = -1; - private boolean isCustomMeasureValue; - private Object uniqueValue; private String aggregator; @@ -59,14 +57,6 @@ public class MeasureColumnResolvedFilterInfo implements Serializable { this.rowIndex = rowIndex; } - public boolean isCustomMeasureValue() { - return isCustomMeasureValue; - } - - public void setCustomMeasureValue(boolean isCustomMeasureValue) { - this.isCustomMeasureValue = isCustomMeasureValue; - } - public Object getUniqueValue() { return uniqueValue; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java b/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java deleted file mode 100644 index 
7f6d7f2..0000000 --- a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonKey.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/** - * - */ -package org.carbondata.query.scanner.impl; - -import java.io.Serializable; -import java.util.Arrays; - -/** - * @author R00900208 - */ -public class CarbonKey implements Serializable, Comparable<CarbonKey> { - - /** - * - */ - private static final long serialVersionUID = -8773813519739848506L; - - private Object[] key; - - public CarbonKey(Object[] key) { - this.key = key; - } - - /** - * @return the key - */ - public Object[] getKey() { - return key; - } - - public CarbonKey getSubKey(int size) { - Object[] crop = new Object[size]; - System.arraycopy(key, 0, crop, 0, size); - return new CarbonKey(crop); - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + Arrays.hashCode(key); - return result; - } - - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if 
(getClass() != obj.getClass()) { - return false; - } - CarbonKey other = (CarbonKey) obj; - if (!Arrays.equals(key, other.key)) { - return false; - } - return true; - } - - @Override public String toString() { - return Arrays.toString(key); - } - - @Override public int compareTo(CarbonKey other) { - Object[] oKey = other.key; - - int l = 0; - for (int i = 0; i < key.length; i++) { - l = ((Comparable) key[i]).compareTo(oKey[i]); - if (l != 0) { - return l; - } - } - - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java b/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java deleted file mode 100644 index 68e7226..0000000 --- a/core/src/main/java/org/carbondata/query/scanner/impl/CarbonValue.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.carbondata.query.scanner.impl; - -import java.io.Serializable; -import java.util.Arrays; - -import org.carbondata.query.aggregator.MeasureAggregator; - -public class CarbonValue implements Serializable, Comparable<CarbonValue> { - - /** - * - */ - private static final long serialVersionUID = 8034398963696130423L; - - private MeasureAggregator[] values; - - private int topNIndex; - - public CarbonValue(MeasureAggregator[] values) { - this.values = values; - } - - /** - * @return the values - */ - public MeasureAggregator[] getValues() { - return values; - } - - public CarbonValue merge(CarbonValue another) { - for (int i = 0; i < values.length; i++) { - values[i].merge(another.values[i]); - } - return this; - } - - public void setTopNIndex(int index) { - this.topNIndex = index; - } - - public void addGroup(CarbonKey key, CarbonValue value) { - - } - - public CarbonValue mergeKeyVal(CarbonValue another) { - return another; - } - - @Override public String toString() { - return Arrays.toString(values); - } - - @Override public int compareTo(CarbonValue o) { - return values[topNIndex].compareTo(o.values[topNIndex]); - } - - @Override public boolean equals(Object obj) { - if(!(obj instanceof CarbonValue)) { - return false; - } - CarbonValue o = (CarbonValue)obj; - return values[topNIndex].equals(o.values[o.topNIndex]); - } - - @Override public int hashCode() { - return values[topNIndex].hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java index 8ea0104..3d54d96 100644 --- a/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/carbondata/hadoop/CarbonRecordReader.java @@ -13,8 +13,8 @@ 
import org.carbondata.hadoop.readsupport.CarbonReadSupport; import org.carbondata.query.carbon.executor.QueryExecutorFactory; import org.carbondata.query.carbon.executor.exception.QueryExecutionException; import org.carbondata.query.carbon.model.QueryModel; -import org.carbondata.query.carbon.result.BatchRawResult; -import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor; +import org.carbondata.query.carbon.result.BatchResult; +import org.carbondata.query.carbon.result.iterator.ChunkRowIterator; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; @@ -48,8 +48,8 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> { readSupport .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier()); try { - carbonIterator = new ChunkRawRowIterartor( - (CarbonIterator<BatchRawResult>) QueryExecutorFactory.getQueryExecutor(queryModel) + carbonIterator = new ChunkRowIterator( + (CarbonIterator<BatchResult>) QueryExecutorFactory.getQueryExecutor(queryModel) .execute(queryModel)); } catch (QueryExecutionException e) { throw new InterruptedException(e.getMessage()); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6288ec71/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala index f728a32..b6f589d 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala @@ -17,17 +17,12 @@ package org.apache.spark.sql -import scala.collection.mutable.MutableList - -import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount} import org.apache.spark.sql.catalyst.{InternalRow, 
TableIdentifier} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _} import org.apache.spark.sql.execution.command.tableModel -import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation} import org.apache.spark.sql.types._ @@ -202,328 +197,3 @@ case class FakeCarbonCast(child: Literal, dataType: DataType) override def eval(input: InternalRow): Any = child.value } - -/** - * A pattern that matches any number of project or filter operations on top of another relational - * operator. All filter operators are collected and their conditions are broken up and returned - * together with the top project operator. - * [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if - * necessary. - */ -object PhysicalOperation1 extends PredicateHelper { - type ReturnType = (Seq[NamedExpression], Seq[Expression], Option[Seq[Expression]], - Option[Seq[SortOrder]], Option[Expression], LogicalPlan) - - def apply(plan: LogicalPlan): Option[ReturnType] = { - val (fields, filters, child, _, groupby, sortOrder, limit) = - collectProjectsAndFilters(plan) - - Some((fields.getOrElse(child.output), filters, groupby, sortOrder, limit, child)) - } - - /** - * Collects projects and filters, in-lining/substituting aliases if necessary. Here are two - * examples for alias in-lining/substitution. 
Before: - * {{{ - * SELECT c1 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 - * SELECT c1 AS c2 FROM (SELECT key AS c1 FROM t1) t2 WHERE c1 > 10 - * }}} - * After: - * {{{ - * SELECT key AS c1 FROM t1 WHERE key > 10 - * SELECT key AS c2 FROM t1 WHERE key > 10 - * }}} - */ - def collectProjectsAndFilters(plan: LogicalPlan): - (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, - Map[Attribute, Expression], Option[Seq[Expression]], - Option[Seq[SortOrder]], Option[Expression]) = { - plan match { - case Project(fields, child) => - val (_, filters, other, aliases, groupby, sortOrder, limit) = collectProjectsAndFilters( - child) - val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]] - (Some(substitutedFields), filters, other, collectAliases( - substitutedFields), groupby, sortOrder, limit) - - case Filter(condition, child) => - val (fields, filters, other, aliases, groupby, sortOrder, limit) = - collectProjectsAndFilters(child) - val substitutedCondition = substitute(aliases)(condition) - (fields, filters ++ splitConjunctivePredicates( - substitutedCondition), other, aliases, groupby, sortOrder, limit) - - case Aggregate(groupingExpressions, aggregateExpressions, child) => - val (fields, filters, other, aliases, _, sortOrder, limit) = collectProjectsAndFilters( - child) - - var aggExps: Seq[AggregateExpression] = Nil - aggregateExpressions.foreach(v => { - val list = findAggreagateExpression(v) - aggExps = aggExps ++ list - }) - - (fields, filters, other, aliases ++ collectAliases(aggregateExpressions), Some( - aggregateExpressions), sortOrder, limit) - case Sort(order, _, child) => - val (fields, filters, other, aliases, groupby, _, limit) = collectProjectsAndFilters(child) - val substitutedOrder = order.map(s => SortOrder(substitute(aliases)(s.child), s.direction)) - (fields, filters, other, aliases, groupby, Some(substitutedOrder), limit) - case Limit(limitExpr, child) => - val (fields, filters, other, aliases, 
groupby, sortOrder, _) = collectProjectsAndFilters( - child) - (fields, filters, other, aliases, groupby, sortOrder, Some(limitExpr)) - case other => - (None, Nil, other, Map.empty, None, None, None) - } - } - - def findAggreagateExpression(expr: Expression): Seq[AggregateExpression] = { - val exprList = expr match { - case d: AggregateExpression => d :: Nil - case Alias(ref, name) => findAggreagateExpression(ref) - case other => - var listout: Seq[AggregateExpression] = Nil - - other.children.foreach(v => { - val list = findAggreagateExpression(v) - listout = listout ++ list - }) - listout - } - exprList - } - - def collectProjectsAndFilters1(plan: LogicalPlan): - (Option[Seq[NamedExpression]], Seq[Expression], LogicalPlan, Map[Attribute, Expression], - Option[Seq[Expression]], Option[Seq[SortOrder]], Option[Expression]) = { - plan match { - case Project(fields, child) => - val (_, filters, other, aliases, groupby, sortOrder, limit) = collectProjectsAndFilters( - child) - val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]] - (Some(substitutedFields), filters, other, collectAliases( - substitutedFields), groupby, sortOrder, limit) - - case Filter(condition, child) => - val (fields, filters, other, aliases, groupby, sortOrder, limit) = - collectProjectsAndFilters(child) - val substitutedCondition = substitute(aliases)(condition) - (fields, filters ++ splitConjunctivePredicates( - substitutedCondition), other, aliases, groupby, sortOrder, limit) - - case Aggregate(groupingExpressions, aggregateExpressions, child) => - val (fields, filters, other, aliases, _, sortOrder, limit) = collectProjectsAndFilters( - child) - val aggExps = aggregateExpressions.map { - case Alias(ref, name) => ref - case others => others - }.filter { - case d: AggregateExpression => true - case _ => false - } - (fields, filters, other, aliases ++ collectAliases(aggregateExpressions), Some( - aggExps), sortOrder, limit) - case Sort(order, _, child) => - val 
(fields, filters, other, aliases, groupby, _, limit) = collectProjectsAndFilters(child) - val substitutedOrder = order.map(s => SortOrder(substitute(aliases)(s.child), s.direction)) - (fields, filters, other, aliases, groupby, Some(substitutedOrder), limit) - case Limit(limitExpr, child) => - val (fields, filters, other, aliases, groupby, sortOrder, _) = collectProjectsAndFilters( - child) - (fields, filters, other, aliases, groupby, sortOrder, Some(limitExpr)) - case other => - (None, Nil, other, Map.empty, None, None, None) - } - } - - private def collectAliases(fields: Seq[Expression]) = { - fields.collect { - case a@Alias(child, _) => a.toAttribute -> child - }.toMap - } - - private def substitute(aliases: Map[Attribute, Expression])(expr: Expression) = { - expr.transform { - case a@Alias(ref: AttributeReference, name) => - aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifiers)).getOrElse(a) - - case a: AttributeReference => - aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifiers)).getOrElse(a) - } - } -} - -case class PositionLiteral(expr: Expression, intermediateDataType: DataType) - extends LeafExpression with CodegenFallback { - override def dataType: DataType = expr.dataType - - override def nullable: Boolean = false - - type EvaluatedType = Any - var position = -1 - - def setPosition(pos: Int): Unit = position = pos - - override def toString: String = s"PositionLiteral($position : $expr)" - - override def eval(input: InternalRow): Any = { - if (position != -1) { - input.get(position, intermediateDataType) - } else { - expr.eval(input) - } - } -} - -/** - * Matches a logical aggregation that can be performed on distributed data in two steps. The first - * operates on the data in each partition performing partial aggregation for each group. The second - * occurs after the shuffle and completes the aggregation. 
- * - * This pattern will only match if all aggregate expressions can be computed partially and will - * return the rewritten aggregation expressions for both phases. - * - * The returned values for this match are as follows: - * - Grouping attributes for the final aggregation. - * - Aggregates for the final aggregation. - * - Grouping expressions for the partial aggregation. - * - Partial aggregate expressions. - * - Input to the aggregation. - */ -object CarbonAggregation { - type ReturnType = (Seq[Expression], Seq[NamedExpression], LogicalPlan) - - private def convertAggregatesForPushdown(convertUnknown: Boolean, - rewrittenAggregateExpressions: Seq[Expression], - oneAttr: AttributeReference) = { - if (canBeConvertedToCarbonAggregate(rewrittenAggregateExpressions)) { - var counter: Int = 0 - var updatedExpressions = MutableList[Expression]() - rewrittenAggregateExpressions.foreach(v => { - val updated = convertAggregate(v, counter, convertUnknown, oneAttr) - updatedExpressions += updated - counter = counter + 1 - }) - updatedExpressions - } else { - rewrittenAggregateExpressions - } - } - - def makePositionLiteral(expr: Expression, index: Int, dataType: DataType): PositionLiteral = { - val posLiteral = PositionLiteral(expr, dataType) - posLiteral.setPosition(index) - posLiteral - } - - def convertAggregate(current: Expression, - index: Int, - convertUnknown: Boolean, - oneAttr: AttributeReference): Expression = { - if (!convertUnknown && canBeConverted(current)) { - current.transform { - case Average(attr: AttributeReference) => - val convertedDataType = transformArrayType(attr) - CarbonAverage(makePositionLiteral(convertedDataType, index, convertedDataType.dataType)) - case Average(Cast(attr: AttributeReference, dataType)) => - val convertedDataType = transformArrayType(attr) - CarbonAverage( - makePositionLiteral(convertedDataType, index, convertedDataType.dataType)) - case Count(Seq(s: Literal)) => - CarbonCount(s, 
Some(makePositionLiteral(transformLongType(oneAttr), index, LongType))) - case Count(Seq(attr: AttributeReference)) => - CarbonCount(makePositionLiteral(transformLongType(attr), index, LongType)) - case Sum(attr: AttributeReference) => - Sum(makePositionLiteral(attr, index, attr.dataType)) - case Sum(Cast(attr: AttributeReference, dataType)) => - Sum(Cast(makePositionLiteral(attr, index, attr.dataType), dataType)) - case Min(attr: AttributeReference) => Min(makePositionLiteral(attr, index, attr.dataType)) - case Min(Cast(attr: AttributeReference, dataType)) => - Min(Cast(makePositionLiteral(attr, index, attr.dataType), dataType)) - case Max(attr: AttributeReference) => - Max(makePositionLiteral(attr, index, attr.dataType)) - case Max(Cast(attr: AttributeReference, dataType)) => - Max(Cast(makePositionLiteral(attr, index, attr.dataType), dataType)) - } - } else { - current - } - } - - def canBeConverted(current: Expression): Boolean = current match { - case Alias(AggregateExpression(Average(attr: AttributeReference), _, false), _) => true - case Alias(AggregateExpression(Average(Cast(attr: AttributeReference, _)), _, false), _) => true - case Alias(AggregateExpression(Count(Seq(s: Literal)), _, false), _) => true - case Alias(AggregateExpression(Count(Seq(attr: AttributeReference)), _, false), _) => true - case Alias(AggregateExpression(Sum(attr: AttributeReference), _, false), _) => true - case Alias(AggregateExpression(Sum(Cast(attr: AttributeReference, _)), _, false), _) => true - case Alias(AggregateExpression(Min(attr: AttributeReference), _, false), _) => true - case Alias(AggregateExpression(Min(Cast(attr: AttributeReference, _)), _, false), _) => true - case Alias(AggregateExpression(Max(attr: AttributeReference), _, false), _) => true - case Alias(AggregateExpression(Max(Cast(attr: AttributeReference, _)), _, false), _) => true - case _ => false - } - - def transformArrayType(attr: AttributeReference): AttributeReference = { - AttributeReference(attr.name, 
ArrayType(DoubleType), attr.nullable, attr.metadata)(attr.exprId, - attr.qualifiers) - } - - def transformLongType(attr: AttributeReference): AttributeReference = { - AttributeReference(attr.name, LongType, attr.nullable, attr.metadata)(attr.exprId, - attr.qualifiers) - } - - /** - * There should be sync between carbonOperators validation and here. we should not convert to - * carbon aggregates if the validation does not satisfy. - */ - def canBeConvertedToCarbonAggregate(expressions: Seq[Expression]): Boolean = { - val detailQuery = expressions.map { - case attr@AttributeReference(_, _, _, _) => true - case Alias(agg: AggregateExpression, name) => true - case _ => false - }.exists(!_) - !detailQuery - } - - def unapply(plan: LogicalPlan): Option[ReturnType] = unapply((plan, false)) - - def unapply(combinedPlan: (LogicalPlan, Boolean)): Option[ReturnType] = { - val oneAttr = getOneAttribute(combinedPlan._1) - combinedPlan._1 match { - case Aggregate(groupingExpressions, aggregateExpressionsOrig, child) => - - // if detailed query dont convert aggregate expressions to Carbon Aggregate expressions - val aggregateExpressions = - if (combinedPlan._2) { - aggregateExpressionsOrig - } - else { - convertAggregatesForPushdown(false, aggregateExpressionsOrig, oneAttr) - } - Some((groupingExpressions, aggregateExpressions.asInstanceOf[Seq[NamedExpression]], child)) - case _ => None - } - } - - def getOneAttribute(plan: LogicalPlan): AttributeReference = { - var relation: LogicalRelation = null - plan collect { - case l: LogicalRelation => relation = l - } - if (relation != null) { - relation.output.find { p => - p.dataType match { - case n: NumericType => true - case _ => false - } - }.getOrElse(relation.output.head) - } else { - null - } - } -} - -