Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/12#discussion_r17405518
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java
 ---
    @@ -117,121 +125,59 @@ public boolean apply(HRegionLocation location) {
         }
     
         protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
    -        if (regions.isEmpty()) {
    -            return Collections.emptyList();
    +        if (regions.isEmpty()) { return Collections.emptyList(); }
    +        List<PColumnFamily> columnFamilies = 
this.tableRef.getTable().getColumnFamilies();
    +        // Collect all the guide posts across families. Sort them and then 
create a key range that starts 
    +        // from [] to [].  Then intersect it with the region boundary
    +        List<KeyRange> regionStartEndKey = 
Lists.newArrayListWithExpectedSize(regions.size());
    +        for (HRegionLocation region : regions) {
    +            
regionStartEndKey.add(KeyRange.getKeyRange(region.getRegionInfo().getStartKey(),
 region.getRegionInfo()
    +                    .getEndKey()));
             }
    -        
    -        StatsManager statsManager = 
context.getConnection().getQueryServices().getStatsManager();
    -        // the splits are computed as follows:
    -        //
    -        // let's suppose:
    -        // t = target concurrency
    -        // m = max concurrency
    -        // r = the number of regions we need to scan
    -        //
    -        // if r >= t:
    -        //    scan using regional boundaries
    -        // elif r > t/2:
    -        //    split each region in s splits such that:
    -        //    s = max(x) where s * x < m
    -        // else:
    -        //    split each region in s splits such that:
    -        //    s = max(x) where s * x < t
    -        //
    -        // The idea is to align splits with region boundaries. If rows are 
not evenly
    -        // distributed across regions, using this scheme compensates for 
regions that
    -        // have more rows than others, by applying tighter splits and 
therefore spawning
    -        // off more scans over the overloaded regions.
    -        int splitsPerRegion = getSplitsPerRegion(regions.size());
    -        // Create a multi-map of ServerName to List<KeyRange> which we'll 
use to round robin from to ensure
    -        // that we keep each region server busy for each query.
    -        ListMultimap<HRegionLocation,KeyRange> keyRangesPerRegion = 
ArrayListMultimap.create(regions.size(),regions.size() * splitsPerRegion);;
    -        if (splitsPerRegion == 1) {
    -            for (HRegionLocation region : regions) {
    -                keyRangesPerRegion.put(region, 
ParallelIterators.TO_KEY_RANGE.apply(region));
    -            }
    -        } else {
    -            // Maintain bucket for each server and then returns KeyRanges 
in round-robin
    -            // order to ensure all servers are utilized.
    -            for (HRegionLocation region : regions) {
    -                byte[] startKey = region.getRegionInfo().getStartKey();
    -                byte[] stopKey = region.getRegionInfo().getEndKey();
    -                boolean lowerUnbound = Bytes.compareTo(startKey, 
HConstants.EMPTY_START_ROW) == 0;
    -                boolean upperUnbound = Bytes.compareTo(stopKey, 
HConstants.EMPTY_END_ROW) == 0;
    -                /*
    -                 * If lower/upper unbound, get the min/max key from the 
stats manager.
    -                 * We use this as the boundary to split on, but we still 
use the empty
    -                 * byte as the boundary in the actual scan (in case our 
stats are out
    -                 * of date).
    -                 */
    -                if (lowerUnbound) {
    -                    startKey = statsManager.getMinKey(tableRef);
    -                    if (startKey == null) {
    -                        
keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
    -                        continue;
    -                    }
    -                }
    -                if (upperUnbound) {
    -                    stopKey = statsManager.getMaxKey(tableRef);
    -                    if (stopKey == null) {
    -                        
keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
    -                        continue;
    -                    }
    -                }
    -                
    -                byte[][] boundaries = null;
    -                // Both startKey and stopKey will be empty the first time
    -                if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries 
= Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) {
    -                    // Bytes.split may return null if the key space
    -                    // between start and end key is too small
    -                    
keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
    -                } else {
    -                    
keyRangesPerRegion.put(region,KeyRange.getKeyRange(lowerUnbound ? 
KeyRange.UNBOUND : boundaries[0], boundaries[1]));
    -                    if (boundaries.length > 1) {
    -                        for (int i = 1; i < boundaries.length-2; i++) {
    -                            
keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[i], true, 
boundaries[i+1], false));
    +        List<KeyRange> guidePosts = 
Lists.newArrayListWithCapacity(regions.size());
    +        List<byte[]> guidePostsBytes = 
Lists.newArrayListWithCapacity(regions.size());
    +        for (PColumnFamily fam : columnFamilies) {
    +            List<byte[]> gps = fam.getGuidePosts();
    +            if (gps != null) {
    +                for (byte[] guidePost : gps) {
    +                    PhoenixArray array = 
(PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePost);
    +                    if (array != null && array.getDimensions() != 0) {
    +                        for (int j = 0; j < array.getDimensions(); j++) {
    +                            guidePostsBytes.add(array.toBytes(j));
                             }
    -                        
keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[boundaries.length-2],
 true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length-1], 
false));
                         }
                     }
                 }
             }
    -        List<KeyRange> splits = 
