Copilot commented on code in PR #3646:
URL: https://github.com/apache/celeborn/pull/3646#discussion_r3028692029


##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -126,46 +155,64 @@ public void insert(K key, V value, int partition) {
     }
   }
 
+  /**
+   * Get total record count across all partitions.
+   */
+  private int getTotalRecordCount() {
+    int total = 0;
+    for (KVBufferInfo bufferInfo : partitionedKVBuffers.values()) {
+      total += bufferInfo.count;
+    }
+    return total;
+  }

Review Comment:
   Calling `getTotalRecordCount()` on every `insert()` makes each record 
insertion O(#partitions) due to iterating `partitionedKVBuffers.values()`. This 
can add significant overhead at tens of millions of records. Consider 
maintaining a `totalRecordCount` field that increments in 
`insertRecordInternal(...)` (after `bufferInfo.add(...)`) and resets when 
spilling/clearing (e.g., in `sendKVAndUpdateWritePos()` and `close()`).
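
   A minimal sketch of that running-counter idea, with a plain `HashMap` 
   standing in for `partitionedKVBuffers`. The class `RunningCountSketch` and 
   its methods `insertRecord`, `spill`, and `recomputeTotal` are hypothetical 
   names used only for illustration; only the `totalRecordCount` field name 
   comes from the comment above.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for the pusher's bookkeeping; not the PR's actual class.
   class RunningCountSketch {
     // Per-partition record counts (stands in for partitionedKVBuffers).
     private final Map<Integer, Integer> partitionCounts = new HashMap<>();
     // Running total, updated on every insert so that reading it is O(1).
     private int totalRecordCount = 0;

     // Analogous to incrementing right after bufferInfo.add(...).
     void insertRecord(int partition) {
       partitionCounts.merge(partition, 1, Integer::sum);
       totalRecordCount++;
     }

     // Analogous to resetting the counter when buffers are spilled or cleared.
     void spill() {
       partitionCounts.clear();
       totalRecordCount = 0;
     }

     int getTotalRecordCount() {
       return totalRecordCount;
     }

     // The O(#partitions) recomputation, kept only to check the counter against.
     int recomputeTotal() {
       int total = 0;
       for (int count : partitionCounts.values()) {
         total += count;
       }
       return total;
     }
   }
   ```

   The counter and the recomputed sum should agree at all times, so 
   `recomputeTotal()` could serve as a debug-only assertion rather than run on 
   the hot path.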



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -245,13 +294,119 @@ private int writeVLong(byte[] data, int offset, long dataInt) {
   }
 
   private void sortKVs() {
-    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry : partitionedKVs.entrySet()) {
-      partitionKVEntry
-          .getValue()
-          .sort(
-              (o1, o2) ->
-                  comparator.compare(
-                      serializedKV, o1.offset, o1.kLen, serializedKV, o2.offset, o2.kLen));
+    // Maximum number of temporary Record objects to create at once.
+    // This limits Young Gen pressure and prevents Full GC.
+    // Record size ~24 bytes, so 5M records = 120MB at peak
+    final int MAX_SORT_RECORDS = 5_000_000;
+
+    for (Map.Entry<Integer, KVBufferInfo> partitionKVEntry : partitionedKVBuffers.entrySet()) {
+      KVBufferInfo bufferInfo = partitionKVEntry.getValue();
+      if (bufferInfo.count <= 1) {
+        continue;
+      }
+
+      // If too many records, split into batches
+      if (bufferInfo.count > MAX_SORT_RECORDS) {
+        // Sort and flush each batch immediately
+        int remaining = bufferInfo.count;
+        int start = 0;
+
+        while (remaining > 0) {
+          int batchSize = Math.min(remaining, MAX_SORT_RECORDS);
+          int end = start + batchSize;
+
+          // Sort this batch
+          sortBatch(bufferInfo, start, end);
+
+          // Send this batch immediately to free memory
+          sendPartialBuffer(bufferInfo, start, batchSize);
+
+          start = end;
+          remaining -= batchSize;
+        }
+
+        // After all batches sent, clear the buffer
+        bufferInfo.clear();
+      } else {
+        // Full sort (single batch)
+        sortBatch(bufferInfo, 0, bufferInfo.count);
+      }
+    }
+  }
+
+  /**
+   * Sort a batch of records from start (inclusive) to end (exclusive).
+   */
+  private void sortBatch(KVBufferInfo bufferInfo, int start, int end) {
+    int size = end - start;
+
+    // Create temporary Record objects
+    Record[] records = new Record[size];
+    for (int i = 0; i < size; i++) {
+      records[i] = new Record(
+          serializedKV,
+          comparator,
+          bufferInfo.offsets[start + i],
+          bufferInfo.keyLens[start + i],
+          bufferInfo.valueLens[start + i]);
+    }
+
+    // Sort using Arrays.sort
+    Arrays.sort(records);
+
+    // Write back sorted results
+    for (int i = 0; i < size; i++) {
+      bufferInfo.offsets[start + i] = records[i].offset;
+      bufferInfo.keyLens[start + i] = records[i].kLen;
+      bufferInfo.valueLens[start + i] = records[i].vLen;
+    }
+  }
+
+  /**
+   * Send a portion of the buffer (after it's been sorted).
+   * This method requires careful re-design of sendKVAndUpdateWritePos
+   * to work with partial sends.
+   *
+   * For simplicity, we revert to the original approach of sorting the entire buffer
+   * when it's safe (within MAX_SORT_RECORDS). But if we exceed the limit,
+   * we need to handle partial sends differently.
+   *
+   * For the first version, let's just throw if we exceed MAX_SORT_RECORDS,
+   * and let the user adjust heap size or spill.percent instead.
+   */
+  private void sendPartialBuffer(KVBufferInfo bufferInfo, int start, int count) {
+    // TODO: This needs careful redesign of sendKVAndUpdateWritePos
+    // For now, throw to force user to adjust configuration
+    throw new UnsupportedOperationException(
+        "Buffer too large for single batch sorting. " +
+        "Please reduce mapreduce.task.io.sort.mb or increase mapreduce.map.java.opts heap.");
+  }

Review Comment:
   The PR description states the fix is fully backward compatible and requires 
no configuration changes, but this code path explicitly instructs users to 
change `mapreduce.task.io.sort.mb` or heap settings and fails the task. Either 
remove/avoid this user-facing failure mode (preferred, consistent with the PR 
description) or update the PR description to reflect the new behavior and 
conditions under which it can occur.



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -245,13 +294,119 @@ private int writeVLong(byte[] data, int offset, long dataInt) {
   }
 
   private void sortKVs() {
-    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry : partitionedKVs.entrySet()) {
-      partitionKVEntry
-          .getValue()
-          .sort(
-              (o1, o2) ->
-                  comparator.compare(
-                      serializedKV, o1.offset, o1.kLen, serializedKV, o2.offset, o2.kLen));
+    // Maximum number of temporary Record objects to create at once.
+    // This limits Young Gen pressure and prevents Full GC.
+    // Record size ~24 bytes, so 5M records = 120MB at peak
+    final int MAX_SORT_RECORDS = 5_000_000;
+
+    for (Map.Entry<Integer, KVBufferInfo> partitionKVEntry : partitionedKVBuffers.entrySet()) {
+      KVBufferInfo bufferInfo = partitionKVEntry.getValue();
+      if (bufferInfo.count <= 1) {
+        continue;
+      }
+
+      // If too many records, split into batches
+      if (bufferInfo.count > MAX_SORT_RECORDS) {
+        // Sort and flush each batch immediately
+        int remaining = bufferInfo.count;
+        int start = 0;
+
+        while (remaining > 0) {
+          int batchSize = Math.min(remaining, MAX_SORT_RECORDS);
+          int end = start + batchSize;
+
+          // Sort this batch
+          sortBatch(bufferInfo, start, end);
+
+          // Send this batch immediately to free memory
+          sendPartialBuffer(bufferInfo, start, batchSize);
+
+          start = end;
+          remaining -= batchSize;
+        }
+
+        // After all batches sent, clear the buffer
+        bufferInfo.clear();
+      } else {
+        // Full sort (single batch)
+        sortBatch(bufferInfo, 0, bufferInfo.count);
+      }

Review Comment:
   `sortKVs()` calls `sendPartialBuffer(...)` when `bufferInfo.count > 
MAX_SORT_RECORDS`, but `sendPartialBuffer` currently always throws 
`UnsupportedOperationException`. That makes large buffers fail at runtime 
rather than spilling safely. If the intent is to rely on early-spill to 
guarantee `count <= MAX_SORT_RECORDS`, consider removing this branch and 
enforcing the invariant (e.g., via a clear exception that should never happen). 
Otherwise, implement partial sending by integrating batch boundaries into 
`sendKVAndUpdateWritePos()` (so the batching logic that exists there is reused 
rather than throwing).
   ```suggestion
         // Enforce invariant that early spilling keeps record count within bounds.
         if (bufferInfo.count > MAX_SORT_RECORDS) {
           throw new IllegalStateException(
               "KVBufferInfo.count (" + bufferInfo.count
                   + ") exceeds MAX_SORT_RECORDS (" + MAX_SORT_RECORDS
                   + "). Early spill should prevent this; partial buffer sending "
                   + "is not supported in sortKVs().");
         }
   
         // Full sort (single batch)
         sortBatch(bufferInfo, 0, bufferInfo.count);
   ```



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -114,6 +127,22 @@ public void insert(K key, V value, int partition) {
         sortKVs();
         sendKVAndUpdateWritePos();
       }
+
+      // Additional check: limit total record count to avoid memory pressure during sort
+      // If total records exceed safe threshold, force an early spill
+      int totalRecords = getTotalRecordCount();
+      final int MAX_RECORDS_BEFORE_SPILL = 5_000_000;  // 5M records = ~120MB temporary objects

Review Comment:
   Calling `getTotalRecordCount()` on every `insert()` makes each record 
insertion O(#partitions) due to iterating `partitionedKVBuffers.values()`. This 
can add significant overhead at tens of millions of records. Consider 
maintaining a `totalRecordCount` field that increments in 
`insertRecordInternal(...)` (after `bufferInfo.add(...)`) and resets when 
spilling/clearing (e.g., in `sendKVAndUpdateWritePos()` and `close()`).



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -126,46 +155,64 @@ public void insert(K key, V value, int partition) {
     }
   }
 
+  /**
+   * Get total record count across all partitions.
+   */
+  private int getTotalRecordCount() {
+    int total = 0;
+    for (KVBufferInfo bufferInfo : partitionedKVBuffers.values()) {
+      total += bufferInfo.count;
+    }
+    return total;
+  }
+
   private void sendKVAndUpdateWritePos() throws IOException {
-    Iterator<Map.Entry<Integer, List<SerializedKV>>> entryIter =
-        partitionedKVs.entrySet().iterator();
+    Iterator<Map.Entry<Integer, KVBufferInfo>> entryIter =
+        partitionedKVBuffers.entrySet().iterator();
     while (entryIter.hasNext()) {
-      Map.Entry<Integer, List<SerializedKV>> entry = entryIter.next();
+      Map.Entry<Integer, KVBufferInfo> entry = entryIter.next();
       entryIter.remove();
       int partition = entry.getKey();
-      List<SerializedKV> kvs = entry.getValue();
-      List<SerializedKV> localKVs = new ArrayList<>();
+      KVBufferInfo bufferInfo = entry.getValue();
       int partitionKVTotalLen = 0;
-      // process buffers for specific partition
-      for (SerializedKV kv : kvs) {
-        partitionKVTotalLen += kv.kLen + kv.vLen;
-        localKVs.add(kv);
+      int batchStartIdx = 0;
+      // process buffers for specific partition (arrays are already sorted in-place)
+      for (int i = 0; i < bufferInfo.count; i++) {
+        partitionKVTotalLen += bufferInfo.keyLens[i] + bufferInfo.valueLens[i];
         if (partitionKVTotalLen > maxPushDataSize) {
           // limit max size of pushdata to avoid possible memory issue in Celeborn worker
           // data layout
           // pushdata header (16) + pushDataLen(4) +
           // [varKeyLen+varValLen+serializedRecord(x)][...]
-          sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
-          localKVs.clear();
+          sendSortedBuffersPartition(partition, bufferInfo, batchStartIdx, i - batchStartIdx + 1, partitionKVTotalLen);
+          // move batch start
           partitionKVTotalLen = 0;
+          batchStartIdx = i + 1;
         }
       }

Review Comment:
   This batching logic sends a batch *after* `partitionKVTotalLen` has already 
exceeded `maxPushDataSize`, which can produce push payloads larger than the 
configured limit (contradicting the comment about limiting max size). To 
actually enforce the limit, structure the loop so that if adding the next 
record would exceed the threshold, you send the previous batch first, then 
start a new batch with the current record. Also handle the edge case where a 
single record is larger than `maxPushDataSize` (so you avoid an empty-batch 
send or an infinite loop).
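
   One way to structure such a loop, sketched as a standalone planner over 
   record lengths. `PushBatchPlanner` and `planBatches` are hypothetical names; 
   the PR would presumably call `sendSortedBuffersPartition(...)` at the points 
   where this sketch records a batch. It assumes that an oversized record is 
   sent alone in its own batch and that trailing records are flushed after the 
   loop.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical helper: plans batches instead of sending them.
   class PushBatchPlanner {
     // Each returned entry is {startIdx, recordCount, totalLen}.
     static List<int[]> planBatches(int[] recordLens, int maxPushDataSize) {
       List<int[]> batches = new ArrayList<>();
       int batchStart = 0;
       long batchLen = 0;
       for (int i = 0; i < recordLens.length; i++) {
         int len = recordLens[i];
         // Flush the previous batch BEFORE adding a record that would exceed
         // the limit. The i > batchStart guard prevents empty batches, so a
         // single oversized record still goes out alone.
         if (i > batchStart && batchLen + len > maxPushDataSize) {
           batches.add(new int[] {batchStart, i - batchStart, (int) batchLen});
           batchStart = i;
           batchLen = 0;
         }
         batchLen += len;
       }
       // Flush whatever is left once the loop ends.
       if (batchStart < recordLens.length) {
         batches.add(
             new int[] {batchStart, recordLens.length - batchStart, (int) batchLen});
       }
       return batches;
     }
   }
   ```

   With this shape, every batch except one holding a single oversized record 
   stays at or below `maxPushDataSize`, and the loop always advances because 
   each record is consumed exactly once.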



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -245,13 +294,119 @@ private int writeVLong(byte[] data, int offset, long dataInt) {
   }
 
   private void sortKVs() {
-    for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry : partitionedKVs.entrySet()) {
-      partitionKVEntry
-          .getValue()
-          .sort(
-              (o1, o2) ->
-                  comparator.compare(
-                      serializedKV, o1.offset, o1.kLen, serializedKV, o2.offset, o2.kLen));
+    // Maximum number of temporary Record objects to create at once.
+    // This limits Young Gen pressure and prevents Full GC.
+    // Record size ~24 bytes, so 5M records = 120MB at peak
+    final int MAX_SORT_RECORDS = 5_000_000;

Review Comment:
   The 5M threshold is duplicated (`MAX_RECORDS_BEFORE_SPILL` and 
`MAX_SORT_RECORDS`) and could drift over time. Consider defining a single 
class-level constant (or deriving it from configuration) and using it 
consistently for both early-spill and sorting limits. This also makes it easier 
to document/adjust the policy in one place.
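
   A small sketch of the single-constant approach. `SortLimitSketch`, 
   `shouldSpillEarly`, and `checkSortable` are hypothetical names; the value 
   `5_000_000` and the `writePos > 0` guard are taken from the diff.

   ```java
   // Hypothetical holder for the shared limit; not the PR's actual class.
   class SortLimitSketch {
     // Single source of truth for both the early-spill and the sort limits.
     static final int MAX_SORT_RECORDS = 5_000_000;

     // Early-spill check as it might appear in insert().
     static boolean shouldSpillEarly(int totalRecords, int writePos) {
       return totalRecords >= MAX_SORT_RECORDS && writePos > 0;
     }

     // Invariant check as it might appear in sortKVs().
     static void checkSortable(int count) {
       if (count > MAX_SORT_RECORDS) {
         throw new IllegalStateException(
             "count (" + count + ") exceeds MAX_SORT_RECORDS (" + MAX_SORT_RECORDS + ")");
       }
     }
   }
   ```

   Because both checks read the same constant, changing the threshold (or 
   replacing it with a configured value) cannot leave the two code paths out of 
   sync.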



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -114,6 +127,22 @@ public void insert(K key, V value, int partition) {
         sortKVs();
         sendKVAndUpdateWritePos();
       }
+
+      // Additional check: limit total record count to avoid memory pressure during sort
+      // If total records exceed safe threshold, force an early spill
+      int totalRecords = getTotalRecordCount();
+      final int MAX_RECORDS_BEFORE_SPILL = 5_000_000;  // 5M records = ~120MB temporary objects
+
+      if (totalRecords >= MAX_RECORDS_BEFORE_SPILL && writePos > 0) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Record count {} exceeds safe threshold {}, forcing early spill",
+              totalRecords, MAX_RECORDS_BEFORE_SPILL);

Review Comment:
   The 5M threshold is duplicated (`MAX_RECORDS_BEFORE_SPILL` and 
`MAX_SORT_RECORDS`) and could drift over time. Consider defining a single 
class-level constant (or deriving it from configuration) and using it 
consistently for both early-spill and sorting limits. This also makes it easier 
to document/adjust the policy in one place.
   ```suggestion
   
         if (totalRecords >= MAX_SORT_RECORDS && writePos > 0) {
           if (logger.isDebugEnabled()) {
             logger.debug(
                 "Record count {} exceeds safe threshold {}, forcing early spill",
                 totalRecords, MAX_SORT_RECORDS);
   ```



##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -126,46 +155,64 @@ public void insert(K key, V value, int partition) {
     }
   }
 
+  /**
+   * Get total record count across all partitions.
+   */
+  private int getTotalRecordCount() {
+    int total = 0;
+    for (KVBufferInfo bufferInfo : partitionedKVBuffers.values()) {
+      total += bufferInfo.count;
+    }
+    return total;
+  }
+
   private void sendKVAndUpdateWritePos() throws IOException {
-    Iterator<Map.Entry<Integer, List<SerializedKV>>> entryIter =
-        partitionedKVs.entrySet().iterator();
+    Iterator<Map.Entry<Integer, KVBufferInfo>> entryIter =
+        partitionedKVBuffers.entrySet().iterator();
     while (entryIter.hasNext()) {
-      Map.Entry<Integer, List<SerializedKV>> entry = entryIter.next();
+      Map.Entry<Integer, KVBufferInfo> entry = entryIter.next();
       entryIter.remove();
       int partition = entry.getKey();
-      List<SerializedKV> kvs = entry.getValue();
-      List<SerializedKV> localKVs = new ArrayList<>();
+      KVBufferInfo bufferInfo = entry.getValue();
       int partitionKVTotalLen = 0;
-      // process buffers for specific partition
-      for (SerializedKV kv : kvs) {
-        partitionKVTotalLen += kv.kLen + kv.vLen;
-        localKVs.add(kv);
+      int batchStartIdx = 0;
+      // process buffers for specific partition (arrays are already sorted in-place)
+      for (int i = 0; i < bufferInfo.count; i++) {
+        partitionKVTotalLen += bufferInfo.keyLens[i] + bufferInfo.valueLens[i];
         if (partitionKVTotalLen > maxPushDataSize) {

Review Comment:
   This batching logic sends a batch *after* `partitionKVTotalLen` has already 
exceeded `maxPushDataSize`, which can produce push payloads larger than the 
configured limit (contradicting the comment about limiting max size). To 
actually enforce the limit, structure the loop so that if adding the next 
record would exceed the threshold, you send the previous batch first, then 
start a new batch with the current record. Also handle the edge case where a 
single record is larger than `maxPushDataSize` (so you avoid an empty-batch 
send or an infinite loop).


