[CARBONDATA-2389] Search mode support FG datamap
This closes #2290 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cb71ffe1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cb71ffe1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cb71ffe1 Branch: refs/heads/branch-1.4 Commit: cb71ffe1ac1a39ef34df43457e704232a4c1444e Parents: 67766ab Author: xubo245 <601450...@qq.com> Authored: Wed May 9 21:20:59 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jun 5 16:04:20 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 12 + .../core/datamap/DataMapStoreManager.java | 20 +- .../apache/carbondata/core/datamap/Segment.java | 2 +- .../datamap/dev/expr/AndDataMapExprWrapper.java | 16 + .../datamap/dev/expr/DataMapExprWrapper.java | 13 + .../dev/expr/DataMapExprWrapperImpl.java | 8 + .../datamap/dev/expr/OrDataMapExprWrapper.java | 13 + .../LatestFilesReadCommittedScope.java | 43 ++- .../core/readcommitter/ReadCommittedScope.java | 2 +- .../TableStatusReadCommittedScope.java | 2 +- .../lucene/LuceneDataMapFactoryBase.java | 4 +- .../examples/LuceneDataMapExample.scala | 2 - .../carbondata/hadoop/CarbonRecordReader.java | 8 +- .../hadoop/api/CarbonInputFormat.java | 6 +- .../lucene/LuceneFineGrainDataMapSuite.scala | 1 + ...eneFineGrainDataMapWithSearchModeSuite.scala | 328 +++++++++++++++++++ .../detailquery/SearchModeTestCase.scala | 27 ++ .../execution/command/CarbonHiveCommands.scala | 4 +- .../spark/sql/optimizer/CarbonFilters.scala | 2 + .../store/worker/SearchRequestHandler.java | 37 ++- .../scala/org/apache/spark/rpc/Master.scala | 13 +- 21 files changed, 521 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 8ebce9e..08aa704 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1658,6 +1658,18 @@ public final class CarbonCommonConstants { public static final String CARBON_SEARCH_MODE_ENABLE_DEFAULT = "false"; /** + * It's timeout threshold of carbon search query + */ + @CarbonProperty + @InterfaceStability.Unstable + public static final String CARBON_SEARCH_QUERY_TIMEOUT = "carbon.search.query.timeout"; + + /** + * Default value is 10 seconds + */ + public static final String CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT = "10s"; + + /** * The size of thread pool used for reading files in Work for search mode. By default, * it is number of cores in Worker */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 0fcf4cd..96d2b1c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -96,13 +96,19 @@ public final class DataMapStoreManager { String dbName = carbonTable.getDatabaseName(); String tableName = carbonTable.getTableName(); String dmName = dataMap.getDataMapSchema().getDataMapName(); - boolean isDmVisible = sessionInfo.getSessionParams().getProperty( - String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE, - 
dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true"); - if (!isDmVisible) { - LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s", - dmName, dbName, tableName)); - dataMapIterator.remove(); + // TODO: need support get the visible status of datamap without sessionInfo in the future + if (sessionInfo != null) { + boolean isDmVisible = sessionInfo.getSessionParams().getProperty( + String.format("%s%s.%s.%s", CarbonCommonConstants.CARBON_DATAMAP_VISIBLE, + dbName, tableName, dmName), "true").trim().equalsIgnoreCase("true"); + if (!isDmVisible) { + LOGGER.warn(String.format("Ignore invisible datamap %s on table %s.%s", + dmName, dbName, tableName)); + dataMapIterator.remove(); + } + } else { + String message = "Carbon session info is null"; + LOGGER.info(message); } } return allDataMaps; http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 85c7176..7b63b84 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -115,7 +115,7 @@ public class Segment implements Serializable { public SegmentRefreshInfo getSegmentRefreshInfo(UpdateVO updateVo) throws IOException { - return readCommittedScope.getCommitedSegmentRefreshInfo(this, updateVo); + return readCommittedScope.getCommittedSegmentRefreshInfo(this, updateVo); } public String getSegmentNo() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java index 1de16bc..ec674de 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/AndDataMapExprWrapper.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; @@ -59,6 +60,21 @@ public class AndDataMapExprWrapper implements DataMapExprWrapper { return andBlocklets; } + @Override + public List<ExtendedBlocklet> prune(DataMapDistributable distributable, + List<PartitionSpec> partitionsToPrune) + throws IOException { + List<ExtendedBlocklet> leftPrune = left.prune(distributable, partitionsToPrune); + List<ExtendedBlocklet> rightPrune = right.prune(distributable, partitionsToPrune); + List<ExtendedBlocklet> andBlocklets = new ArrayList<>(); + for (ExtendedBlocklet blocklet : leftPrune) { + if (rightPrune.contains(blocklet)) { + andBlocklets.add(blocklet); + } + } + return andBlocklets; + } + @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets) throws IOException { List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java index 5a04529..901cfc7 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapper.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.List; +import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; @@ -41,6 +42,18 @@ public interface DataMapExprWrapper extends Serializable { throws IOException; /** + * prune blocklet according distributable + * + * @param distributable distributable + * @param partitionsToPrune partitions to prune + * @return the pruned ExtendedBlocklet list + * @throws IOException + */ + List<ExtendedBlocklet> prune(DataMapDistributable distributable, + List<PartitionSpec> partitionsToPrune) + throws IOException; + + /** * It is used in case on distributable datamap. First using job it gets all blockets from all * related datamaps. These blocklets are passed to this method to apply expression. 
* @param blocklets http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java index 38f2336..6537976 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; +import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; @@ -52,6 +53,13 @@ public class DataMapExprWrapperImpl implements DataMapExprWrapper { return dataMap.prune(segments, expression, partitionsToPrune); } + public List<ExtendedBlocklet> prune(DataMapDistributable distributable, + List<PartitionSpec> partitionsToPrune) + throws IOException { + List<DataMap> dataMaps = dataMap.getTableDataMaps(distributable); + return dataMap.prune(dataMaps, distributable, expression, partitionsToPrune); + } + @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets) throws IOException { List<ExtendedBlocklet> blockletList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java ---------------------------------------------------------------------- 
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java index 4988903..bb98535 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/OrDataMapExprWrapper.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; @@ -58,6 +59,18 @@ public class OrDataMapExprWrapper implements DataMapExprWrapper { return new ArrayList<>(andBlocklets); } + @Override + public List<ExtendedBlocklet> prune(DataMapDistributable distributable, + List<PartitionSpec> partitionsToPrune) + throws IOException { + List<ExtendedBlocklet> leftPrune = left.prune(distributable, partitionsToPrune); + List<ExtendedBlocklet> rightPrune = right.prune(distributable, partitionsToPrune); + Set<ExtendedBlocklet> andBlocklets = new HashSet<>(); + andBlocklets.addAll(leftPrune); + andBlocklets.addAll(rightPrune); + return new ArrayList<>(andBlocklets); + } + @Override public List<ExtendedBlocklet> pruneBlocklets(List<ExtendedBlocklet> blocklets) throws IOException { List<ExtendedBlocklet> leftPrune = left.pruneBlocklets(blocklets); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java index 14bba65..6a1234e 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java @@ -17,10 +17,7 @@ package org.apache.carbondata.core.readcommitter; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; @@ -43,11 +40,20 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; public class LatestFilesReadCommittedScope implements ReadCommittedScope { private String carbonFilePath; + private String segmentId; private ReadCommittedIndexFileSnapShot readCommittedIndexFileSnapShot; private LoadMetadataDetails[] loadMetadataDetails; - public LatestFilesReadCommittedScope(String path) { + /** + * a new constructor of this class + * + * @param path carbon file path + * @param segmentId segment id + */ + public LatestFilesReadCommittedScope(String path, String segmentId) { + Objects.requireNonNull(path); this.carbonFilePath = path; + this.segmentId = segmentId; try { takeCarbonIndexFileSnapShot(); } catch (IOException ex) { @@ -55,6 +61,15 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { } } + /** + * a new constructor with path + * + * @param path carbon file path + */ + public LatestFilesReadCommittedScope(String path) { + this(path, null); + } + private void prepareLoadMetadata() { int loadCount = 0; Map<String, List<String>> snapshotMap = @@ -101,13 +116,16 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { segName = segment.getSegmentFileName(); } List<String> index = snapShot.get(segName); + if (null == index) { + index = new LinkedList<>(); + } for (String indexPath : index) { indexFileStore.put(indexPath, null); } return indexFileStore; } - @Override public 
SegmentRefreshInfo getCommitedSegmentRefreshInfo( + @Override public SegmentRefreshInfo getCommittedSegmentRefreshInfo( Segment segment, UpdateVO updateVo) throws IOException { Map<String, SegmentRefreshInfo> snapShot = readCommittedIndexFileSnapShot.getSegmentTimestampUpdaterMap(); @@ -140,9 +158,10 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { // Read the current file Path get the list of indexes from the path. CarbonFile file = FileFactory.getCarbonFile(carbonFilePath); CarbonFile[] files = file.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { + @Override + public boolean accept(CarbonFile file) { return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() - .endsWith(CarbonTablePath.CARBON_DATA_EXT); + .endsWith(CarbonTablePath.CARBON_DATA_EXT) || file.getName().endsWith("Fact"); } }); if (files.length == 0) { @@ -152,8 +171,14 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope { } Map<String, List<String>> indexFileStore = new HashMap<>(); Map<String, SegmentRefreshInfo> segmentTimestampUpdaterMap = new HashMap<>(); + CarbonFile[] carbonIndexFiles = null; if (file.isDirectory()) { - CarbonFile[] carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); + if (segmentId == null) { + carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(carbonFilePath); + } else { + String segmentPath = CarbonTablePath.getSegmentPath(carbonFilePath, segmentId); + carbonIndexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath); + } for (int i = 0; i < carbonIndexFiles.length; i++) { // TODO. If Required to support merge index, then this code has to be modified. // TODO. Nested File Paths. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java index 6ff4b89..d177a00 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/ReadCommittedScope.java @@ -45,7 +45,7 @@ public interface ReadCommittedScope extends Serializable { */ public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException ; - public SegmentRefreshInfo getCommitedSegmentRefreshInfo( + public SegmentRefreshInfo getCommittedSegmentRefreshInfo( Segment segment, UpdateVO updateVo) throws IOException; public void takeCarbonIndexFileSnapShot() throws IOException; http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java index 91ebd41..1f61aab 100644 --- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java +++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java @@ -79,7 +79,7 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope { return indexFiles; } - public SegmentRefreshInfo getCommitedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) + public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) throws 
IOException { SegmentRefreshInfo segmentRefreshInfo; if (updateVo != null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java index fab0565..1da8edd 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java @@ -29,6 +29,7 @@ import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandExcept import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.Segment; @@ -235,7 +236,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor } for (CarbonFile indexDir : indexDirs) { // Filter out the tasks which are filtered through CG datamap. 
- if (!segment.getFilteredIndexShardNames().contains(indexDir.getName())) { + if (getDataMapLevel() != DataMapLevel.FG && + !segment.getFilteredIndexShardNames().contains(indexDir.getName())) { continue; } DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable( http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala index efe2a63..fe94f54 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/LuceneDataMapExample.scala @@ -61,8 +61,6 @@ object LuceneDataMapExample { | DMProperties('INDEX_COLUMNS'='id , name') """.stripMargin) - spark.sql("refresh datamap dm ON TABLE personTable") - // 1. 
Compare the performance: def time(code: => Unit): Double = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index cad20fc..da84c00 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -78,8 +78,12 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> { } else { throw new RuntimeException("unsupported input split type: " + inputSplit); } - List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); - queryModel.setTableBlockInfos(tableBlockInfoList); + // It should use the exists tableBlockInfos if tableBlockInfos of queryModel is not empty + // otherwise the prune is no use before this method + if (queryModel.getTableBlockInfos().isEmpty()) { + List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); + queryModel.setTableBlockInfos(tableBlockInfoList); + } readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable()); try { carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index cf51162..05c70f8 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -372,7 +372,7 @@ m filterExpression List<ExtendedBlocklet> prunedBlocklets = getPrunedBlocklets(job, carbonTable, resolver, segmentIds); - List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>(); + List<CarbonInputSplit> resultFilteredBlocks = new ArrayList<>(); int partitionIndex = 0; List<Integer> partitionIdList = new ArrayList<>(); if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { @@ -401,7 +401,7 @@ m filterExpression if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet); if (inputSplit != null) { - resultFilterredBlocks.add(inputSplit); + resultFilteredBlocks.add(inputSplit); } } } @@ -409,7 +409,7 @@ m filterExpression statistic .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); - return resultFilterredBlocks; + return resultFilteredBlocks; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala index 638d24d..f64a349 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -438,6 +438,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { 
.contains("Unsupported alter operation on hive table")) sql("drop datamap if exists dm2 on table datamap_test_table") } + test("test Clean Files and check Lucene DataMap") { sql("DROP TABLE IF EXISTS datamap_test_table") sql( http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala new file mode 100644 index 0000000..0ceead8 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapWithSearchModeSuite.scala @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.lucene + +import java.io.{File, PrintWriter} + +import scala.util.Random + +import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.datamap.status.DataMapStatusManager + +/** + * Test lucene fine grain datamap with search mode + */ +class LuceneFineGrainDataMapWithSearchModeSuite extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/datamap_input.csv" + + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 500000 + sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() + CarbonProperties + .getInstance() + .addProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, "100s") + LuceneFineGrainDataMapSuite.createFile(file2, n) + sql("create database if not exists lucene") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, + CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession)) + sql("use lucene") + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + } + + test("test lucene fine grain data map with search mode") { + + sqlContext.sparkSession.sparkContext.setLogLevel("WARN") + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'lucene' + | DMProperties('INDEX_COLUMNS'='Name') + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), + sql(s"select * from datamap_test where 
name='n10'")) + + sql("drop datamap dm on table datamap_test") + } + + // TODOï¼ optimize performance + ignore("test lucene fine grain data map with TEXT_MATCH 'AND' Filter") { + sql("drop datamap if exists dm on table datamap_test") + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'lucene' + | DMProperties('INDEX_COLUMNS'='name') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer( + sql("SELECT count(*) FROM datamap_test WHERE TEXT_MATCH('name:n2*') " + + "AND age=28 and id=200149"), + sql("SELECT count(*) FROM datamap_test WHERE name like 'n2%' " + + "AND age=28 and id=200149")) + sql("drop datamap if exists dm on table datamap_test") + } + + // TODOï¼ optimize performance + ignore("test lucene fine grain data map with TEXT_MATCH 'AND' and 'OR' Filter ") { + sql("drop datamap if exists dm on table datamap_test") + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'lucene' + | DMProperties('INDEX_COLUMNS'='name , city') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n1*') OR TEXT_MATCH ('city:c01*') " + + "AND TEXT_MATCH('city:C02*')"), + sql("select * from datamap_test where name like 'n1%' OR city like 'c01%' and city like" + + " 'c02%'")) + sql("drop datamap if exists dm on table datamap_test") + } + + test("test lucene fine grain data map with compaction-Major ") { + sql("DROP TABLE IF EXISTS datamap_test_table") + sql( + """ + | CREATE TABLE datamap_test_table(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test_table + | USING 'lucene' + | DMProperties('INDEX_COLUMNS'='name , city') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE 
datamap_test_table OPTIONS('header'='false')") + checkAnswer(sql("SELECT * FROM datamap_test_table WHERE TEXT_MATCH('name:n10')"), + sql("select * from datamap_test_table where name='n10'")) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_table OPTIONS('header'='false')") + sql("alter table datamap_test_table compact 'major'") + checkAnswer(sql("SELECT COUNT(*) FROM datamap_test_table WHERE TEXT_MATCH('name:n10')"), + sql("select COUNT(*) from datamap_test_table where name='n10'")) + sql("drop datamap if exists dm on table datamap_test_table") + sql("DROP TABLE IF EXISTS datamap_test_table") + } + + test("test lucene fine grain datamap rebuild") { + sql("DROP TABLE IF EXISTS datamap_test5") + sql( + """ + | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test5 + | USING 'lucene' + | WITH DEFERRED REBUILD + | DMProperties('INDEX_COLUMNS'='city') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") + val map = DataMapStatusManager.readDataMapStatusMap() + assert(!map.get("dm").isEnabled) + sql("REBUILD DATAMAP dm ON TABLE datamap_test5") + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), + sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) + sql("DROP TABLE IF EXISTS datamap_test5") + } + + test("test lucene fine grain datamap rebuild with table block size") { + sql("DROP TABLE IF EXISTS datamap_test5") + sql( + """ + | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test5 + | USING 
'lucene' + | DMProperties('INDEX_COLUMNS'='Name , cIty') + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") + + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c00')"), + sql(s"SELECT * FROM datamap_test5 WHERE city='c00'")) + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), + sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c0100085')"), + sql(s"SELECT * FROM datamap_test5 WHERE city='c0100085'")) + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c09560')"), + sql(s"SELECT * FROM datamap_test5 WHERE city='c09560'")) + sql("DROP TABLE IF EXISTS datamap_test5") + } + + test("test lucene fine grain multiple data map on table") { + sql("DROP TABLE IF EXISTS datamap_test5") + sql( + """ + | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP dm2 ON TABLE datamap_test5 + | USING 'lucene' + | DMProperties('INDEX_COLUMNS'='city') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP dm1 ON TABLE datamap_test5 + | USING 'lucene' + | DMProperties('INDEX_COLUMNS'='Name') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('name:n10')"), + sql(s"select * from datamap_test5 where name='n10'")) + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), + sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) + sql("DROP TABLE IF EXISTS datamap_test5") + } + + // TODOï¼ support it in 
the future + ignore("test lucene datamap and validate the visible and invisible status of datamap ") { + val tableName = "datamap_test2" + val dataMapName1 = "ggdatamap1"; + sql(s"DROP TABLE IF EXISTS $tableName") + sql( + s""" + | CREATE TABLE $tableName(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + // register datamap writer + sql( + s""" + | CREATE DATAMAP ggdatamap1 ON TABLE $tableName + | USING 'lucene' + | DMPROPERTIES('index_columns'='name') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE $tableName OPTIONS('header'='false')") + + val df1 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").collect() + sql(s"SELECT * FROM $tableName WHERE TEXT_MATCH('name:n502670')").show() + println(df1(0).getString(0)) + assertResult( + s"""== CarbonData Profiler == + |Table Scan on datamap_test2 + | - total blocklets: 1 + | - filter: TEXT_MATCH('name:n502670') + | - pruned by Main DataMap + | - skipped blocklets: 0 + | - pruned by FG DataMap + | - name: ggdatamap1 + | - provider: lucene + | - skipped blocklets: 1 + |""".stripMargin)(df1(0).getString(0)) + + sql(s"set ${CarbonCommonConstants.CARBON_DATAMAP_VISIBLE}default.$tableName.$dataMapName1 = false") + + val df2 = sql(s"EXPLAIN EXTENDED SELECT * FROM $tableName WHERE name='n502670'").collect() + println(df2(0).getString(0)) + assertResult( + s"""== CarbonData Profiler == + |Table Scan on $tableName + | - total blocklets: 1 + | - filter: (name <> null and name = n502670) + | - pruned by Main DataMap + | - skipped blocklets: 0 + |""".stripMargin)(df2(0).getString(0)) + + checkAnswer(sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'"), + sql(s"SELECT * FROM $tableName WHERE name='n502670' AND city='c2670'")) + sql(s"DROP TABLE IF EXISTS $tableName") + } + + ignore("test lucene fine grain datamap rebuild with table 
block size, rebuild") { + sql("DROP TABLE IF EXISTS datamap_test5") + sql( + """ + | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'TABLE_BLOCKSIZE'='1') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test5 + | USING 'lucene' + | WITH DEFERRED REBUILD + | DMProperties('INDEX_COLUMNS'='Name , cIty') + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')") + sql("REBUILD DATAMAP dm ON TABLE datamap_test5") + + sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() + sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show() + sqlContext.sparkSession.asInstanceOf[CarbonSession].startSearchMode() + sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')").show() + checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"), + sql(s"SELECT * FROM datamap_test5 WHERE city='c020'")) + sql("DROP TABLE IF EXISTS datamap_test5") + } + + override protected def afterAll(): Unit = { + LuceneFineGrainDataMapSuite.deleteFile(file2) + sql("DROP TABLE IF EXISTS datamap_test") + sql("DROP TABLE IF EXISTS datamap_test5") + sql("use default") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, + CarbonProperties.getStorePath) + sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() + } + + def createFile(fileName: String, line: Int = 10000, start: Int = 0) = { + val write = new PrintWriter(new File(fileName)) + for (i <- start until (start + line)) { + write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80)) + } + write.close() + } + + def deleteFile(fileName: String): Unit = { + val file = new File(fileName) + if (file.exists()) { + file.delete() + } + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala index 2c94dab..d278fc5 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -109,4 +109,31 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { sql("set carbon.search.enabled = false") assert(!sqlContext.sparkSession.asInstanceOf[CarbonSession].isSearchModeEnabled) } + + test("test lucene datamap with search mode") { + sql("DROP DATAMAP IF EXISTS dm ON TABLE main") + sql("CREATE DATAMAP dm ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='id') ") + checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"), + sql(s"SELECT * FROM main WHERE id='100000'")) + sql("DROP DATAMAP if exists dm ON TABLE main") + } + + test("test lucene datamap with search mode 2") { + sql("drop datamap if exists dm3 ON TABLE main") + sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city') ") + checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('city:city6')"), + sql("SELECT * FROM main WHERE city='city6'")) + sql("DROP DATAMAP if exists dm3 ON TABLE main") + } + + test("test lucene datamap with search mode, two column") { + sql("drop datamap if exists dm3 ON TABLE main") + sql("CREATE DATAMAP dm3 ON TABLE main USING 'lucene' DMProperties('INDEX_COLUMNS'='city , id') ") + checkAnswer(sql("SELECT * FROM main WHERE 
TEXT_MATCH('city:city6')"), + sql("SELECT * FROM main WHERE city='city6'")) + checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"), + sql(s"SELECT * FROM main WHERE id='100000'")) + sql("DROP DATAMAP if exists dm3 ON TABLE main") + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index 29dcec9..186e39e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -68,10 +68,10 @@ case class CarbonSetCommand(command: SetCommand) override val output: Seq[Attribute] = command.output override def run(sparkSession: SparkSession): Seq[Row] = { - val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams + val sessionParams = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams command.kv match { case Some((key, Some(value))) => - CarbonSetCommand.validateAndSetValue(sessionParms, key, value) + CarbonSetCommand.validateAndSetValue(sessionParams, key, value) // handle search mode start/stop for ThriftServer usage if (key.equalsIgnoreCase(CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index 07a444f..c052cd7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -382,6 +382,8 @@ object CarbonFilters { CarbonScalaUtil.convertSparkToCarbonDataType(dataType))) new AndExpression(l, r) case StringTrim(child) => transformExpression(child) + case s: ScalaUDF => + new MatchExpression(s.children.head.toString()) case _ => new SparkUnknownExpression(expr.transform { case AttributeReference(name, dataType, _, _) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java ---------------------------------------------------------------------- diff --git a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java index 9727352..f6406c7 100644 --- a/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java +++ b/store/search/src/main/java/org/apache/carbondata/store/worker/SearchRequestHandler.java @@ -18,6 +18,7 @@ package org.apache.carbondata.store.worker; import java.io.IOException; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -27,7 +28,9 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper; import 
org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.datastore.row.CarbonRow; @@ -112,6 +115,8 @@ public class SearchRequestHandler { queryModel.setVectorReader(false); CarbonMultiBlockSplit mbSplit = request.split().value(); + List<TableBlockInfo> list = CarbonInputSplit.createBlocks(mbSplit.getAllSplits()); + queryModel.setTableBlockInfos(list); long limit = request.limit(); long rowCount = 0; @@ -158,22 +163,38 @@ public class SearchRequestHandler { CarbonMultiBlockSplit mbSplit, DataMapExprWrapper datamap) throws IOException { Objects.requireNonNull(datamap); List<Segment> segments = new LinkedList<>(); + HashMap<String, Integer> uniqueSegments = new HashMap<>(); for (CarbonInputSplit split : mbSplit.getAllSplits()) { - segments.add( - Segment.toSegment(split.getSegmentId(), - new LatestFilesReadCommittedScope(table.getTablePath()))); + String segmentId = split.getSegmentId(); + if (uniqueSegments.get(segmentId) == null) { + segments.add(Segment.toSegment( + segmentId, + new LatestFilesReadCommittedScope(table.getTablePath(), segmentId))); + uniqueSegments.put(segmentId, 1); + } else { + uniqueSegments.put(segmentId, uniqueSegments.get(segmentId) + 1); + } + } + + List<DataMapDistributableWrapper> distributables = datamap.toDistributable(segments); + List<ExtendedBlocklet> prunnedBlocklets = new LinkedList<ExtendedBlocklet>(); + for (int i = 0; i < distributables.size(); i++) { + DataMapDistributable dataMapDistributable = distributables.get(i).getDistributable(); + prunnedBlocklets.addAll(datamap.prune(dataMapDistributable, null)); } - List<ExtendedBlocklet> prunnedBlocklets = datamap.prune(segments, null); - List<String> pathToRead = new LinkedList<>(); - for (ExtendedBlocklet prunnedBlocklet : prunnedBlocklets) { - pathToRead.add(prunnedBlocklet.getPath()); + HashMap<String, ExtendedBlocklet> pathToRead = new HashMap<>(); + for (ExtendedBlocklet 
prunedBlocklet : prunnedBlocklets) { + pathToRead.put(prunedBlocklet.getFilePath(), prunedBlocklet); } List<TableBlockInfo> blocks = queryModel.getTableBlockInfos(); List<TableBlockInfo> blockToRead = new LinkedList<>(); for (TableBlockInfo block : blocks) { - if (pathToRead.contains(block.getFilePath())) { + if (pathToRead.keySet().contains(block.getFilePath())) { + // If not set this, it will can't create FineGrainBlocklet object in + // org.apache.carbondata.core.indexstore.blockletindex.BlockletDataRefNode.getIndexedData + block.setDataMapWriterPath(pathToRead.get(block.getFilePath()).getDataMapWriterPath()); blockToRead.add(block); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb71ffe1/store/search/src/main/scala/org/apache/spark/rpc/Master.scala ---------------------------------------------------------------------- diff --git a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala index 26de74c..f48f5e4 100644 --- a/store/search/src/main/scala/org/apache/spark/rpc/Master.scala +++ b/store/search/src/main/scala/org/apache/spark/rpc/Master.scala @@ -38,8 +38,7 @@ import org.apache.spark.util.ThreadUtils import org.apache.carbondata.common.annotations.InterfaceAudience import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.DataMapChooser -import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -232,10 +231,14 @@ class Master(sparkConf: SparkConf) { // if we have enough data already, we do not need to collect more result if (rowCount < globalLimit) { - // wait for worker for 10s - ThreadUtils.awaitResult(future, Duration.apply("10s")) + // wait for worker + val 
timeout = CarbonProperties + .getInstance() + .getProperty(CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT, + CarbonCommonConstants.CARBON_SEARCH_QUERY_TIMEOUT_DEFAULT) + ThreadUtils.awaitResult(future, Duration.apply(timeout)) LOG.info(s"[SearchId:$queryId] receive search response from worker " + - s"${worker.address}:${worker.port}") + s"${worker.address}:${worker.port}") try { future.value match { case Some(response: Try[SearchResult]) =>