Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2936#discussion_r235612449
  
    --- Diff: 
core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---
    @@ -120,37 +132,166 @@ public BlockletDetailsFetcher 
getBlockletDetailsFetcher() {
        * @param filterExp
        * @return
        */
    -  public List<ExtendedBlocklet> prune(List<Segment> segments, 
FilterResolverIntf filterExp,
    -      List<PartitionSpec> partitions) throws IOException {
    -    List<ExtendedBlocklet> blocklets = new ArrayList<>();
    -    SegmentProperties segmentProperties;
    -    Map<Segment, List<DataMap>> dataMaps = 
dataMapFactory.getDataMaps(segments);
    +  public List<ExtendedBlocklet> prune(List<Segment> segments, final 
FilterResolverIntf filterExp,
    +      final List<PartitionSpec> partitions) throws IOException {
    +    final List<ExtendedBlocklet> blocklets = new ArrayList<>();
    +    final Map<Segment, List<DataMap>> dataMaps = 
dataMapFactory.getDataMaps(segments);
    +    // for non-filter queries
    +    if (filterExp == null) {
    +      // if filter is not passed, then return all the blocklets.
    +      return pruneWithoutFilter(segments, partitions, blocklets);
    +    }
    +    // for filter queries
    +    int totalFiles = 0;
    +    boolean isBlockDataMapType = true;
    +    for (Segment segment : segments) {
    +      for (DataMap dataMap : dataMaps.get(segment)) {
    +        if (!(dataMap instanceof BlockDataMap)) {
    +          isBlockDataMapType = false;
    +          break;
    +        }
    +        totalFiles += ((BlockDataMap) dataMap).getTotalBlocks();
    +      }
    +      if (!isBlockDataMapType) {
    +        // totalFiles fill be 0 for non-BlockDataMap Type. ex: lucene, 
bloom datamap. use old flow.
    +        break;
    +      }
    +    }
    +    int numOfThreadsForPruning = getNumOfThreadsForPruning();
    +    int filesPerEachThread = totalFiles / numOfThreadsForPruning;
    +    if (numOfThreadsForPruning == 1 || filesPerEachThread == 1
    +        || segments.size() < numOfThreadsForPruning || totalFiles
    +        < 
CarbonCommonConstants.CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT) {
    +      // use multi-thread, only if the files are more than 0.1 million.
    +      // As 0.1 million files block pruning can take only 1 second.
    +      // Doing multi-thread for smaller values is not recommended as
    +      // driver should have minimum threads opened to support multiple 
concurrent queries.
    +      return pruneWithFilter(segments, filterExp, partitions, blocklets, 
dataMaps);
    +    }
    +    // handle by multi-thread
    +    return pruneWithFilterMultiThread(segments, filterExp, partitions, 
blocklets, dataMaps,
    +        totalFiles);
    +  }
    +
    +  private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
    +      List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets) 
throws IOException {
    +    for (Segment segment : segments) {
    +      List<Blocklet> allBlocklets = 
blockletDetailsFetcher.getAllBlocklets(segment, partitions);
    +      blocklets.addAll(
    +          
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(allBlocklets, segment),
    +              segment.toString()));
    +    }
    +    return blocklets;
    +  }
    +
    +  private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
    +      FilterResolverIntf filterExp, List<PartitionSpec> partitions,
    +      List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> 
dataMaps) throws IOException {
         for (Segment segment : segments) {
           List<Blocklet> pruneBlocklets = new ArrayList<>();
    -      // if filter is not passed then return all the blocklets
    -      if (filterExp == null) {
    -        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, 
partitions);
    -      } else {
    -        segmentProperties = 
segmentPropertiesFetcher.getSegmentProperties(segment);
    -        for (DataMap dataMap : dataMaps.get(segment)) {
    -          pruneBlocklets.addAll(dataMap.prune(filterExp, 
segmentProperties, partitions));
    +      SegmentProperties segmentProperties = 
segmentPropertiesFetcher.getSegmentProperties(segment);
    +      for (DataMap dataMap : dataMaps.get(segment)) {
    +        pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, 
partitions));
    +      }
    +      blocklets.addAll(
    +          
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, 
segment),
    +              segment.toString()));
    +    }
    +    return blocklets;
    +  }
    +
    +  private List<ExtendedBlocklet> pruneWithFilterMultiThread(List<Segment> 
segments,
    +      final FilterResolverIntf filterExp, final List<PartitionSpec> 
partitions,
    +      List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> 
dataMaps,
    +      int totalFiles) {
    +    int numOfThreadsForPruning = getNumOfThreadsForPruning();
    +    int filesPerEachThread = (int) Math.ceil((double)totalFiles / 
numOfThreadsForPruning);
    +    int prev = 0;
    +    int filesCount = 0;
    +    int processedFileCount = 0;
    +    List<List<Segment>> segmentList = new ArrayList<>();
    --- End diff --
    
    I feel it would be better for the splitting to happen per datamap rather than per segment. 
A single segment can have a million files in the case of a big load, so please try parallel 
execution of the pruning at the datamap level.


---

Reply via email to