This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new efabd80b3 improvement(client): Eliminate non-updated partitionIds
result creation to reduce log size when reassign is enabled (#2147)
efabd80b3 is described below
commit efabd80b3293b0c63b5b5a122c55cbe41a260983
Author: maobaolong
AuthorDate: Thu Sep 26 12:08:18 2024 +0800
improvement(client): Eliminate non-updated partitionIds result creation to
reduce log size when reassign is enabled (#2147)
### What changes were proposed in this pull request?
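Skip creating `reassignResult` entries for partitions whose `updatedReassignServers` set is empty, so that `reassignResult` no longer fills up with empty server and partition entries.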
### Why are the changes needed?
To reduce the driver log size.
Without this PR, the driver log grows very large because it contains long entries like the following, in which every partition maps to an empty list:
```
{
192.168.1.100-19977-17000={818=[], 819=[], 803=[], 968=[], 857=[], 906=[],
826=[], 939=[], 956=[], 845=[]},
192.168.1.101-19977-17000={193=[], 65=[], 3=[], 68=[], 134=[], 135=[],
139=[], 76=[], 12=[], 78=[], 17=[], 82=[], 19=[], 151=[], 91=[], 29=[], 94=[],
160=[], 33=[], 167=[], 168=[], 173=[], 177=[], 178=[], 115=[], 183=[], 58=[]},
192.168.1.102-19977-17000={288=[], 294=[], 262=[], 361=[], 267=[], 332=[],
333=[], 238=[], 270=[], 207=[], 336=[], 241=[], 372=[], 278=[], 280=[], 282=[],
380=[], 221=[], 349=[], 318=[]},
192.168.1.103-19977-17000={1187=[], 1096=[], 1097=[], 1130=[], 1003=[],
1197=[], 1006=[], 1199=[], 1137=[], 1075=[], 1076=[], 1044=[], 1142=[],
1019=[], 1052=[], 1149=[]},
192.168.1.104-19977-17000={450=[], 484=[], 420=[], 581=[], 518=[], 459=[],
493=[], 463=[], 433=[], 498=[], 434=[], 530=[], 467=[], 435=[], 596=[], 470=[],
471=[], 439=[], 472=[], 408=[], 475=[], 446=[], 542=[], 511=[]},
192.168.1.105-19977-17000={1280=[], 1385=[], 1386=[], 1322=[], 1293=[],
1360=[], 1329=[], 1396=[], 1205=[], 1302=[], 1208=[], 1241=[], 1341=[],
1311=[]},
192.168.1.106-19977-17000={1505=[], 1410=[], 1444=[], 1481=[], 1577=[],
1418=[], 1580=[], 1585=[], 1426=[], 1554=[], 1427=[], 1493=[], 1432=[],
1529=[], 1434=[], 1402=[], 1535=[], 1471=[]},
192.168.1.107-19977-17000={1602=[], 1700=[], 1669=[], 1766=[], 1606=[],
1798=[], 1639=[], 1609=[], 1610=[], 1709=[], 1677=[], 1679=[], 1747=[],
1651=[], 1716=[], 1783=[], 1720=[], 1689=[], 1722=[], 1791=[], 1663=[]},
192.168.1.110-19977-17000={705=[], 673=[], 641=[], 611=[], 773=[], 743=[],
616=[], 681=[], 649=[], 783=[], 754=[], 755=[], 787=[], 757=[], 600=[], 699=[],
635=[]},
192.168.1.108-19977-17000={1888=[], 1957=[], 1830=[], 1800=[], 1993=[],
1834=[], 1963=[], 1869=[], 1998=[], 1967=[], 1840=[], 1809=[], 1812=[],
1909=[], 1881=[], 1882=[], 1850=[], 1818=[], 1851=[], 1951=[]}}
```
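The removed lines in the diff show where the `[]` values come from: both `computeIfAbsent` calls create their map entries before `addAll` runs, so an empty `updatedReassignServers` still leaves a `partitionId=[]` entry behind. The following is a minimal sketch of that pre-patch behavior using plain JDK collections, assuming server IDs are plain strings; `EmptyEntryDemo`, `recordBeforePatch`, and `"server-a"` are hypothetical names, not Uniffle APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class EmptyEntryDemo {
  // Hypothetical stand-in for the pre-patch logic; server IDs are plain strings.
  static void recordBeforePatch(
      Map<String, Map<Integer, Set<String>>> reassignResult,
      String serverId,
      int partitionId,
      Set<String> updatedServerIds) {
    reassignResult
        .computeIfAbsent(serverId, x -> new HashMap<>())
        .computeIfAbsent(partitionId, x -> new HashSet<>())
        // Both map levels already exist at this point, so adding nothing
        // still leaves "partitionId=[]" behind.
        .addAll(updatedServerIds);
  }

  public static void main(String[] args) {
    Map<String, Map<Integer, Set<String>>> reassignResult = new HashMap<>();
    // No replacement servers were produced for partition 818 on "server-a".
    recordBeforePatch(reassignResult, "server-a", 818, new HashSet<>());
    System.out.println(reassignResult); // prints {server-a={818=[]}}
  }
}
```

Repeated over every partition of every server, this produces log entries shaped like the ones above.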
After this PR, a partition whose `updatedReassignServers` set is empty is never added to `reassignResult`, so the logged result stays small.
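A minimal sketch of the guarded version, under the same assumptions (string server IDs, hypothetical names `GuardedReassignDemo`, `recordAfterPatch`, `"server-a"`, `"server-b"`): map entries are created only when at least one updated server exists, so a partition with no reassignment never shows up in the result.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GuardedReassignDemo {
  // Hypothetical helper mirroring the patched check around computeIfAbsent.
  static void recordAfterPatch(
      Map<String, Map<Integer, Set<String>>> reassignResult,
      String serverId,
      int partitionId,
      Set<String> updatedServerIds) {
    // Only touch the map when there is something to record.
    if (!updatedServerIds.isEmpty()) {
      reassignResult
          .computeIfAbsent(serverId, x -> new HashMap<>())
          .computeIfAbsent(partitionId, x -> new HashSet<>())
          .addAll(updatedServerIds);
    }
  }

  public static void main(String[] args) {
    Map<String, Map<Integer, Set<String>>> reassignResult = new HashMap<>();
    // Partition 818 has no replacement servers: no entry is created.
    recordAfterPatch(reassignResult, "server-a", 818, new HashSet<>());
    // Partition 819 was reassigned to "server-b": it is recorded.
    recordAfterPatch(reassignResult, "server-a", 819, Set.of("server-b"));
    System.out.println(reassignResult); // prints {server-a={819=[server-b]}}
  }
}
```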
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
---
.../shuffle/manager/RssShuffleManagerBase.java | 26 +++++++++++++---------
1 file changed, 15 insertions(+), 11 deletions(-)
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 767bb03ea..5c5f97864 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -788,17 +788,21 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
Set<ShuffleServerInfo> updatedReassignServers =
internalHandle.updateAssignment(partitionId, serverId,
replacements);
- reassignResult
- .computeIfAbsent(serverId, x -> new HashMap<>())
- .computeIfAbsent(partitionId, x -> new HashSet<>())
- .addAll(
- updatedReassignServers.stream().map(x -> x.getId()).collect(Collectors.toSet()));
-
- if (serverHasReplaced) {
- for (ShuffleServerInfo serverInfo : updatedReassignServers) {
- newServerToPartitions
- .computeIfAbsent(serverInfo, x -> new ArrayList<>())
- .add(new PartitionRange(partitionId, partitionId));
+ if (!updatedReassignServers.isEmpty()) {
+ reassignResult
+ .computeIfAbsent(serverId, x -> new HashMap<>())
+ .computeIfAbsent(partitionId, x -> new HashSet<>())
+ .addAll(
+ updatedReassignServers.stream()
+ .map(x -> x.getId())
+ .collect(Collectors.toSet()));
+
+ if (serverHasReplaced) {
+ for (ShuffleServerInfo serverInfo : updatedReassignServers) {
+ newServerToPartitions
+ .computeIfAbsent(serverInfo, x -> new ArrayList<>())
+ .add(new PartitionRange(partitionId, partitionId));
+ }
}
}
}
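Besides guarding `reassignResult`, the patch moves the `serverHasReplaced` loop inside the new `isEmpty()` check. Iterating an empty set adds nothing, so that move should leave `newServerToPartitions` unchanged; only `reassignResult` behaves differently. The sketch below checks this with simplified stand-ins, assuming Java 16+ for records; `Server` and `Range` are hypothetical records standing in for `ShuffleServerInfo` and `PartitionRange`, and `mapBefore`/`mapAfter` are hypothetical helpers, not the Uniffle classes or methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NewServerMappingDemo {
  // Hypothetical stand-ins for ShuffleServerInfo and PartitionRange.
  record Server(String id) {}

  record Range(int start, int end) {}

  // Pre-patch shape: the loop is reached even when the updated set is empty.
  static void mapBefore(
      Map<Server, List<Range>> newServerToPartitions,
      int partitionId,
      Set<Server> updated,
      boolean serverHasReplaced) {
    if (serverHasReplaced) {
      for (Server server : updated) {
        newServerToPartitions
            .computeIfAbsent(server, x -> new ArrayList<>())
            .add(new Range(partitionId, partitionId));
      }
    }
  }

  // Post-patch shape: the same loop, nested under the non-empty check.
  static void mapAfter(
      Map<Server, List<Range>> newServerToPartitions,
      int partitionId,
      Set<Server> updated,
      boolean serverHasReplaced) {
    if (!updated.isEmpty()) {
      if (serverHasReplaced) {
        for (Server server : updated) {
          newServerToPartitions
              .computeIfAbsent(server, x -> new ArrayList<>())
              .add(new Range(partitionId, partitionId));
        }
      }
    }
  }

  public static void main(String[] args) {
    // One empty update and one update with a single replacement server.
    List<Set<Server>> inputs = List.of(Set.of(), Set.of(new Server("server-b")));
    for (Set<Server> updated : inputs) {
      Map<Server, List<Range>> before = new HashMap<>();
      Map<Server, List<Range>> after = new HashMap<>();
      mapBefore(before, 818, updated, true);
      mapAfter(after, 818, updated, true);
      System.out.println(before.equals(after)); // prints true for both inputs
    }
  }
}
```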