This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5875eb37 Introduce startup-silent-period mechanism to avoid partial 
assignments (#247)
5875eb37 is described below

commit 5875eb37e8e038d72afd7483c4d4807d05d9e08a
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Mon Oct 10 11:28:00 2022 +0800

    Introduce startup-silent-period mechanism to avoid partial assignments 
(#247)
    
    ### What changes were proposed in this pull request?
    Introduce startup-silent-period mechanism to avoid partial assignments
    
    ### Why are the changes needed?
    When a coordinator's conf is changed and the coordinator is restarted, it 
will accept client getAssignment requests immediately, but it will serve them 
based on only the partially registered shuffle-servers. As a result, some jobs 
will get fewer shuffle-servers than required, which slows down their execution.
    
    I think we should make the coordinator wait for more than one shuffle-server 
heartbeat interval before serving clients. While it is out of service, requests 
from clients will fall back to the slave coordinator.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    UTs
---
 .../apache/uniffle/coordinator/ClusterManager.java |  5 +++
 .../uniffle/coordinator/CoordinatorConf.java       | 13 ++++++
 .../coordinator/CoordinatorGrpcService.java        |  4 ++
 .../uniffle/coordinator/SimpleClusterManager.java  | 38 +++++++++++++++++
 .../coordinator/SimpleClusterManagerTest.java      | 13 ++++++
 docs/coordinator_guide.md                          |  8 ++--
 ...berTest.java => CoordinatorAssignmentTest.java} | 49 +++++++++++++++++++---
 .../apache/uniffle/test/IntegrationTestBase.java   |  1 +
 8 files changed, 122 insertions(+), 9 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index b8005c0a..8db4abc5 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -49,4 +49,9 @@ public interface ClusterManager extends Closeable {
   List<ServerNode> list();
 
   int getShuffleNodesMax();
+
+  /**
+   * @return whether to be ready for serving
+   */
+  boolean isReadyForServe();
 }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index b343ac64..70408cb5 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -159,6 +159,19 @@ public class CoordinatorConf extends RssBaseConf {
           .enumType(AbstractAssignmentStrategy.HostAssignmentStrategy.class)
           
.defaultValue(AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF)
           .withDescription("Strategy for selecting shuffle servers");
+  public static final ConfigOption<Boolean> 
COORDINATOR_START_SILENT_PERIOD_ENABLED = ConfigOptions
+      .key("rss.coordinator.startup-silent-period.enabled")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Enable the startup-silent-period to reject the 
assignment requests "
+          + "for avoiding partial assignments. To avoid service interruption, 
this mechanism is disabled by default. "
+          + "Especially it's recommended to use in coordinator HA mode when 
restarting single coordinator.");
+  public static final ConfigOption<Long> 
COORDINATOR_START_SILENT_PERIOD_DURATION = ConfigOptions
+      .key("rss.coordinator.startup-silent-period.duration")
+      .longType()
+      .defaultValue(20 * 1000L)
+      .withDescription("The waiting duration(ms) when conf of "
+          + COORDINATOR_START_SILENT_PERIOD_ENABLED + " is enabled.");
 
   public CoordinatorConf() {
   }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index d2c3fc28..987c85a9 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -120,6 +120,10 @@ public class CoordinatorGrpcService extends 
CoordinatorServerGrpc.CoordinatorSer
 
     GetShuffleAssignmentsResponse response;
     try {
+      if (!coordinatorServer.getClusterManager().isReadyForServe()) {
+        throw new Exception("Coordinator is out-of-service when in starting.");
+      }
+
       final PartitionRangeAssignment pra =
           coordinatorServer
               .getAssignmentStrategy()
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index c60a0d8d..cc9b553e 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -65,6 +65,11 @@ public class SimpleClusterManager implements ClusterManager {
   private long outputAliveServerCount = 0;
   private final long periodicOutputIntervalTimes;
 
+  private long startTime;
+  private boolean startupSilentPeriodEnabled;
+  private long startupSilentPeriodDurationMs;
+  private boolean readyForServe = false;
+
   public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) 
throws Exception {
     this.shuffleNodesMax = 
conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = 
conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
@@ -72,6 +77,9 @@ public class SimpleClusterManager implements ClusterManager {
     scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.getThreadFactory("SimpleClusterManager-%d"));
 
+    this.startupSilentPeriodEnabled = 
conf.get(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_ENABLED);
+    this.startupSilentPeriodDurationMs = 
conf.get(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_DURATION);
+
     periodicOutputIntervalTimes = 
conf.get(CoordinatorConf.COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES);
     scheduledExecutorService.scheduleAtFixedRate(
         () -> nodesCheck(), heartbeatTimeout / 3,
@@ -86,6 +94,8 @@ public class SimpleClusterManager implements ClusterManager {
       checkNodesExecutorService.scheduleAtFixedRate(
           () -> updateExcludeNodes(excludeNodesPath), updateNodesInterval, 
updateNodesInterval, TimeUnit.MILLISECONDS);
     }
+
+    this.startTime = System.currentTimeMillis();
   }
 
   void nodesCheck() {
@@ -224,6 +234,19 @@ public class SimpleClusterManager implements 
ClusterManager {
     return shuffleNodesMax;
   }
 
+  @Override
+  public boolean isReadyForServe() {
+    if (!startupSilentPeriodEnabled) {
+      return true;
+    }
+
+    if (!readyForServe && System.currentTimeMillis() - startTime > 
startupSilentPeriodDurationMs) {
+      readyForServe = true;
+    }
+
+    return readyForServe;
+  }
+
   @Override
   public void close() throws IOException {
     if (hadoopFileSystem != null) {
@@ -238,4 +261,19 @@ public class SimpleClusterManager implements 
ClusterManager {
       checkNodesExecutorService.shutdown();
     }
   }
+
+  @VisibleForTesting
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @VisibleForTesting
+  public void setReadyForServe(boolean readyForServe) {
+    this.readyForServe = readyForServe;
+  }
+
+  @VisibleForTesting
+  public void setStartupSilentPeriodEnabled(boolean 
startupSilentPeriodEnabled) {
+    this.startupSilentPeriodEnabled = startupSilentPeriodEnabled;
+  }
 }
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index be3fe2ce..df73bfa2 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class SimpleClusterManagerTest {
@@ -56,6 +57,18 @@ public class SimpleClusterManagerTest {
     CoordinatorMetrics.clear();
   }
 
+  @Test
+  public void startupSilentPeriodTest() throws Exception {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    
coordinatorConf.set(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_ENABLED, 
true);
+    
coordinatorConf.set(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_DURATION, 
20 * 1000L);
+    SimpleClusterManager manager = new SimpleClusterManager(coordinatorConf, 
new Configuration());
+    assertFalse(manager.isReadyForServe());
+
+    manager.setStartTime(System.currentTimeMillis() - 30 * 1000L);
+    assertTrue(manager.isReadyForServe());
+  }
+
   @Test
   public void getServerListTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index 79f9209d..13a21906 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -96,9 +96,11 @@ This document will introduce how to deploy Uniffle 
coordinators.
 |rss.rpc.server.port|-|RPC port for coordinator|
 |rss.jetty.http.port|-|Http port for coordinator|
 |rss.coordinator.remote.storage.select.strategy|APP_BALANCE|Strategy for 
selecting the remote path|
-|rss.coordinator.remote.storage.schedule.time|60000|The time of scheduling the 
read and write time of the paths to obtain different HDFS|
-|rss.coordinator.remote.storage.schedule.file.size|204800000|The size of the 
file that the scheduled thread reads and writes|
-|rss.coordinator.remote.storage.schedule.access.times|3|The number of times to 
read and write HDFS files|
+|rss.coordinator.remote.storage.io.sample.schedule.time|60000|The time of 
scheduling the read and write time of the paths to obtain different HDFS|
+|rss.coordinator.remote.storage.io.sample.file.size|204800000|The size of the 
file that the scheduled thread reads and writes|
+|rss.coordinator.remote.storage.io.sample.access.times|3|The number of times 
to read and write HDFS files|
+|rss.coordinator.startup-silent-period.enabled|false|Enable the 
startup-silent-period to reject the assignment requests for avoiding partial 
assignments. To avoid service interruption, this mechanism is disabled by 
default. Especially it's recommended to use in coordinator HA mode when 
restarting single coordinator.|
+|rss.coordinator.startup-silent-period.duration|20000|The waiting duration(ms) 
when conf of rss.coordinator.startup-silent-period.enabled is enabled.|
 
 ### AccessClusterLoadChecker settings
 |Property Name|Default|        Description|
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
similarity index 66%
rename from 
integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
rename to 
integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index 2ed3e45f..c45c2e19 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -34,23 +34,34 @@ import org.apache.uniffle.client.util.ClientType;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class AssignmentServerNodesNumberTest extends CoordinatorTestBase {
-  private static final Logger LOG = 
LoggerFactory.getLogger(AssignmentServerNodesNumberTest.class);
+public class CoordinatorAssignmentTest extends CoordinatorTestBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorAssignmentTest.class);
   private static final int SHUFFLE_NODES_MAX = 10;
   private static final int SERVER_NUM = 10;
   private static final HashSet<String> TAGS = Sets.newHashSet("t1");
 
+  private static final String QUORUM =
+      LOCALHOST + ":" + COORDINATOR_PORT_1 + "," + LOCALHOST + ":" + 
COORDINATOR_PORT_2;
+
   @BeforeAll
   public static void setupServers() throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    coordinatorConf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
-    coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
SHUFFLE_NODES_MAX);
-    createCoordinatorServer(coordinatorConf);
+    CoordinatorConf coordinatorConf1 = getCoordinatorConf();
+    coordinatorConf1.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
+    coordinatorConf1.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
SHUFFLE_NODES_MAX);
+    createCoordinatorServer(coordinatorConf1);
+
+    CoordinatorConf coordinatorConf2 = getCoordinatorConf();
+    coordinatorConf2.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
+    coordinatorConf2.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, 
SHUFFLE_NODES_MAX);
+    coordinatorConf2.setInteger(CoordinatorConf.RPC_SERVER_PORT, 
COORDINATOR_PORT_2);
+    coordinatorConf2.setInteger(CoordinatorConf.JETTY_HTTP_PORT, JETTY_PORT_2);
+    createCoordinatorServer(coordinatorConf2);
 
     for (int i = 0; i < SERVER_NUM; i++) {
       ShuffleServerConf shuffleServerConf = getShuffleServerConf();
@@ -65,6 +76,7 @@ public class AssignmentServerNodesNumberTest extends 
CoordinatorTestBase {
       shuffleServerConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 18001 + i);
       shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 19010 + i);
       shuffleServerConf.set(ShuffleServerConf.TAGS, new ArrayList<>(TAGS));
+      shuffleServerConf.setString("rss.coordinator.quorum", QUORUM);
       createShuffleServer(shuffleServerConf);
     }
     startServers();
@@ -72,6 +84,31 @@ public class AssignmentServerNodesNumberTest extends 
CoordinatorTestBase {
     Thread.sleep(1000 * 5);
   }
 
+  @Test
+  public void testSilentPeriod() throws Exception {
+    ShuffleWriteClientImpl shuffleWriteClient = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
+        1, 1, 1, true, 1, 1);
+    shuffleWriteClient.registerCoordinators(QUORUM);
+
+    // Case1: Disable silent period
+    ShuffleAssignmentsInfo info = 
shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, -1);
+    assertEquals(SHUFFLE_NODES_MAX, 
info.getServerToPartitionRanges().keySet().size());
+
+    // Case2: Enable silent period mechanism, it should fallback to slave 
coordinator.
+    SimpleClusterManager clusterManager = (SimpleClusterManager) 
coordinators.get(0).getClusterManager();
+    clusterManager.setReadyForServe(false);
+    clusterManager.setStartupSilentPeriodEnabled(true);
+    clusterManager.setStartTime(System.currentTimeMillis() - 1);
+
+    if (clusterManager.getNodesNum() < 10) {
+      info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, 
-1);
+      assertEquals(SHUFFLE_NODES_MAX, 
info.getServerToPartitionRanges().keySet().size());
+    }
+
+    // recover
+    clusterManager.setReadyForServe(true);
+  }
+
   @Test
   public void testAssignmentServerNodesNumber() throws Exception {
     ShuffleWriteClientImpl shuffleWriteClient = new 
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index dff9848b..06e3be7b 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -44,6 +44,7 @@ public abstract class IntegrationTestBase extends 
HdfsTestBase {
   protected static final int COORDINATOR_PORT_1 = 19999;
   protected static final int COORDINATOR_PORT_2 = 20030;
   protected static final int JETTY_PORT_1 = 19998;
+  protected static final int JETTY_PORT_2 = 20040;
   protected static final String COORDINATOR_QUORUM = LOCALHOST + ":" + 
COORDINATOR_PORT_1;
 
   protected static List<ShuffleServer> shuffleServers = Lists.newArrayList();

Reply via email to