This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 95c9eb4e [#720][FOLLOW-UP] Correct the shuffle server id (#792)
95c9eb4e is described below
commit 95c9eb4ef307faede9a344de6895440ca48ea979
Author: roryqi <[email protected]>
AuthorDate: Wed Apr 5 16:14:51 2023 +0800
[#720][FOLLOW-UP] Correct the shuffle server id (#792)
### What changes were proposed in this pull request?
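Assign the correct shuffle server id: build it in `ShuffleServer.start()` after the servers have started, so that it contains the actual Netty port when the Netty server is enabled.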
### Why are the changes needed?
This is a follow-up PR to #720.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing unit tests.
---
.../apache/uniffle/server/ShuffleFlushManager.java | 12 ++++++-----
.../org/apache/uniffle/server/ShuffleServer.java | 25 +++++++++++++---------
.../apache/uniffle/server/ShuffleServerConf.java | 1 +
.../ShuffleFlushManagerOnKerberizedHdfsTest.java | 2 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 17 +++++++--------
.../uniffle/server/TestShuffleFlushManager.java | 2 +-
.../server/buffer/ShuffleBufferManagerTest.java | 6 +++---
7 files changed, 36 insertions(+), 29 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index b81453cf..8fa9e2c3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -56,7 +56,6 @@ public class ShuffleFlushManager {
protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue =
Queues.newLinkedBlockingQueue();
private final Executor threadPoolExecutor;
private final List<String> storageBasePaths;
- private final String shuffleServerId;
private final String storageType;
private final int storageDataReplica;
private final ShuffleServerConf shuffleServerConf;
@@ -71,9 +70,8 @@ public class ShuffleFlushManager {
private int processPendingEventIndex = 0;
private final int maxConcurrencyOfSingleOnePartition;
- public ShuffleFlushManager(ShuffleServerConf shuffleServerConf, String
shuffleServerId, ShuffleServer shuffleServer,
+ public ShuffleFlushManager(ShuffleServerConf shuffleServerConf,
ShuffleServer shuffleServer,
StorageManager storageManager) {
- this.shuffleServerId = shuffleServerId;
this.shuffleServer = shuffleServer;
this.shuffleServerConf = shuffleServerConf;
this.storageManager = storageManager;
@@ -219,7 +217,7 @@ public class ShuffleFlushManager {
event.getStartPartition(),
event.getEndPartition(),
storageBasePaths.toArray(new String[storageBasePaths.size()]),
- shuffleServerId,
+ getShuffleServerId(),
hadoopConf,
storageDataReplica,
user,
@@ -259,7 +257,11 @@ public class ShuffleFlushManager {
}
}
}
-
+
+ private String getShuffleServerId() {
+ return shuffleServerConf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID,
"shuffleServerId");
+ }
+
private void updateCommittedBlockIds(String appId, int shuffleId,
List<ShufflePartitionedBlock> blocks) {
if (blocks == null || blocks.size() == 0) {
return;
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 0a1d2cd2..c7cf9484 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -125,13 +125,22 @@ public class ShuffleServer {
}
public void start() throws Exception {
- registerHeartBeat.startHeartBeat();
jettyServer.start();
server.start();
if (nettyServerEnabled) {
nettyPort = streamServer.start();
}
+ if (nettyServerEnabled) {
+ id = ip + "-" + grpcPort + "-" + nettyPort;
+ } else {
+ id = ip + "-" + grpcPort;
+ }
+ shuffleServerConf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, id);
+ LOG.info("Start to shuffle server with id {}", id);
+ initMetricsReporter();
+
+ registerHeartBeat.startHeartBeat();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@@ -195,13 +204,7 @@ public class ShuffleServer {
}
grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
nettyPort =
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
- if (nettyPort >= 0) {
- // when nettyPort is zero,actual netty port will be changed,but id can't
be change.
- id = ip + "-" + grpcPort + "-" + nettyPort;
- } else {
- id = ip + "-" + grpcPort;
- }
- LOG.info("Start to initialize server {}", id);
+
jettyServer = new JettyServer(shuffleServerConf);
registerMetrics();
@@ -228,7 +231,7 @@ public class ShuffleServer {
}
registerHeartBeat = new RegisterHeartBeat(this);
- shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, id, this,
storageManager);
+ shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this,
storageManager);
shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf,
shuffleFlushManager);
shuffleTaskManager = new ShuffleTaskManager(shuffleServerConf,
shuffleFlushManager,
shuffleBufferManager, storageManager);
@@ -262,7 +265,7 @@ public class ShuffleServer {
}
}
- private void registerMetrics() throws Exception {
+ private void registerMetrics() {
LOG.info("Register metrics");
CollectorRegistry shuffleServerCollectorRegistry = new
CollectorRegistry(true);
ShuffleServerMetrics.register(shuffleServerCollectorRegistry);
@@ -291,7 +294,9 @@ public class ShuffleServer {
jettyServer.addServlet(
new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true),
"/prometheus/metrics/jvm");
+ }
+ private void initMetricsReporter() throws Exception {
metricReporter =
MetricReporterFactory.getMetricReporter(shuffleServerConf, id);
if (metricReporter != null) {
metricReporter.addCollectorRegistry(ShuffleServerMetrics.getCollectorRegistry());
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5f7b21e1..566e4e7a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -31,6 +31,7 @@ import org.apache.uniffle.common.util.RssUtils;
public class ShuffleServerConf extends RssBaseConf {
public static final String PREFIX_HADOOP_CONF = "rss.server.hadoop";
+ public static final String SHUFFLE_SERVER_ID = "rss.server.id";
public static final ConfigOption<Long> SERVER_BUFFER_CAPACITY = ConfigOptions
.key("rss.server.buffer.capacity")
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
index 2c730aa8..45a69ada 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -111,7 +111,7 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest
extends KerberizedHdfsBase
storageManager.registerRemoteStorage(appId1, remoteStorage);
storageManager.registerRemoteStorage(appId2, remoteStorage);
ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
ShuffleDataFlushEvent event1 =
createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
manager.addToFlushQueue(event1);
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 1645b9d1..aa8f88b6 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -113,7 +113,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
assertEquals("2", manager.getHadoopConf().get("dfs.replication"));
assertEquals("value", manager.getHadoopConf().get("a.b"));
}
@@ -135,7 +135,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
for (int i = 0; i < 10; i++) {
ShuffleDataFlushEvent shuffleDataFlushEvent =
createShuffleDataFlushEvent(appId, i, 1, 1, null);
@@ -163,7 +163,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
IntStream.range(0, 20).forEach(x -> {
ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1,
1, null);
@@ -191,7 +191,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite.get(storageHost).get(),
0.5);
assertEquals(0.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(),
0.5);
ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
ShuffleDataFlushEvent event1 =
createShuffleDataFlushEvent(appId, 1, 1, 1, null);
final List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
@@ -236,7 +236,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList();
List<ShuffleDataFlushEvent> flushEvents1 = Lists.newArrayList();
List<ShuffleDataFlushEvent> flushEvents2 = Lists.newArrayList();
- ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf,
"shuffleServerId",
+ ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf,
mockShuffleServer, storageManager);
for (int i = 0; i < 30; i++) {
ShuffleDataFlushEvent flushEvent1 = createShuffleDataFlushEvent(appId,
1, 1, 1, null);
@@ -275,7 +275,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
storageManager.registerRemoteStorage(appId1, remoteStorage);
storageManager.registerRemoteStorage(appId2, remoteStorage);
ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
ShuffleDataFlushEvent event1 =
createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
manager.addToFlushQueue(event1);
@@ -342,7 +342,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(serverConf);
ShuffleFlushManager manager =
- new ShuffleFlushManager(serverConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(serverConf, mockShuffleServer, storageManager);
ShuffleDataFlushEvent event1 =
createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
manager.addToFlushQueue(event1);
@@ -513,7 +513,6 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
ShuffleFlushManager flushManager = new ShuffleFlushManager(
shuffleServerConf,
- "shuffle-server-id",
mockShuffleServer,
storageManager
);
@@ -555,7 +554,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
- new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1, 1,
100, null, null, null);
assertEquals(0, manager.getPendingEventsSize());
manager.addPendingEvents(event);
diff --git
a/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
b/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
index 675b765e..3ce5efd0 100644
---
a/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
+++
b/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
@@ -24,7 +24,7 @@ import org.apache.uniffle.server.storage.StorageManager;
public class TestShuffleFlushManager extends ShuffleFlushManager {
public TestShuffleFlushManager(ShuffleServerConf shuffleServerConf, String
shuffleServerId,
ShuffleServer shuffleServer, StorageManager
storageManager) {
- super(shuffleServerConf, shuffleServerId, shuffleServer, storageManager);
+ super(shuffleServerConf, shuffleServer, storageManager);
}
@Override
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index a5faffa6..ef456088 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -391,7 +391,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(conf);
ShuffleFlushManager shuffleFlushManager = new ShuffleFlushManager(conf,
- "serverId", mockShuffleServer, storageManager);
+ mockShuffleServer, storageManager);
shuffleBufferManager = new ShuffleBufferManager(conf, shuffleFlushManager);
when(mockShuffleServer
@@ -464,7 +464,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
ShuffleFlushManager shuffleFlushManager =
- new ShuffleFlushManager(shuffleConf, "serverId", mockShuffleServer,
storageManager);
+ new ShuffleFlushManager(shuffleConf, mockShuffleServer,
storageManager);
shuffleBufferManager = new ShuffleBufferManager(shuffleConf,
shuffleFlushManager);
ShuffleTaskManager shuffleTaskManager =
new ShuffleTaskManager(shuffleConf, shuffleFlushManager,
shuffleBufferManager, storageManager);
@@ -528,7 +528,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
ShuffleFlushManager shuffleFlushManager =
- new ShuffleFlushManager(shuffleConf, "serverId",
mockShuffleServer, storageManager);
+ new ShuffleFlushManager(shuffleConf, mockShuffleServer,
storageManager);
shuffleBufferManager = new ShuffleBufferManager(shuffleConf,
shuffleFlushManager);
when(mockShuffleServer