Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/12#discussion_r17405518
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java ---
@@ -117,121 +125,59 @@ public boolean apply(HRegionLocation location) {
}
     protected List<KeyRange> genKeyRanges(List<HRegionLocation> regions) {
-        if (regions.isEmpty()) {
-            return Collections.emptyList();
+        if (regions.isEmpty()) { return Collections.emptyList(); }
+        List<PColumnFamily> columnFamilies = this.tableRef.getTable().getColumnFamilies();
+        // Collect all the guide posts across families. Sort them and then create a key range that starts
+        // from [] to []. Then intersect it with the region boundary
+        List<KeyRange> regionStartEndKey = Lists.newArrayListWithExpectedSize(regions.size());
+        for (HRegionLocation region : regions) {
+            regionStartEndKey.add(KeyRange.getKeyRange(region.getRegionInfo().getStartKey(), region.getRegionInfo()
+                    .getEndKey()));
         }
-
-        StatsManager statsManager = context.getConnection().getQueryServices().getStatsManager();
-        // the splits are computed as follows:
-        //
-        // let's suppose:
-        // t = target concurrency
-        // m = max concurrency
-        // r = the number of regions we need to scan
-        //
-        // if r >= t:
-        //    scan using regional boundaries
-        // elif r > t/2:
-        //    split each region in s splits such that:
-        //    s = max(x) where s * x < m
-        // else:
-        //    split each region in s splits such that:
-        //    s = max(x) where s * x < t
-        //
-        // The idea is to align splits with region boundaries. If rows are not evenly
-        // distributed across regions, using this scheme compensates for regions that
-        // have more rows than others, by applying tighter splits and therefore spawning
-        // off more scans over the overloaded regions.
-        int splitsPerRegion = getSplitsPerRegion(regions.size());
-        // Create a multi-map of ServerName to List<KeyRange> which we'll use to round robin from to ensure
-        // that we keep each region server busy for each query.
-        ListMultimap<HRegionLocation,KeyRange> keyRangesPerRegion = ArrayListMultimap.create(regions.size(),regions.size() * splitsPerRegion);;
-        if (splitsPerRegion == 1) {
-            for (HRegionLocation region : regions) {
-                keyRangesPerRegion.put(region, ParallelIterators.TO_KEY_RANGE.apply(region));
-            }
-        } else {
-            // Maintain bucket for each server and then returns KeyRanges in round-robin
-            // order to ensure all servers are utilized.
-            for (HRegionLocation region : regions) {
-                byte[] startKey = region.getRegionInfo().getStartKey();
-                byte[] stopKey = region.getRegionInfo().getEndKey();
-                boolean lowerUnbound = Bytes.compareTo(startKey, HConstants.EMPTY_START_ROW) == 0;
-                boolean upperUnbound = Bytes.compareTo(stopKey, HConstants.EMPTY_END_ROW) == 0;
-                /*
-                 * If lower/upper unbound, get the min/max key from the stats manager.
-                 * We use this as the boundary to split on, but we still use the empty
-                 * byte as the boundary in the actual scan (in case our stats are out
-                 * of date).
-                 */
-                if (lowerUnbound) {
-                    startKey = statsManager.getMinKey(tableRef);
-                    if (startKey == null) {
-                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
-                        continue;
-                    }
-                }
-                if (upperUnbound) {
-                    stopKey = statsManager.getMaxKey(tableRef);
-                    if (stopKey == null) {
-                        keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
-                        continue;
-                    }
-                }
-
-                byte[][] boundaries = null;
-                // Both startKey and stopKey will be empty the first time
-                if (Bytes.compareTo(startKey, stopKey) >= 0 || (boundaries = Bytes.split(startKey, stopKey, splitsPerRegion - 1)) == null) {
-                    // Bytes.split may return null if the key space
-                    // between start and end key is too small
-                    keyRangesPerRegion.put(region,ParallelIterators.TO_KEY_RANGE.apply(region));
-                } else {
-                    keyRangesPerRegion.put(region,KeyRange.getKeyRange(lowerUnbound ? KeyRange.UNBOUND : boundaries[0], boundaries[1]));
-                    if (boundaries.length > 1) {
-                        for (int i = 1; i < boundaries.length-2; i++) {
-                            keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[i], true, boundaries[i+1], false));
+        List<KeyRange> guidePosts = Lists.newArrayListWithCapacity(regions.size());
+        List<byte[]> guidePostsBytes = Lists.newArrayListWithCapacity(regions.size());
+        for (PColumnFamily fam : columnFamilies) {
+            List<byte[]> gps = fam.getGuidePosts();
+            if (gps != null) {
+                for (byte[] guidePost : gps) {
+                    PhoenixArray array = (PhoenixArray)PDataType.VARBINARY_ARRAY.toObject(guidePost);
+                    if (array != null && array.getDimensions() != 0) {
+                        for (int j = 0; j < array.getDimensions(); j++) {
+                            guidePostsBytes.add(array.toBytes(j));
                         }
-                        keyRangesPerRegion.put(region,KeyRange.getKeyRange(boundaries[boundaries.length-2], true, upperUnbound ? KeyRange.UNBOUND : boundaries[boundaries.length-1], false));
                     }
                 }
             }
         }
-        List<KeyRange> splits = Lists.newArrayListWithCapacity(regions.size() * splitsPerRegion);
-        // as documented for ListMultimap
-        Collection<Collection<KeyRange>> values = keyRangesPerRegion.asMap().values();
-        List<Collection<KeyRange>> keyRangesList = Lists.newArrayList(values);
-        // Randomize range order to ensure that we don't hit the region servers in the same order every time
-        // thus helping to distribute the load more evenly
-        Collections.shuffle(keyRangesList);
-        // Transpose values in map to get regions in round-robin server order. This ensures that
-        // all servers will be used to process the set of parallel threads available in our executor.
-        int i = 0;
-        boolean done;
-        do {
-            done = true;
-            for (int j = 0; j < keyRangesList.size(); j++) {
-                List<KeyRange> keyRanges = (List<KeyRange>)keyRangesList.get(j);
-                if (i < keyRanges.size()) {
-                    splits.add(keyRanges.get(i));
-                    done = false;
+        // If the guideposts are already sorted this may not be needed. But across family it is difficult to ensure
+        // they are sorted
+        Collections.sort(guidePostsBytes, Bytes.BYTES_COMPARATOR);
+        int size = guidePostsBytes.size();
+        if (size > 0) {
+            if (size > 1) {
+                guidePosts.add(KeyRange.getKeyRange(HConstants.EMPTY_BYTE_ARRAY, guidePostsBytes.get(0)));
+                for (int i = 0; i < size - 2; i++) {
+                    guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(i), (guidePostsBytes.get(i + 1))));
                 }
+                guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(size - 2), (guidePostsBytes.get(size - 1))));
+                guidePosts.add(KeyRange.getKeyRange(guidePostsBytes.get(size - 1), (HConstants.EMPTY_BYTE_ARRAY)));
+            } else {
+                byte[] gp = guidePostsBytes.get(0);
+                guidePosts.add(KeyRange.getKeyRange(HConstants.EMPTY_BYTE_ARRAY, gp));
+                guidePosts.add(KeyRange.getKeyRange(gp, HConstants.EMPTY_BYTE_ARRAY));
             }
-            i++;
-        } while (!done);
-        return splits;
+
+        }
+        if (guidePosts.size() > 0) {
+            List<KeyRange> intersect = KeyRange.intersect(guidePosts, regionStartEndKey);
--- End diff ---
We don't want to do an intersect here. We want to do a merge sort between the region boundaries and the guideposts, along these lines:
* Get the region boundaries (we know these ranges cover the table completely).
* Merge the region boundary ranges with the guideposts, bisecting where needed. The guideposts are keys, not ranges. Normally a guidepost (gp1) to its next guidepost (gp2) will not span a region boundary. If it does, it means that a split has occurred and the guideposts have not been updated. In that case, you'd get an extra range from gp1 to region.upperRange and from region.upperRange to gp2.

So the processing would be along the lines of walking through both the region boundary ranges and the guideposts like a merge sort, bisecting when necessary.
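
To make that walk concrete, here is a minimal, self-contained sketch of the merge/bisect pass. This is illustrative only, not Phoenix code: `Range`, `mergeGuidePostsWithRegions`, and the plain `byte[]` comparisons are stand-ins for `KeyRange` and the real region/guidepost inputs, and the sketch assumes the region ranges arrive sorted and cover the whole table and that the guidepost keys are sorted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GuidePostMergeSketch {

    // Illustrative stand-in for a key range; not Phoenix's KeyRange.
    // A zero-length lower/upper bound means "unbounded" on that side.
    record Range(byte[] lower, byte[] upper) {}

    // Compare a key against a range's upper bound, treating a zero-length
    // upper bound as +infinity.
    static int compareToUpper(byte[] key, byte[] upper) {
        if (upper.length == 0) return -1;
        return Arrays.compareUnsigned(key, upper);
    }

    /**
     * Walk the sorted region boundary ranges and the sorted guidepost keys
     * together, like a merge sort. Each consecutive pair of guideposts forms a
     * chunk; if a chunk would span a region boundary (stale guideposts after a
     * region split), bisect it at that boundary.
     */
    static List<Range> mergeGuidePostsWithRegions(List<Range> regionRanges, List<byte[]> guidePosts) {
        List<Range> splits = new ArrayList<>();
        byte[] chunkStart = new byte[0]; // start from the unbounded lower end of the table
        int gp = 0;
        for (Range region : regionRanges) {
            // Emit every guidepost chunk that ends at or before this region's upper bound.
            while (gp < guidePosts.size() && compareToUpper(guidePosts.get(gp), region.upper()) <= 0) {
                splits.add(new Range(chunkStart, guidePosts.get(gp)));
                chunkStart = guidePosts.get(gp);
                gp++;
            }
            // The next guidepost (or the end of the table) lies beyond this region,
            // so bisect the current chunk at the region's upper boundary.
            if (region.upper().length != 0 && !Arrays.equals(chunkStart, region.upper())) {
                splits.add(new Range(chunkStart, region.upper()));
                chunkStart = region.upper();
            }
        }
        // Close the final chunk up to the unbounded upper end of the table.
        splits.add(new Range(chunkStart, new byte[0]));
        return splits;
    }

    public static void main(String[] args) {
        // Two regions split at "m"; guideposts at "d" and "t", so the d..t chunk
        // spans the region boundary and gets bisected into d..m and m..t.
        List<Range> regions = List.of(
                new Range(new byte[0], "m".getBytes()),
                new Range("m".getBytes(), new byte[0]));
        List<byte[]> guidePosts = List.of("d".getBytes(), "t".getBytes());
        for (Range r : mergeGuidePostsWithRegions(regions, guidePosts)) {
            System.out.println(str(r.lower()) + " .. " + str(r.upper()));
        }
        // Prints: * .. d, d .. m, m .. t, t .. *   (* = unbounded)
    }

    static String str(byte[] b) {
        return b.length == 0 ? "*" : new String(b);
    }
}
```

In the example in `main`, the guideposts straddle the region split at "m", so the walk emits the extra ranges on either side of the boundary, which is exactly the stale-guidepost case described above.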