Github user ajantha-bhat commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2936#discussion_r235842114
  
    --- Diff: 
core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---
    @@ -120,37 +132,166 @@ public BlockletDetailsFetcher 
getBlockletDetailsFetcher() {
        * @param filterExp
        * @return
        */
    -  public List<ExtendedBlocklet> prune(List<Segment> segments, 
FilterResolverIntf filterExp,
    -      List<PartitionSpec> partitions) throws IOException {
    -    List<ExtendedBlocklet> blocklets = new ArrayList<>();
    -    SegmentProperties segmentProperties;
    -    Map<Segment, List<DataMap>> dataMaps = 
dataMapFactory.getDataMaps(segments);
    +  public List<ExtendedBlocklet> prune(List<Segment> segments, final 
FilterResolverIntf filterExp,
    +      final List<PartitionSpec> partitions) throws IOException {
    +    final List<ExtendedBlocklet> blocklets = new ArrayList<>();
    +    final Map<Segment, List<DataMap>> dataMaps = 
dataMapFactory.getDataMaps(segments);
    +    // for non-filter queries
    +    if (filterExp == null) {
    +      // if filter is not passed, then return all the blocklets.
    +      return pruneWithoutFilter(segments, partitions, blocklets);
    +    }
    +    // for filter queries
    +    int totalFiles = 0;
    +    boolean isBlockDataMapType = true;
    +    for (Segment segment : segments) {
    +      for (DataMap dataMap : dataMaps.get(segment)) {
    +        if (!(dataMap instanceof BlockDataMap)) {
    +          isBlockDataMapType = false;
    +          break;
    +        }
    +        totalFiles += ((BlockDataMap) dataMap).getTotalBlocks();
    +      }
    +      if (!isBlockDataMapType) {
    +        // totalFiles fill be 0 for non-BlockDataMap Type. ex: lucene, 
bloom datamap. use old flow.
    +        break;
    +      }
    +    }
    +    int numOfThreadsForPruning = getNumOfThreadsForPruning();
    +    int filesPerEachThread = totalFiles / numOfThreadsForPruning;
    +    if (numOfThreadsForPruning == 1 || filesPerEachThread == 1
    +        || segments.size() < numOfThreadsForPruning || totalFiles
    +        < 
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT) {
    +      // use multi-thread, only if the files are more than 0.1 million.
    +      // As 0.1 million files block pruning can take only 1 second.
    +      // Doing multi-thread for smaller values is not recommended as
    +      // driver should have minimum threads opened to support multiple 
concurrent queries.
    +      return pruneWithFilter(segments, filterExp, partitions, blocklets, 
dataMaps);
    +    }
    +    // handle by multi-thread
    +    return pruneWithFilterMultiThread(segments, filterExp, partitions, 
blocklets, dataMaps,
    +        totalFiles);
    +  }
    +
    +  private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
    +      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) 
throws IOException {
    +    for (Segment segment : segments) {
    +      List<Blocklet> allBlocklets = 
blockletDetailsFetcher.getAllBlocklets(segment, partitions);
    +      blocklets.addAll(
    +          
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment),
    +              segment.toString()));
    +    }
    +    return blocklets;
    +  }
    +
    +  private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
    +      FilterResolverIntf filterExp, List<PartitionSpec> partitions,
    +      List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> 
dataMaps) throws IOException {
         for (Segment segment : segments) {
           List<Blocklet> pruneBlocklets = new ArrayList<>();
    -      // if filter is not passed then return all the blocklets
    -      if (filterExp == null) {
    -        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, 
partitions);
    -      } else {
    -        segmentProperties = 
segmentPropertiesFetcher.getSegmentProperties(segment);
    -        for (DataMap dataMap : dataMaps.get(segment)) {
    -          pruneBlocklets.addAll(dataMap.prune(filterExp, 
segmentProperties, partitions));
    +      SegmentProperties segmentProperties = 
segmentPropertiesFetcher.getSegmentProperties(segment);
    +      for (DataMap dataMap : dataMaps.get(segment)) {
    +        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, 
partitions));
    +      }
    +      blocklets.addAll(
    +          
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, 
segment),
    +              segment.toString()));
    +    }
    +    return blocklets;
    +  }
    +
    +  private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> 
segments,
    +      final FilterResolverIntf filterExp, final List<PartitionSpec> 
partitions,
    +      List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> 
dataMaps,
    +      int totalFiles) {
    +    int numOfThreadsForPruning = getNumOfThreadsForPruning();
    +    int filesPerEachThread = (int) Math.ceil((double)totalFiles / 
numOfThreadsForPruning);
    +    int prev = 0;
    +    int filesCount = 0;
    +    int processedFileCount = 0;
    +    List<List<Segment>> segmentList = new ArrayList<>();
    --- End diff --
    
    done


---

Reply via email to