Lists.newArrayListWithCapacity(regions.size() * splitsPerRegion);
    -        // as documented for ListMultimap
    -        Collection<Collection<KeyRange>> values = 
keyRangesPerRegion.asMap().values();
    -        List<Collection<KeyRange>> keyRangesList = 
Lists.newArrayList(values);
    -        // Randomize range order to ensure that we don't hit the region 
servers in the same order every time
    -        // thus helping to distribute the load more evenly
    -        Collections.shuffle(keyRangesList);
    -        // Transpose values in map to get regions in round-robin server 
order. This ensures that
    -        // all servers will be used to process the set of parallel threads 
available in our executor.
    -        int i = 0;
    -        boolean done;
    -        do {
    -            done = true;
    -            for (int j = 0; j < keyRangesList.size(); j++) {
    -                List<KeyRange> keyRanges = 
(List<KeyRange>)keyRangesList.get(j);
    -                if (i < keyRanges.size()) {
    -                    splits.add(keyRanges.get(i));
    -                    done = false;
    +        // If the guideposts are already sorted this may not be needed. 
But across family it is difficult to ensure
    +        // they are sorted
    +        Collections.sort(guidePostsBytes, Bytes.BYTES_COMPARATOR);
    +        int size = guidePostsBytes.size();
    +        if (size > 0) {
    +            if (size > 1) {
    +                
guidePosts.add(KeyRange.getKeyRange(HConstants.EMPTY_BYTE_ARRAY, 
guidePostsBytes.get(0)));
    +                for (int i = 0; i < size - 2; i++) {
    +                    
guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(i), 
(guidePostsBytes.get(i + 1))));
                     }
    +                
guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(size - 2), 
(guidePostsBytes.get(size - 1))));
    +                
guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(size - 1), 
(HConstants.EMPTY_BYTE_ARRAY)));
    +            } else {
    +                byte[] gp = guidePostsBytes.get(0);
    +                
guidePosts.add(KeyRange.getKeyRange(HConstants.EMPTY_BYTE_ARRAY, gp));
    +                guidePosts.add(KeyRange.getKeyRange(gp, 
HConstants.EMPTY_BYTE_ARRAY));
                 }
    -            i++;
    -        } while (!done);
    -        return splits;
    +
    +        }
    +        if (guidePosts.size() > 0) {
    +            List<KeyRange> intersect = KeyRange.intersect(guidePosts, 
regionStartEndKey);
    --- End diff --
    
    We don't want to do an intersect here. We want to do a merge sort between 
the region boundaries and the guideposts along these lines:
    
    * get the region boundaries (we know these ranges cover the table 
completely)
    * merge bisect the region boundary ranges with the guideposts. The
    guideposts are keys, not ranges. Normally a guidepost (gp1) to its
    next guidepost (gp2) will not span a region boundary. If it does, it
    means that a split has occurred and the guideposts have not been
    updated. In that case, you'd get an extra range from gp1 to
    region.upperRange and from region.upperRange to gp2.
    
    So the processing would be along the lines of walking through both the
    region boundary ranges and the guideposts like a merge sort, bisecting
    when necessary.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to