Repository: carbondata Updated Branches: refs/heads/master c471386fa -> 2fc0ad306
[CARBONDATA-2365] Add QueryExecutor in SearchMode for row-based CarbonRecordReader This closes #2196 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2fc0ad30 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2fc0ad30 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2fc0ad30 Branch: refs/heads/master Commit: 2fc0ad306a749983a70df279a1f6a9a881648e04 Parents: c471386 Author: Manhua <kevin...@qq.com> Authored: Fri Apr 20 16:07:27 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Apr 20 23:57:23 2018 +0800 ---------------------------------------------------------------------- .../scan/executor/QueryExecutorFactory.java | 20 +++- .../impl/SearchModeDetailQueryExecutor.java | 69 +++++++++++ .../SearchModeVectorDetailQueryExecutor.java | 10 +- .../core/scan/processor/BlockScan.java | 4 + .../AbstractDetailQueryResultIterator.java | 2 +- .../AbstractSearchModeResultIterator.java | 118 +++++++++++++++++++ .../iterator/SearchModeResultIterator.java | 107 ++--------------- .../SearchModeVectorResultIterator.java | 49 ++++++++ .../detailquery/SearchModeTestCase.scala | 13 +- .../VectorizedCarbonRecordReader.java | 11 +- 10 files changed, 288 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java index a890323..06fe4db 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/QueryExecutorFactory.java @@ -16,9 +16,13 @@ */ package org.apache.carbondata.core.scan.executor; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.scan.executor.impl.DetailQueryExecutor; +import org.apache.carbondata.core.scan.executor.impl.SearchModeDetailQueryExecutor; +import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; import org.apache.carbondata.core.scan.executor.impl.VectorDetailQueryExecutor; import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonProperties; /** * Factory class to get the query executor from RDD @@ -27,10 +31,20 @@ import org.apache.carbondata.core.scan.model.QueryModel; public class QueryExecutorFactory { public static QueryExecutor getQueryExecutor(QueryModel queryModel) { - if (queryModel.isVectorReader()) { - return new VectorDetailQueryExecutor(); + if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) { + if (queryModel.isVectorReader()) { + return new SearchModeVectorDetailQueryExecutor(); + } else { + return new SearchModeDetailQueryExecutor(); + } } else { - return new DetailQueryExecutor(); + if (queryModel.isVectorReader()) { + return new VectorDetailQueryExecutor(); + } else { + return new DetailQueryExecutor(); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java new file mode 100644 index 0000000..c64755e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeDetailQueryExecutor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.executor.impl; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator; +import org.apache.carbondata.core.util.CarbonProperties; + + +public class SearchModeDetailQueryExecutor extends AbstractQueryExecutor<Object> { + private static final LogService LOGGER = + LogServiceFactory.getLogService(SearchModeDetailQueryExecutor.class.getName()); + private static ExecutorService executorService; + + static { + int nThread; + try { + nThread = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD, + CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT)); + } catch (NumberFormatException e) { + nThread = Integer.parseInt(CarbonCommonConstants.CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT); + LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread); + } + if (nThread > 0) { + executorService = Executors.newFixedThreadPool(nThread); + } else { + executorService = Executors.newCachedThreadPool(); + } + } + + @Override + public CarbonIterator<Object> execute(QueryModel queryModel) + throws QueryExecutionException, IOException { + List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); + this.queryIterator = new SearchModeResultIterator( + blockExecutionInfoList, + queryModel, + executorService + ); + return this.queryIterator; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java index 439cadf..075d94a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/SearchModeVectorDetailQueryExecutor.java @@ -28,12 +28,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.result.iterator.SearchModeResultIterator; +import org.apache.carbondata.core.scan.result.iterator.SearchModeVectorResultIterator; import org.apache.carbondata.core.util.CarbonProperties; -/** - * Below class will be used to execute the detail query and returns columnar vectors. - */ + public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<Object> { private static final LogService LOGGER = LogServiceFactory.getLogService(SearchModeVectorDetailQueryExecutor.class.getName()); @@ -50,7 +48,7 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O LOGGER.warn("The carbon.search.mode.thread is invalid. Using the default value " + nThread); } if (nThread > 0) { - executorService = Executors.newFixedThreadPool(nThread); + executorService = Executors.newFixedThreadPool(nThread); } else { executorService = Executors.newCachedThreadPool(); } @@ -60,7 +58,7 @@ public class SearchModeVectorDetailQueryExecutor extends AbstractQueryExecutor<O public CarbonIterator<Object> execute(QueryModel queryModel) throws QueryExecutionException, IOException { List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel); - this.queryIterator = new SearchModeResultIterator( + this.queryIterator = new SearchModeVectorResultIterator( blockExecutionInfoList, queryModel, executorService http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java index eb41071..4d46b3b 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/processor/BlockScan.java @@ -95,4 +95,8 @@ public class BlockScan { this.scannerResultAggregator.collectResultInColumnarBatch(curResult, columnarBatch); } + public List<Object[]> next(int size) { + return this.scannerResultAggregator.collectResultInRow(curResult, size); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java index e8a61fc..bb23ff3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java @@ -82,7 +82,7 @@ public abstract class AbstractDetailQueryResultIterator<E> extends CarbonIterato /** * number of cores which can be used */ - private int batchSize; + protected int batchSize; /** * queryStatisticsModel to store query statistics object */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java new file mode 100644 index 0000000..1b52e55 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractSearchModeResultIterator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.result.iterator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.processor.BlockScan; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; + +public abstract class AbstractSearchModeResultIterator + extends AbstractDetailQueryResultIterator<Object> { + + private FileFactory.FileType fileType; + private List<Future<BlockScan>> taskSubmitList; + protected BlockScan curBlockScan; + private int nextBlockScanIndex = 0; + + public AbstractSearchModeResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + ExecutorService execService) { + super(infos, queryModel, execService); + this.fileType = FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath()); + scanAll(); + } + + private void scanAll() { + taskSubmitList = new ArrayList<>(blockExecutionInfos.size()); + for (final BlockExecutionInfo info: blockExecutionInfos) { + taskSubmitList.add(execService.submit(new Callable<BlockScan>() { + + @Override + public BlockScan call() throws Exception { + BlockScan blockScan = new BlockScan(info, FileFactory.getFileHolder(fileType), + buildQueryStatiticsModel(recorder)); + blockScan.scan(); + return blockScan; + } + })); + } + } + + @Override + public boolean hasNext() { + try { + while ((curBlockScan == null || !curBlockScan.hasNext()) && + nextBlockScanIndex < taskSubmitList.size()) { + curBlockScan = taskSubmitList.get(nextBlockScanIndex++).get(); + } + return curBlockScan != null && curBlockScan.hasNext(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private QueryStatisticsModel buildQueryStatiticsModel(QueryStatisticsRecorder recorder) { + QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel(); + queryStatisticsModel.setRecorder(recorder); + QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatisticTotalBlocklet); + queryStatisticsModel.getRecorder().recordStatistics(queryStatisticTotalBlocklet); + + QueryStatistic queryStatisticValidScanBlocklet = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatisticValidScanBlocklet); + queryStatisticsModel.getRecorder().recordStatistics(queryStatisticValidScanBlocklet); + + QueryStatistic totalNumberOfPages = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, totalNumberOfPages); + queryStatisticsModel.getRecorder().recordStatistics(totalNumberOfPages); + + QueryStatistic validPages = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.VALID_PAGE_SCANNED, validPages); + queryStatisticsModel.getRecorder().recordStatistics(validPages); + + QueryStatistic scannedPages = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.PAGE_SCANNED, scannedPages); + queryStatisticsModel.getRecorder().recordStatistics(scannedPages); + + QueryStatistic scanTime = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, scanTime); + queryStatisticsModel.getRecorder().recordStatistics(scanTime); + + QueryStatistic readTime = new QueryStatistic(); + queryStatisticsModel.getStatisticsTypeAndObjMap() + .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime); + queryStatisticsModel.getRecorder().recordStatistics(readTime); + return queryStatisticsModel; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java index ae46242..c4a65b9 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeResultIterator.java @@ -16,119 +16,38 @@ */ package org.apache.carbondata.core.scan.result.iterator; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; import org.apache.carbondata.core.scan.model.QueryModel; -import org.apache.carbondata.core.scan.processor.BlockScan; -import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; -import org.apache.carbondata.core.stats.QueryStatistic; -import org.apache.carbondata.core.stats.QueryStatisticsConstants; -import org.apache.carbondata.core.stats.QueryStatisticsModel; -import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.scan.result.RowBatch; -public class SearchModeResultIterator extends AbstractDetailQueryResultIterator<Object> { +public class SearchModeResultIterator extends AbstractSearchModeResultIterator { private final Object lock = new Object(); - private FileFactory.FileType fileType; - private List<Future<BlockScan>> taskSubmitList; - private BlockScan curBlockScan; - private int nextBlockScanIndex = 0; - public SearchModeResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, - ExecutorService execService) { + ExecutorService execService) { super(infos, queryModel, execService); - this.fileType = FileFactory.getFileType(queryModel.getAbsoluteTableIdentifier().getTablePath()); - scanAll(); - } - - private void scanAll() { - taskSubmitList = new ArrayList<>(blockExecutionInfos.size()); - for (final BlockExecutionInfo info: blockExecutionInfos) { - taskSubmitList.add(execService.submit(new Callable<BlockScan>() { - - @Override - public BlockScan call() throws Exception { - BlockScan blockScan = new BlockScan(info, FileFactory.getFileHolder(fileType), - buildQueryStatiticsModel(recorder)); - blockScan.scan(); - return blockScan; - } - })); - } - } - - @Override - public boolean hasNext() { - try { - while ((curBlockScan == null || !curBlockScan.hasNext()) && - nextBlockScanIndex < taskSubmitList.size()) { - curBlockScan = taskSubmitList.get(nextBlockScanIndex++).get(); - } - return curBlockScan != null && curBlockScan.hasNext(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } } @Override - public Object next() { - throw new UnsupportedOperationException("call processNextBatch instead"); + public RowBatch next() { + return getBatchResult(); } - @Override - public void processNextBatch(CarbonColumnarBatch columnarBatch) { + private RowBatch getBatchResult() { + RowBatch rowBatch = new RowBatch(); synchronized (lock) { if (curBlockScan.hasNext()) { - curBlockScan.processNextBatch(columnarBatch); + List<Object[]> collectedResult = curBlockScan.next(batchSize); + while (collectedResult.size() < batchSize && hasNext()) { + collectedResult.addAll(curBlockScan.next(batchSize - collectedResult.size())); + } + rowBatch.setRows(collectedResult); } } - } - - private QueryStatisticsModel buildQueryStatiticsModel(QueryStatisticsRecorder recorder) { - QueryStatisticsModel queryStatisticsModel = new QueryStatisticsModel(); - queryStatisticsModel.setRecorder(recorder); - QueryStatistic queryStatisticTotalBlocklet = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, queryStatisticTotalBlocklet); - queryStatisticsModel.getRecorder().recordStatistics(queryStatisticTotalBlocklet); - - QueryStatistic queryStatisticValidScanBlocklet = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatisticValidScanBlocklet); - queryStatisticsModel.getRecorder().recordStatistics(queryStatisticValidScanBlocklet); - - QueryStatistic totalNumberOfPages = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.TOTAL_PAGE_SCANNED, totalNumberOfPages); - queryStatisticsModel.getRecorder().recordStatistics(totalNumberOfPages); - - QueryStatistic validPages = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.VALID_PAGE_SCANNED, validPages); - queryStatisticsModel.getRecorder().recordStatistics(validPages); - - QueryStatistic scannedPages = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.PAGE_SCANNED, scannedPages); - queryStatisticsModel.getRecorder().recordStatistics(scannedPages); - - QueryStatistic scanTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.SCAN_BLOCKlET_TIME, scanTime); - queryStatisticsModel.getRecorder().recordStatistics(scanTime); - - QueryStatistic readTime = new QueryStatistic(); - queryStatisticsModel.getStatisticsTypeAndObjMap() - .put(QueryStatisticsConstants.READ_BLOCKlET_TIME, readTime); - queryStatisticsModel.getRecorder().recordStatistics(readTime); - return queryStatisticsModel; + return rowBatch; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java new file mode 100644 index 0000000..bff5e36 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/SearchModeVectorResultIterator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.result.iterator; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; + +public class SearchModeVectorResultIterator extends AbstractSearchModeResultIterator { + + private final Object lock = new Object(); + + public SearchModeVectorResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel, + ExecutorService execService) { + super(infos, queryModel, execService); + } + + @Override + public Object next() { + throw new UnsupportedOperationException("call processNextBatch instead"); + } + + @Override + public void processNextBatch(CarbonColumnarBatch columnarBatch) { + synchronized (lock) { + if (curBlockScan.hasNext()) { + curBlockScan.processNextBatch(columnarBatch); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala index 0b8f92b..7dc7493 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -37,7 +37,18 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { } - test("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'") { + test("SearchMode Query: row result") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") + checkAnswer( + sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"), + sql("select empno,empname,utilization from alldatatypestable_hive where empname = 'ayushi'")) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, + CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) + } + test("SearchMode Query: vector result") { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, "true") checkAnswer( sql("select empno,empname,utilization from alldatatypestable where empname = 'ayushi'"), http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fc0ad30/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index eb71daa..903bf44 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -26,7 +26,6 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; @@ -36,14 +35,12 @@ import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.scan.executor.QueryExecutor; import org.apache.carbondata.core.scan.executor.QueryExecutorFactory; import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException; -import org.apache.carbondata.core.scan.executor.impl.SearchModeVectorDetailQueryExecutor; import org.apache.carbondata.core.scan.model.ProjectionDimension; import org.apache.carbondata.core.scan.model.ProjectionMeasure; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator; import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector; import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch; -import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.AbstractRecordReader; import org.apache.carbondata.hadoop.CarbonInputSplit; @@ -134,13 +131,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { queryModel.setTableBlockInfos(tableBlockInfoList); queryModel.setVectorReader(true); try { - if (CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE, - CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE_DEFAULT).equals("true")) { - queryExecutor = new SearchModeVectorDetailQueryExecutor(); - } else { - queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); - } + queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel); iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel); } catch (QueryExecutionException e) { Throwable ext = e;