haridsv commented on code in PR #2411:
URL: https://github.com/apache/phoenix/pull/2411#discussion_r3099675545
##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java:
##########
@@ -211,4 +242,93 @@ List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
}
return unprocessedSplits;
}
+
+ /**
+ * Coalesces multiple region splits from the same RegionServer into single InputSplits. All
+ * regions from the same server are coalesced into one split, regardless of count or size. This
+ * reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server.
+ * @param unprocessedSplits Splits remaining after filtering completed regions
+ * @param regionLocator HBase RegionLocator for querying region locations
+ * @return Coalesced splits with all regions per server combined into one split
+ */
+ List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits, RegionLocator regionLocator)
+ throws IOException, InterruptedException {
+ // Group splits by RegionServer location
+ Map<String, List<PhoenixInputSplit>> splitsByServer =
+ groupSplitsByServer(unprocessedSplits, regionLocator);
+
+ List<InputSplit> coalescedSplits = new ArrayList<>();
+
+ // For each RegionServer, create one coalesced split with ALL regions from that server
+ for (Map.Entry<String, List<PhoenixInputSplit>> entry : splitsByServer.entrySet()) {
+ String serverName = entry.getKey();
+ List<PhoenixInputSplit> serverSplits = entry.getValue();
+
+ // Sort splits by start key for sequential processing
+ serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(),
+ s2.getKeyRange().getLowerRange()));
+ // Create single coalesced split with ALL regions from this server
+ coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
+ }
+
+ return coalescedSplits;
+ }
+
+ /**
+ * Groups splits by RegionServer location for locality-aware coalescing. Uses HBase RegionLocator
+ * API to determine which server hosts each region.
+ * @param splits List of splits to group
+ * @param regionLocator HBase RegionLocator for querying region locations
+ * @return Map of server name to list of splits hosted on that server
+ */
+ private Map<String, List<PhoenixInputSplit>> groupSplitsByServer(List<InputSplit> splits,
+ RegionLocator regionLocator) throws IOException {
+ Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
+ for (InputSplit split : splits) {
+ PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+ KeyRange keyRange = pSplit.getKeyRange();
+ HRegionLocation regionLocation =
+ regionLocator.getRegionLocation(keyRange.getLowerRange(), false);
+ if (regionLocation == null) {
+ throw new IOException("Could not determine region location for key: "
+ + Bytes.toStringBinary(keyRange.getLowerRange()));
+ }
Review Comment:
I think this should never see a `null` if all regions are fully online and the client's region location cache is up to date. However (per what an AI assistant suggests), if the region that the key maps to is currently offline due to a region-in-transition (RIT) event, say in the middle of a split, then the return value will be `null`. So shouldn't we retry to ride over such RITs, just like hbase-client does on the data path (e.g., scans)? Otherwise, there is a high chance of hitting a RIT and treating it as an error, correct?
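A minimal sketch of the retry being proposed, assuming the only goal is to ride over a transient `null` from the location lookup. `LocationLookup`, `locateWithRetry`, `MAX_BACKOFF_MS`, and the backoff parameters are hypothetical names for illustration; `LocationLookup` stands in for `RegionLocator#getRegionLocation(byte[] row, boolean reload)` so the sketch compiles without HBase. The first attempt may use the cached location, and later attempts pass `reload = true` in case the cache is stale.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;

/**
 * Hypothetical sketch: retry a region-location lookup that may transiently
 * return null while the region is in transition (RIT).
 */
public class RegionLocationRetry {

  /** Hypothetical stand-in for RegionLocator#getRegionLocation(byte[], boolean). */
  @FunctionalInterface
  public interface LocationLookup<T> {
    T locate(byte[] row, boolean reload) throws IOException;
  }

  /** Upper bound on the sleep between attempts (assumed value). */
  private static final long MAX_BACKOFF_MS = 10_000L;

  /**
   * Returns the location of {@code row}, retrying with capped exponential
   * backoff while the lookup returns null. The first attempt may use the
   * cached location; later attempts ask for a reload.
   */
  public static <T> T locateWithRetry(LocationLookup<T> lookup, byte[] row,
      int maxAttempts, long initialBackoffMs) throws IOException {
    long backoffMs = initialBackoffMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // Only bypass the location cache after a failed attempt.
      T location = lookup.locate(row, attempt > 1);
      if (location != null) {
        return location;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(backoffMs);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt status
          throw new InterruptedIOException("Interrupted while locating region");
        }
        backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS); // capped exponential backoff
      }
    }
    // Still null after all attempts: surface it as an error, as the original code does.
    throw new IOException("Could not determine region location after " + maxAttempts
        + " attempts for key: " + Arrays.toString(row));
  }
}
```

Rather than adding new knobs, the attempt count and initial pause might be sized from HBase's existing client settings (`hbase.client.retries.number`, `hbase.client.pause`). This sketch retries only on `null`; if the lookup can also throw during a RIT, those exceptions might need the same treatment.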