haridsv commented on code in PR #2411:
URL: https://github.com/apache/phoenix/pull/2411#discussion_r3099675545


##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java:
##########
@@ -211,4 +242,93 @@ List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
     }
     return unprocessedSplits;
   }
+
+  /**
+   * Coalesces multiple region splits from the same RegionServer into single InputSplits. All
+   * regions from the same server are coalesced into one split, regardless of count or size. This
+   * reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server.
+   * @param unprocessedSplits Splits remaining after filtering completed regions
+   * @param regionLocator     HBase RegionLocator for querying region locations
+   * @return Coalesced splits with all regions per server combined into one split
+   */
+  List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits, RegionLocator regionLocator)
+    throws IOException, InterruptedException {
+    // Group splits by RegionServer location
+    Map<String, List<PhoenixInputSplit>> splitsByServer =
+      groupSplitsByServer(unprocessedSplits, regionLocator);
+
+    List<InputSplit> coalescedSplits = new ArrayList<>();
+
+    // For each RegionServer, create one coalesced split with ALL regions from that server
+    for (Map.Entry<String, List<PhoenixInputSplit>> entry : splitsByServer.entrySet()) {
+      String serverName = entry.getKey();
+      List<PhoenixInputSplit> serverSplits = entry.getValue();
+
+      // Sort splits by start key for sequential processing
+      serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(),
+        s2.getKeyRange().getLowerRange()));
+      // Create single coalesced split with ALL regions from this server
+      coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
+    }
+
+    return coalescedSplits;
+  }
+
+  /**
+   * Groups splits by RegionServer location for locality-aware coalescing. Uses HBase RegionLocator
+   * API to determine which server hosts each region.
+   * @param splits        List of splits to group
+   * @param regionLocator HBase RegionLocator for querying region locations
+   * @return Map of server name to list of splits hosted on that server
+   */
+  private Map<String, List<PhoenixInputSplit>> groupSplitsByServer(List<InputSplit> splits,
+    RegionLocator regionLocator) throws IOException {
+    Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
+    for (InputSplit split : splits) {
+      PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+      KeyRange keyRange = pSplit.getKeyRange();
+      HRegionLocation regionLocation =
+        regionLocator.getRegionLocation(keyRange.getLowerRange(), false);
+      if (regionLocation == null) {
+        throw new IOException("Could not determine region location for key: "
+          + Bytes.toStringBinary(keyRange.getLowerRange()));
+      }

Review Comment:
   I expect this lookup never returns `null` when all regions are online and the
client cache is up to date. However (per what AI says), if the region the key
maps to is offline because of a region-in-transition (RIT) event, say in the
middle of a split, the return value will be `null`. Shouldn't we retry to ride
over such RITs, just as hbase-client does on data paths such as scans?
Otherwise there is a high chance of hitting a RIT and treating it as an error,
correct?
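
A minimal sketch of the retry idea, assuming the lookup really can return `null` while a region is in transition. `RegionLookupRetry`, `Lookup`, and `retryUntilNonNull` are hypothetical names for illustration, not existing Phoenix or HBase APIs, and the attempt count and backoff values are placeholders. In the PR, the lambda would presumably wrap `regionLocator.getRegionLocation(keyRange.getLowerRange(), true)`, with `reload` set to `true` so each retry bypasses the cached location.

```java
import java.io.IOException;

final class RegionLookupRetry {

  /** Hypothetical stand-in for a lookup that may transiently return null. */
  @FunctionalInterface
  interface Lookup<T> {
    T get() throws IOException;
  }

  /**
   * Calls the lookup until it returns a non-null value, sleeping with
   * exponential backoff between attempts. Fails with IOException once
   * maxAttempts lookups have all returned null.
   */
  static <T> T retryUntilNonNull(Lookup<T> lookup, int maxAttempts, long initialBackoffMs)
      throws IOException, InterruptedException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    long backoffMs = initialBackoffMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      T result = lookup.get();
      if (result != null) {
        return result;
      }
      if (attempt < maxAttempts) {
        // Give the transition time to finish before asking again.
        Thread.sleep(backoffMs);
        backoffMs *= 2;
      }
    }
    throw new IOException("Lookup still returned null after " + maxAttempts + " attempts");
  }
}
```

With something like this, the `null` check in `groupSplitsByServer` would only throw after the retries are exhausted, so a short-lived RIT would not fail the whole job.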


