[ 
https://issues.apache.org/jira/browse/HDFS-14750?focusedWorklogId=767107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767107
 ]

ASF GitHub Bot logged work on HDFS-14750:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/May/22 09:49
            Start Date: 06/May/22 09:49
    Worklog Time Spent: 10m 
      Work Description: ferhui commented on code in PR #4199:
URL: https://github.com/apache/hadoop/pull/4199#discussion_r866622590


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+
+/**
+ * Dynamic fairness policy extending {@link 
StaticRouterRpcFairnessPolicyController}
+ * and fetching handlers from configuration for all available name services.
+ * The handlers count changes according to traffic to namespaces.
+ * Total handlers might NOT strictly add up to the value defined by 
DFS_ROUTER_HANDLER_COUNT_KEY.
+ */
+public class DynamicRouterRpcFairnessPolicyController
+    extends StaticRouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);
+
+  private static final ScheduledExecutorService scheduledExecutor = 
HadoopExecutors
+      .newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setDaemon(true)
+          
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
+  private PermitsResizerService permitsResizerService;
+  private ScheduledFuture<?> refreshTask;
+  private int handlerCount;
+
+  /**
+   * Initializes using the same logic as {@link 
StaticRouterRpcFairnessPolicyController}
+   * and starts a periodic semaphore resizer thread
+   *
+   * @param conf configuration
+   */
+  public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
+    super(conf);
+    handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, 
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+    long refreshInterval =
+        
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY,
+            DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+    permitsResizerService = new PermitsResizerService();
+    refreshTask = scheduledExecutor
+        .scheduleWithFixedDelay(permitsResizerService, refreshInterval, 
refreshInterval,
+            TimeUnit.MILLISECONDS);
+  }
+
+  @VisibleForTesting
+  public DynamicRouterRpcFairnessPolicyController(Configuration conf, long 
refreshInterval) {
+    super(conf);
+    handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, 
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+    permitsResizerService = new PermitsResizerService();
+    refreshTask = scheduledExecutor
+        .scheduleWithFixedDelay(permitsResizerService, refreshInterval, 
refreshInterval,
+            TimeUnit.MILLISECONDS);
+  }
+
+  @VisibleForTesting
+  public void refreshPermitsCap() {
+    permitsResizerService.run();
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (refreshTask != null) {
+      refreshTask.cancel(true);
+    }
+    if (scheduledExecutor != null) {
+      scheduledExecutor.shutdown();
+    }
+  }
+
+  class PermitsResizerService implements Runnable {
+
+    @Override
+    public synchronized void run() {
+      long totalOps = 0;
+      Map<String, Long> nsOps = new HashMap<>();
+      for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
+        long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ?
+            rejectedPermitsPerNs.get(entry.getKey()).longValue() :
+            0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ?
+            acceptedPermitsPerNs.get(entry.getKey()).longValue() :
+            0);
+        nsOps.put(entry.getKey(), ops);
+        totalOps += ops;
+      }
+
+      for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
+        String ns = entry.getKey();
+        AdjustableSemaphore semaphore = entry.getValue();
+        int oldPermitCap = permitSizes.get(ns);
+        int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * 
handlerCount);
+        // Leave at least 1 handler even if there's no traffic
+        if (newPermitCap == 0) {
+          newPermitCap = 1;

Review Comment:
   How about adding a new configuration for this?
   E.g. xxx.min.handler.count



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+
+/**
+ * Dynamic fairness policy extending {@link 
StaticRouterRpcFairnessPolicyController}
+ * and fetching handlers from configuration for all available name services.
+ * The handlers count changes according to traffic to namespaces.
+ * Total handlers might NOT strictly add up to the value defined by 
DFS_ROUTER_HANDLER_COUNT_KEY.
+ */
+public class DynamicRouterRpcFairnessPolicyController
+    extends StaticRouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);
+
+  private static final ScheduledExecutorService scheduledExecutor = 
HadoopExecutors
+      .newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setDaemon(true)
+          
.setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
+  private PermitsResizerService permitsResizerService;
+  private ScheduledFuture<?> refreshTask;
+  private int handlerCount;
+
+  /**
+   * Initializes using the same logic as {@link 
StaticRouterRpcFairnessPolicyController}
+   * and starts a periodic semaphore resizer thread
+   *
+   * @param conf configuration
+   */
+  public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
+    super(conf);
+    handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, 
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+    long refreshInterval =
+        
conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY,
+            DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+    permitsResizerService = new PermitsResizerService();
+    refreshTask = scheduledExecutor
+        .scheduleWithFixedDelay(permitsResizerService, refreshInterval, 
refreshInterval,
+            TimeUnit.MILLISECONDS);
+  }
+
+  @VisibleForTesting
+  public DynamicRouterRpcFairnessPolicyController(Configuration conf, long 
refreshInterval) {
+    super(conf);
+    handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, 
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+    permitsResizerService = new PermitsResizerService();
+    refreshTask = scheduledExecutor
+        .scheduleWithFixedDelay(permitsResizerService, refreshInterval, 
refreshInterval,
+            TimeUnit.MILLISECONDS);
+  }
+
+  @VisibleForTesting
+  public void refreshPermitsCap() {
+    permitsResizerService.run();
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (refreshTask != null) {
+      refreshTask.cancel(true);
+    }
+    if (scheduledExecutor != null) {
+      scheduledExecutor.shutdown();
+    }
+  }
+
+  class PermitsResizerService implements Runnable {
+
+    @Override
+    public synchronized void run() {
+      long totalOps = 0;
+      Map<String, Long> nsOps = new HashMap<>();
+      for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
+        long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ?
+            rejectedPermitsPerNs.get(entry.getKey()).longValue() :
+            0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ?
+            acceptedPermitsPerNs.get(entry.getKey()).longValue() :
+            0);
+        nsOps.put(entry.getKey(), ops);
+        totalOps += ops;
+      }
+
+      for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {

Review Comment:
   It seems that handlerCount may not equal the sum of all newPermitCap values, right?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+
+import static 
org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
+/**
+ * Test functionality of {@link DynamicRouterRpcFairnessPolicyController}.
+ */
+public class TestDynamicRouterRpcFairnessPolicyController {
+
+  private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
+
+  @Test
+  public void testDynamicControllerSimple() throws InterruptedException {
+    verifyDynamicControllerSimple(true);
+    verifyDynamicControllerSimple(false);
+  }
+
+  @Test
+  public void testDynamicControllerAllPermitsAcquired() throws 
InterruptedException {
+    verifyDynamicControllerAllPermitsAcquired(true);
+    verifyDynamicControllerAllPermitsAcquired(false);
+  }
+
+  private void verifyDynamicControllerSimple(boolean manualRefresh)
+      throws InterruptedException {
+    // 3 permits each ns
+    DynamicRouterRpcFairnessPolicyController controller;
+    if (manualRefresh) {
+      controller = getFairnessPolicyController(9);
+    } else {
+      controller = getFairnessPolicyController(9, 4000);
+    }
+    for (int i = 0; i < 3; i++) {
+      Assert.assertTrue(controller.acquirePermit("ns1"));
+      Assert.assertTrue(controller.acquirePermit("ns2"));
+      Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS));
+    }
+    Assert.assertFalse(controller.acquirePermit("ns1"));
+    Assert.assertFalse(controller.acquirePermit("ns2"));
+    Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
+
+    // Release all permits
+    for (int i = 0; i < 3; i++) {
+      controller.releasePermit("ns1");
+      controller.releasePermit("ns2");
+      controller.releasePermit(CONCURRENT_NS);
+    }
+
+    // Inject dummy metrics
+    // Split half half for ns1 and concurrent
+    Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
+    Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
+    injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10);
+    injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0);
+    injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10);
+    controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
+
+    if (manualRefresh) {
+      controller.refreshPermitsCap();
+    } else {
+      Thread.sleep(5000);

Review Comment:
   Better to use GenericTestUtils.waitFor instead of sleep.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java:
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+
+import static 
org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static 
org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
+/**
+ * Test functionality of {@link DynamicRouterRpcFairnessPolicyController}.
+ */
+public class TestDynamicRouterRpcFairnessPolicyController {
+
+  private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
+
+  @Test
+  public void testDynamicControllerSimple() throws InterruptedException {
+    verifyDynamicControllerSimple(true);
+    verifyDynamicControllerSimple(false);
+  }
+
+  @Test
+  public void testDynamicControllerAllPermitsAcquired() throws 
InterruptedException {
+    verifyDynamicControllerAllPermitsAcquired(true);
+    verifyDynamicControllerAllPermitsAcquired(false);
+  }
+
+  private void verifyDynamicControllerSimple(boolean manualRefresh)
+      throws InterruptedException {
+    // 3 permits each ns
+    DynamicRouterRpcFairnessPolicyController controller;
+    if (manualRefresh) {
+      controller = getFairnessPolicyController(9);
+    } else {
+      controller = getFairnessPolicyController(9, 4000);
+    }
+    for (int i = 0; i < 3; i++) {
+      Assert.assertTrue(controller.acquirePermit("ns1"));
+      Assert.assertTrue(controller.acquirePermit("ns2"));
+      Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS));
+    }
+    Assert.assertFalse(controller.acquirePermit("ns1"));
+    Assert.assertFalse(controller.acquirePermit("ns2"));
+    Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
+
+    // Release all permits
+    for (int i = 0; i < 3; i++) {
+      controller.releasePermit("ns1");
+      controller.releasePermit("ns2");
+      controller.releasePermit(CONCURRENT_NS);
+    }
+
+    // Inject dummy metrics
+    // Split half half for ns1 and concurrent
+    Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
+    Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
+    injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10);
+    injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0);
+    injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10);
+    controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
+
+    if (manualRefresh) {
+      controller.refreshPermitsCap();
+    } else {
+      Thread.sleep(5000);
+    }
+
+    // Current permits count should be 5:1:5

Review Comment:
   The total of the old permits does not equal the total of the new permits.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 767107)
    Time Spent: 0.5h  (was: 20m)

> RBF: Improved isolation for downstream name nodes. {Dynamic}
> ------------------------------------------------------------
>
>                 Key: HDFS-14750
>                 URL: https://issues.apache.org/jira/browse/HDFS-14750
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>            Reporter: CR Hota
>            Assignee: CR Hota
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> This Jira tracks the work around dynamic allocation of resources in routers 
> for downstream hdfs clusters. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to