Github user ajantha-bhat commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2936#discussion_r235842114 --- Diff: core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java --- @@ -120,37 +132,166 @@ public BlockletDetailsFetcher getBlockletDetailsFetcher() { * @param filterExp * @return */ - public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp, - List<PartitionSpec> partitions) throws IOException { - List<ExtendedBlocklet> blocklets = new ArrayList<>(); - SegmentProperties segmentProperties; - Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments); + public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolverIntf filterExp, + final List<PartitionSpec> partitions) throws IOException { + final List<ExtendedBlocklet> blocklets = new ArrayList<>(); + final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments); + // for non-filter queries + if (filterExp == null) { + // if filter is not passed, then return all the blocklets. + return pruneWithoutFilter(segments, partitions, blocklets); + } + // for filter queries + int totalFiles = 0; + boolean isBlockDataMapType = true; + for (Segment segment : segments) { + for (DataMap dataMap : dataMaps.get(segment)) { + if (!(dataMap instanceof BlockDataMap)) { + isBlockDataMapType = false; + break; + } + totalFiles += ((BlockDataMap) dataMap).getTotalBlocks(); + } + if (!isBlockDataMapType) { + // totalFiles will be 0 for non-BlockDataMap Type. ex: lucene, bloom datamap. use old flow. 
+ break; + } + } + int numOfThreadsForPruning = getNumOfThreadsForPruning(); + int filesPerEachThread = totalFiles / numOfThreadsForPruning; + if (numOfThreadsForPruning == 1 || filesPerEachThread == 1 + || segments.size() < numOfThreadsForPruning || totalFiles + < CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT) { + // use multi-thread, only if the files are more than 0.1 million. + // As 0.1 million files block pruning can take only 1 second. + // Doing multi-thread for smaller values is not recommended as + // driver should have minimum threads opened to support multiple concurrent queries. + return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps); + } + // handle by multi-thread + return pruneWithFilterMultiThread(segments, filterExp, partitions, blocklets, dataMaps, + totalFiles); + } + + private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments, + List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) throws IOException { + for (Segment segment : segments) { + List<Blocklet> allBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions); + blocklets.addAll( + addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment), + segment.toString())); + } + return blocklets; + } + + private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments, + FilterResolverIntf filterExp, List<PartitionSpec> partitions, + List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> dataMaps) throws IOException { for (Segment segment : segments) { List<Blocklet> pruneBlocklets = new ArrayList<>(); - // if filter is not passed then return all the blocklets - if (filterExp == null) { - pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions); - } else { - segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment); - for (DataMap dataMap : dataMaps.get(segment)) { - pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, 
partitions)); + SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment); + for (DataMap dataMap : dataMaps.get(segment)) { + pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions)); + } + blocklets.addAll( + addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment), + segment.toString())); + } + return blocklets; + } + + private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> segments, + final FilterResolverIntf filterExp, final List<PartitionSpec> partitions, + List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps, + int totalFiles) { + int numOfThreadsForPruning = getNumOfThreadsForPruning(); + int filesPerEachThread = (int) Math.ceil((double)totalFiles / numOfThreadsForPruning); + int prev = 0; + int filesCount = 0; + int processedFileCount = 0; + List<List<Segment>> segmentList = new ArrayList<>(); --- End diff -- done
---