haridsv commented on code in PR #2411:
URL: https://github.com/apache/phoenix/pull/2411#discussion_r3138392840
##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java:
##########
@@ -211,4 +242,93 @@ List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
}
return unprocessedSplits;
}
+
+ /**
+ * Coalesces multiple region splits from the same RegionServer into single InputSplits. All
+ * regions from the same server are coalesced into one split, regardless of count or size. This
+ * reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server.
+ * @param unprocessedSplits Splits remaining after filtering completed regions
+ * @param regionLocator HBase RegionLocator for querying region locations
+ * @return Coalesced splits with all regions per server combined into one split
+ */
+ List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits, RegionLocator regionLocator)
+ throws IOException, InterruptedException {
+ // Group splits by RegionServer location
+ Map<String, List<PhoenixInputSplit>> splitsByServer =
+ groupSplitsByServer(unprocessedSplits, regionLocator);
+
+ List<InputSplit> coalescedSplits = new ArrayList<>();
+
+ // For each RegionServer, create one coalesced split with ALL regions from that server
+ for (Map.Entry<String, List<PhoenixInputSplit>> entry : splitsByServer.entrySet()) {
+ String serverName = entry.getKey();
+ List<PhoenixInputSplit> serverSplits = entry.getValue();
+
+ // Sort splits by start key for sequential processing
+ serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(),
+ s2.getKeyRange().getLowerRange()));
+ // Create single coalesced split with ALL regions from this server
+ coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
+ }
+
+ return coalescedSplits;
+ }
+
+ /**
+ * Groups splits by RegionServer location for locality-aware coalescing. Uses HBase RegionLocator
+ * API to determine which server hosts each region.
+ * @param splits List of splits to group
+ * @param regionLocator HBase RegionLocator for querying region locations
+ * @return Map of server name to list of splits hosted on that server
+ */
+ private Map<String, List<PhoenixInputSplit>> groupSplitsByServer(List<InputSplit> splits,
+ RegionLocator regionLocator) throws IOException {
+ Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
+ for (InputSplit split : splits) {
+ PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+ KeyRange keyRange = pSplit.getKeyRange();
+ HRegionLocation regionLocation =
+ regionLocator.getRegionLocation(keyRange.getLowerRange(), false);
+ if (regionLocation == null) {
+ throw new IOException("Could not determine region location for key: "
+ + Bytes.toStringBinary(keyRange.getLowerRange()));
+ }
Review Comment:
It only retries in the exception case, not when a null location is returned. Besides, the
retry doesn't use reload=true either, so it can keep hitting the same stale cached location.
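A minimal sketch of one way to address this, under stated assumptions: treat a null result exactly like a thrown IOException, and pass reload=true on every retry so the client-side location cache is bypassed. `Locator` is a hypothetical stand-in mirroring `RegionLocator#getRegionLocation(byte[] row, boolean reload)`; `locateWithRetry`, `maxAttempts`, and the linear backoff are illustrative choices, not the PR's actual retry code.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical sketch, not the PR's code: a region-location lookup that
// retries on BOTH a thrown IOException and a null result, and bypasses the
// client-side location cache (reload=true) on every retry.
final class RegionLocationRetry {

  // Hypothetical stand-in mirroring RegionLocator#getRegionLocation(byte[] row, boolean reload).
  @FunctionalInterface
  interface Locator<T> {
    T locate(byte[] row, boolean reload) throws IOException;
  }

  private RegionLocationRetry() {}

  static <T> T locateWithRetry(Locator<T> locator, byte[] row, int maxAttempts, long backoffMillis)
      throws IOException, InterruptedException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    IOException lastError = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      // Only the first attempt may trust the cached location; retries force a reload.
      boolean reload = attempt > 0;
      try {
        T location = locator.locate(row, reload);
        if (location != null) {
          return location;
        }
        // A null location counts as a failed attempt, just like an exception.
      } catch (IOException e) {
        lastError = e;
      }
      // Simple linear backoff between attempts (skipped after the last one).
      if (attempt + 1 < maxAttempts && backoffMillis > 0) {
        Thread.sleep(backoffMillis * (attempt + 1));
      }
    }
    // lastError is null if every attempt returned null; the cause is then null.
    throw new IOException("Could not determine region location after " + maxAttempts
        + " attempts for key: " + Arrays.toString(row), lastError);
  }
}
```

Assuming HBase 2.x, the lookup in groupSplitsByServer could then call something like `locateWithRetry(regionLocator::getRegionLocation, keyRange.getLowerRange(), 3, 100L)` in place of the single `getRegionLocation(..., false)` call (the attempt count and backoff are placeholders, and the caller would also need to propagate InterruptedException).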
--
This is an automated message from the Apache Git Service.