Copilot commented on code in PR #3646:
URL: https://github.com/apache/celeborn/pull/3646#discussion_r3028692029
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -126,46 +155,64 @@ public void insert(K key, V value, int partition) {
}
}
+ /**
+ * Get total record count across all partitions.
+ */
+ private int getTotalRecordCount() {
+ int total = 0;
+ for (KVBufferInfo bufferInfo : partitionedKVBuffers.values()) {
+ total += bufferInfo.count;
+ }
+ return total;
+ }
Review Comment:
Calling `getTotalRecordCount()` on every `insert()` makes each record
insertion O(#partitions) due to iterating `partitionedKVBuffers.values()`. This
can add significant overhead at tens of millions of records. Consider
maintaining a `totalRecordCount` field that increments in
`insertRecordInternal(...)` (after `bufferInfo.add(...)`) and resets when
spilling/clearing (e.g., in `sendKVAndUpdateWritePos()` and `close()`).
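The suggested incremental counter could look like the sketch below. `KVBufferInfo` here is a stand-in with only a `count` field, and the method names mirror the ones named in this comment; the real class in the PR has more state.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: maintain the total record count incrementally so that
// reading it on every insert() is O(1) instead of O(#partitions).
public class RecordCounter {
  static class KVBufferInfo { // stand-in for the PR's buffer class
    int count;
    void add() { count++; }
  }

  private final Map<Integer, KVBufferInfo> partitionedKVBuffers = new HashMap<>();
  private int totalRecordCount; // incremented per record, reset on spill

  void insertRecordInternal(int partition) {
    partitionedKVBuffers.computeIfAbsent(partition, p -> new KVBufferInfo()).add();
    totalRecordCount++; // increment right after bufferInfo.add(...)
  }

  void sendKVAndUpdateWritePos() {
    // spill: buffers are drained, so the running total resets with them
    partitionedKVBuffers.clear();
    totalRecordCount = 0;
  }

  int getTotalRecordCount() {
    return totalRecordCount; // O(1); no iteration over partitions
  }
}
```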
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -245,13 +294,119 @@ private int writeVLong(byte[] data, int offset, long dataInt) {
}
private void sortKVs() {
- for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry : partitionedKVs.entrySet()) {
- partitionKVEntry
- .getValue()
- .sort(
- (o1, o2) ->
- comparator.compare(
- serializedKV, o1.offset, o1.kLen, serializedKV, o2.offset, o2.kLen));
+ // Maximum number of temporary Record objects to create at once.
+ // This limits Young Gen pressure and prevents Full GC.
+ // Record size ~24 bytes, so 5M records = 120MB at peak
+ final int MAX_SORT_RECORDS = 5_000_000;
+
+ for (Map.Entry<Integer, KVBufferInfo> partitionKVEntry : partitionedKVBuffers.entrySet()) {
+ KVBufferInfo bufferInfo = partitionKVEntry.getValue();
+ if (bufferInfo.count <= 1) {
+ continue;
+ }
+
+ // If too many records, split into batches
+ if (bufferInfo.count > MAX_SORT_RECORDS) {
+ // Sort and flush each batch immediately
+ int remaining = bufferInfo.count;
+ int start = 0;
+
+ while (remaining > 0) {
+ int batchSize = Math.min(remaining, MAX_SORT_RECORDS);
+ int end = start + batchSize;
+
+ // Sort this batch
+ sortBatch(bufferInfo, start, end);
+
+ // Send this batch immediately to free memory
+ sendPartialBuffer(bufferInfo, start, batchSize);
+
+ start = end;
+ remaining -= batchSize;
+ }
+
+ // After all batches sent, clear the buffer
+ bufferInfo.clear();
+ } else {
+ // Full sort (single batch)
+ sortBatch(bufferInfo, 0, bufferInfo.count);
+ }
+ }
+ }
+
+ /**
+ * Sort a batch of records from start (inclusive) to end (exclusive).
+ */
+ private void sortBatch(KVBufferInfo bufferInfo, int start, int end) {
+ int size = end - start;
+
+ // Create temporary Record objects
+ Record[] records = new Record[size];
+ for (int i = 0; i < size; i++) {
+ records[i] = new Record(
+ serializedKV,
+ comparator,
+ bufferInfo.offsets[start + i],
+ bufferInfo.keyLens[start + i],
+ bufferInfo.valueLens[start + i]);
+ }
+
+ // Sort using Arrays.sort
+ Arrays.sort(records);
+
+ // Write back sorted results
+ for (int i = 0; i < size; i++) {
+ bufferInfo.offsets[start + i] = records[i].offset;
+ bufferInfo.keyLens[start + i] = records[i].kLen;
+ bufferInfo.valueLens[start + i] = records[i].vLen;
+ }
+ }
+
+ /**
+ * Send a portion of the buffer (after it's been sorted).
+ * This method requires careful re-design of sendKVAndUpdateWritePos
+ * to work with partial sends.
+ *
+ * For simplicity, we revert to the original approach of sorting the entire buffer
+ * when it's safe (within MAX_SORT_RECORDS). But if we exceed the limit,
+ * we need to handle partial sends differently.
+ *
+ * For the first version, let's just throw if we exceed MAX_SORT_RECORDS,
+ * and let the user adjust heap size or spill.percent instead.
+ */
+ private void sendPartialBuffer(KVBufferInfo bufferInfo, int start, int count) {
+ // TODO: This needs careful redesign of sendKVAndUpdateWritePos
+ // For now, throw to force user to adjust configuration
+ throw new UnsupportedOperationException(
+ "Buffer too large for single batch sorting. " +
+ "Please reduce mapreduce.task.io.sort.mb or increase mapreduce.map.java.opts heap.");
+ }
Review Comment:
The PR description states the fix is fully backward compatible and requires
no configuration changes, but this code path explicitly instructs users to
change `mapreduce.task.io.sort.mb` or heap settings and fails the task. Either
remove/avoid this user-facing failure mode (preferred, consistent with the PR
description) or update the PR description to reflect the new behavior and
conditions under which it can occur.
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -245,13 +294,119 @@ private int writeVLong(byte[] data, int offset, long dataInt) {
}
private void sortKVs() {
- for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry : partitionedKVs.entrySet()) {
- partitionKVEntry
- .getValue()
- .sort(
- (o1, o2) ->
- comparator.compare(
- serializedKV, o1.offset, o1.kLen, serializedKV, o2.offset, o2.kLen));
+ // Maximum number of temporary Record objects to create at once.
+ // This limits Young Gen pressure and prevents Full GC.
+ // Record size ~24 bytes, so 5M records = 120MB at peak
+ final int MAX_SORT_RECORDS = 5_000_000;
+
+ for (Map.Entry<Integer, KVBufferInfo> partitionKVEntry : partitionedKVBuffers.entrySet()) {
+ KVBufferInfo bufferInfo = partitionKVEntry.getValue();
+ if (bufferInfo.count <= 1) {
+ continue;
+ }
+
+ // If too many records, split into batches
+ if (bufferInfo.count > MAX_SORT_RECORDS) {
+ // Sort and flush each batch immediately
+ int remaining = bufferInfo.count;
+ int start = 0;
+
+ while (remaining > 0) {
+ int batchSize = Math.min(remaining, MAX_SORT_RECORDS);
+ int end = start + batchSize;
+
+ // Sort this batch
+ sortBatch(bufferInfo, start, end);
+
+ // Send this batch immediately to free memory
+ sendPartialBuffer(bufferInfo, start, batchSize);
+
+ start = end;
+ remaining -= batchSize;
+ }
+
+ // After all batches sent, clear the buffer
+ bufferInfo.clear();
+ } else {
+ // Full sort (single batch)
+ sortBatch(bufferInfo, 0, bufferInfo.count);
+ }
Review Comment:
`sortKVs()` calls `sendPartialBuffer(...)` when `bufferInfo.count >
MAX_SORT_RECORDS`, but `sendPartialBuffer` currently always throws
`UnsupportedOperationException`. That makes large buffers fail at runtime
rather than spilling safely. If the intent is to rely on early-spill to
guarantee `count <= MAX_SORT_RECORDS`, consider removing this branch and
enforcing the invariant (e.g., via a clear exception that should never happen).
Otherwise, implement partial sending by integrating batch boundaries into
`sendKVAndUpdateWritePos()` (so the batching logic that exists there is reused
rather than throwing).
```suggestion
// Enforce invariant that early spilling keeps record count within bounds.
if (bufferInfo.count > MAX_SORT_RECORDS) {
throw new IllegalStateException(
"KVBufferInfo.count (" + bufferInfo.count
+ ") exceeds MAX_SORT_RECORDS (" + MAX_SORT_RECORDS
+ "). Early spill should prevent this; partial buffer sending "
+ "is not supported in sortKVs().");
}
// Full sort (single batch)
sortBatch(bufferInfo, 0, bufferInfo.count);
```
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -114,6 +127,22 @@ public void insert(K key, V value, int partition) {
sortKVs();
sendKVAndUpdateWritePos();
}
+
+ // Additional check: limit total record count to avoid memory pressure during sort
+ // If total records exceed safe threshold, force an early spill
+ int totalRecords = getTotalRecordCount();
+ final int MAX_RECORDS_BEFORE_SPILL = 5_000_000; // 5M records = ~120MB temporary objects
Review Comment:
Calling `getTotalRecordCount()` on every `insert()` makes each record
insertion O(#partitions) due to iterating `partitionedKVBuffers.values()`. This
can add significant overhead at tens of millions of records. Consider
maintaining a `totalRecordCount` field that increments in
`insertRecordInternal(...)` (after `bufferInfo.add(...)`) and resets when
spilling/clearing (e.g., in `sendKVAndUpdateWritePos()` and `close()`).
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -126,46 +155,64 @@ public void insert(K key, V value, int partition) {
}
}
+ /**
+ * Get total record count across all partitions.
+ */
+ private int getTotalRecordCount() {
+ int total = 0;
+ for (KVBufferInfo bufferInfo : partitionedKVBuffers.values()) {
+ total += bufferInfo.count;
+ }
+ return total;
+ }
+
private void sendKVAndUpdateWritePos() throws IOException {
- Iterator<Map.Entry<Integer, List<SerializedKV>>> entryIter =
- partitionedKVs.entrySet().iterator();
+ Iterator<Map.Entry<Integer, KVBufferInfo>> entryIter =
+ partitionedKVBuffers.entrySet().iterator();
while (entryIter.hasNext()) {
- Map.Entry<Integer, List<SerializedKV>> entry = entryIter.next();
+ Map.Entry<Integer, KVBufferInfo> entry = entryIter.next();
entryIter.remove();
int partition = entry.getKey();
- List<SerializedKV> kvs = entry.getValue();
- List<SerializedKV> localKVs = new ArrayList<>();
+ KVBufferInfo bufferInfo = entry.getValue();
int partitionKVTotalLen = 0;
- // process buffers for specific partition
- for (SerializedKV kv : kvs) {
- partitionKVTotalLen += kv.kLen + kv.vLen;
- localKVs.add(kv);
+ int batchStartIdx = 0;
+ // process buffers for specific partition (arrays are already sorted in-place)
+ for (int i = 0; i < bufferInfo.count; i++) {
+ partitionKVTotalLen += bufferInfo.keyLens[i] + bufferInfo.valueLens[i];
if (partitionKVTotalLen > maxPushDataSize) {
// limit max size of pushdata to avoid possible memory issue in Celeborn worker
// data layout
// pushdata header (16) + pushDataLen(4) +
// [varKeyLen+varValLen+serializedRecord(x)][...]
- sendSortedBuffersPartition(partition, localKVs, partitionKVTotalLen);
- localKVs.clear();
+ sendSortedBuffersPartition(partition, bufferInfo, batchStartIdx, i - batchStartIdx + 1, partitionKVTotalLen);
+ // move batch start
partitionKVTotalLen = 0;
+ batchStartIdx = i + 1;
}
}
Review Comment:
This batching logic sends a batch *after* `partitionKVTotalLen` has already
exceeded `maxPushDataSize`, which can produce push payloads larger than the
configured limit (contradicting the comment about limiting max size). To
actually enforce the limit, structure the loop so that if adding the next
record would exceed the threshold, you send the previous batch first, then
start a new batch with the current record. Also handle the edge case where a
single record is larger than `maxPushDataSize` (so you avoid an empty-batch
send or an infinite loop).
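One way to structure that check-before-add loop, extracted into a standalone helper for illustration (the method and its `{startIdx, count}` batch encoding are inventions of this sketch, not code from the PR; record lengths stand in for `keyLens[i] + valueLens[i]`):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: decide batch boundaries BEFORE a record is added, so no batch
// exceeds maxPushDataSize, and a single oversized record becomes its own
// batch of one (no empty sends, no infinite loop).
public class PushBatcher {
  static List<int[]> splitIntoBatches(int[] recordLens, int maxPushDataSize) {
    List<int[]> batches = new ArrayList<>(); // each entry: {startIdx, count}
    int batchStart = 0;
    int batchLen = 0;
    for (int i = 0; i < recordLens.length; i++) {
      // If adding record i would exceed the limit, send the previous batch
      // first -- unless it is empty, meaning record i is itself oversized.
      if (batchLen > 0 && batchLen + recordLens[i] > maxPushDataSize) {
        batches.add(new int[] {batchStart, i - batchStart});
        batchStart = i;
        batchLen = 0;
      }
      batchLen += recordLens[i];
    }
    if (batchLen > 0) {
      batches.add(new int[] {batchStart, recordLens.length - batchStart});
    }
    return batches;
  }
}
```

With `maxPushDataSize = 8`, lengths `{4, 4, 4}` yield batches `{0,2}` and `{2,1}`; an oversized record such as `{10, 2}` is sent alone rather than looping.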
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -245,13 +294,119 @@ private int writeVLong(byte[] data, int offset, long dataInt) {
}
private void sortKVs() {
- for (Map.Entry<Integer, List<SerializedKV>> partitionKVEntry : partitionedKVs.entrySet()) {
- partitionKVEntry
- .getValue()
- .sort(
- (o1, o2) ->
- comparator.compare(
- serializedKV, o1.offset, o1.kLen, serializedKV, o2.offset, o2.kLen));
+ // Maximum number of temporary Record objects to create at once.
+ // This limits Young Gen pressure and prevents Full GC.
+ // Record size ~24 bytes, so 5M records = 120MB at peak
+ final int MAX_SORT_RECORDS = 5_000_000;
Review Comment:
The 5M threshold is duplicated (`MAX_RECORDS_BEFORE_SPILL` and
`MAX_SORT_RECORDS`) and could drift over time. Consider defining a single
class-level constant (or deriving it from configuration) and using it
consistently for both early-spill and sorting limits. This also makes it easier
to document/adjust the policy in one place.
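A sketch of consolidating the threshold into one place, optionally derived from configuration. The configuration key below is hypothetical, purely to illustrate the shape; it is not an existing Celeborn or MapReduce property.

```java
import java.util.Properties;

// Sketch: one class-level constant serves both the early-spill check in
// insert() and the batch bound in sortKVs(), so the two cannot drift apart.
public class SortLimits {
  // ~24 bytes per temporary Record object, so 5M records ≈ 120MB at peak.
  static final int DEFAULT_MAX_SORT_RECORDS = 5_000_000;

  // Optionally make the policy tunable; "celeborn.client.mr.sort.maxRecords"
  // is an invented key for illustration only.
  static int maxSortRecords(Properties conf) {
    return Integer.parseInt(
        conf.getProperty(
            "celeborn.client.mr.sort.maxRecords",
            String.valueOf(DEFAULT_MAX_SORT_RECORDS)));
  }
}
```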
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -114,6 +127,22 @@ public void insert(K key, V value, int partition) {
sortKVs();
sendKVAndUpdateWritePos();
}
+
+ // Additional check: limit total record count to avoid memory pressure during sort
+ // If total records exceed safe threshold, force an early spill
+ int totalRecords = getTotalRecordCount();
+ final int MAX_RECORDS_BEFORE_SPILL = 5_000_000; // 5M records = ~120MB temporary objects
+
+ if (totalRecords >= MAX_RECORDS_BEFORE_SPILL && writePos > 0) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Record count {} exceeds safe threshold {}, forcing early spill",
+ totalRecords, MAX_RECORDS_BEFORE_SPILL);
Review Comment:
The 5M threshold is duplicated (`MAX_RECORDS_BEFORE_SPILL` and
`MAX_SORT_RECORDS`) and could drift over time. Consider defining a single
class-level constant (or deriving it from configuration) and using it
consistently for both early-spill and sorting limits. This also makes it easier
to document/adjust the policy in one place.
```suggestion
if (totalRecords >= MAX_SORT_RECORDS && writePos > 0) {
if (logger.isDebugEnabled()) {
logger.debug(
"Record count {} exceeds safe threshold {}, forcing early spill",
totalRecords, MAX_SORT_RECORDS);
```
##########
client-mr/mr/src/main/java/org/apache/hadoop/mapred/CelebornSortBasedPusher.java:
##########
@@ -126,46 +155,64 @@ public void insert(K key, V value, int partition) {
}
}
+ /**
+ * Get total record count across all partitions.
+ */
+ private int getTotalRecordCount() {
+ int total = 0;
+ for (KVBufferInfo bufferInfo : partitionedKVBuffers.values()) {
+ total += bufferInfo.count;
+ }
+ return total;
+ }
+
private void sendKVAndUpdateWritePos() throws IOException {
- Iterator<Map.Entry<Integer, List<SerializedKV>>> entryIter =
- partitionedKVs.entrySet().iterator();
+ Iterator<Map.Entry<Integer, KVBufferInfo>> entryIter =
+ partitionedKVBuffers.entrySet().iterator();
while (entryIter.hasNext()) {
- Map.Entry<Integer, List<SerializedKV>> entry = entryIter.next();
+ Map.Entry<Integer, KVBufferInfo> entry = entryIter.next();
entryIter.remove();
int partition = entry.getKey();
- List<SerializedKV> kvs = entry.getValue();
- List<SerializedKV> localKVs = new ArrayList<>();
+ KVBufferInfo bufferInfo = entry.getValue();
int partitionKVTotalLen = 0;
- // process buffers for specific partition
- for (SerializedKV kv : kvs) {
- partitionKVTotalLen += kv.kLen + kv.vLen;
- localKVs.add(kv);
+ int batchStartIdx = 0;
+ // process buffers for specific partition (arrays are already sorted in-place)
+ for (int i = 0; i < bufferInfo.count; i++) {
+ partitionKVTotalLen += bufferInfo.keyLens[i] + bufferInfo.valueLens[i];
if (partitionKVTotalLen > maxPushDataSize) {
Review Comment:
This batching logic sends a batch *after* `partitionKVTotalLen` has already
exceeded `maxPushDataSize`, which can produce push payloads larger than the
configured limit (contradicting the comment about limiting max size). To
actually enforce the limit, structure the loop so that if adding the next
record would exceed the threshold, you send the previous batch first, then
start a new batch with the current record. Also handle the edge case where a
single record is larger than `maxPushDataSize` (so you avoid an empty-batch
send or an infinite loop).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]