[ https://issues.apache.org/jira/browse/HDFS-14750?focusedWorklogId=767107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-767107 ]
ASF GitHub Bot logged work on HDFS-14750: ----------------------------------------- Author: ASF GitHub Bot Created on: 06/May/22 09:49 Start Date: 06/May/22 09:49 Worklog Time Spent: 10m Work Description: ferhui commented on code in PR #4199: URL: https://github.com/apache/hadoop/pull/4199#discussion_r866622590 ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java: ########## @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; + +/** + * Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController} + * and fetching handlers from configuration for all available name services. + * The handlers count changes according to traffic to namespaces. + * Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY. 
+ */ +public class DynamicRouterRpcFairnessPolicyController + extends StaticRouterRpcFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class); + + private static final ScheduledExecutorService scheduledExecutor = HadoopExecutors + .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build()); + private PermitsResizerService permitsResizerService; + private ScheduledFuture<?> refreshTask; + private int handlerCount; + + /** + * Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController} + * and starts a periodic semaphore resizer thread + * + * @param conf configuration + */ + public DynamicRouterRpcFairnessPolicyController(Configuration conf) { + super(conf); + handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); + long refreshInterval = + conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY, + DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + permitsResizerService = new PermitsResizerService(); + refreshTask = scheduledExecutor + .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, + TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) { + super(conf); + handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); + permitsResizerService = new PermitsResizerService(); + refreshTask = scheduledExecutor + .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, + TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + public void refreshPermitsCap() { + permitsResizerService.run(); + } + + @Override + public void shutdown() { + super.shutdown(); + if (refreshTask != null) { + 
refreshTask.cancel(true); + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } + } + + class PermitsResizerService implements Runnable { + + @Override + public synchronized void run() { + long totalOps = 0; + Map<String, Long> nsOps = new HashMap<>(); + for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) { + long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ? + rejectedPermitsPerNs.get(entry.getKey()).longValue() : + 0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ? + acceptedPermitsPerNs.get(entry.getKey()).longValue() : + 0); + nsOps.put(entry.getKey(), ops); + totalOps += ops; + } + + for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) { + String ns = entry.getKey(); + AdjustableSemaphore semaphore = entry.getValue(); + int oldPermitCap = permitSizes.get(ns); + int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount); + // Leave at least 1 handler even if there's no traffic + if (newPermitCap == 0) { + newPermitCap = 1; Review Comment: How about adding a new configuration for this, e.g. xxx.min.handler.count? ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java: ########## @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; + +/** + * Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController} + * and fetching handlers from configuration for all available name services. + * The handlers count changes according to traffic to namespaces. + * Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY. 
+ */ +public class DynamicRouterRpcFairnessPolicyController + extends StaticRouterRpcFairnessPolicyController { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class); + + private static final ScheduledExecutorService scheduledExecutor = HadoopExecutors + .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build()); + private PermitsResizerService permitsResizerService; + private ScheduledFuture<?> refreshTask; + private int handlerCount; + + /** + * Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController} + * and starts a periodic semaphore resizer thread + * + * @param conf configuration + */ + public DynamicRouterRpcFairnessPolicyController(Configuration conf) { + super(conf); + handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); + long refreshInterval = + conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_KEY, + DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + permitsResizerService = new PermitsResizerService(); + refreshTask = scheduledExecutor + .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, + TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) { + super(conf); + handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT); + permitsResizerService = new PermitsResizerService(); + refreshTask = scheduledExecutor + .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval, + TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + public void refreshPermitsCap() { + permitsResizerService.run(); + } + + @Override + public void shutdown() { + super.shutdown(); + if (refreshTask != null) { + 
refreshTask.cancel(true); + } + if (scheduledExecutor != null) { + scheduledExecutor.shutdown(); + } + } + + class PermitsResizerService implements Runnable { + + @Override + public synchronized void run() { + long totalOps = 0; + Map<String, Long> nsOps = new HashMap<>(); + for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) { + long ops = (rejectedPermitsPerNs.containsKey(entry.getKey()) ? + rejectedPermitsPerNs.get(entry.getKey()).longValue() : + 0) + (acceptedPermitsPerNs.containsKey(entry.getKey()) ? + acceptedPermitsPerNs.get(entry.getKey()).longValue() : + 0); + nsOps.put(entry.getKey(), ops); + totalOps += ops; + } + + for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) { Review Comment: It seems that the sum of all newPermitCap values may not equal handlerCount, right? ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.LongAdder; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; + +/** + * Test functionality of {@link DynamicRouterRpcFairnessPolicyController). + */ +public class TestDynamicRouterRpcFairnessPolicyController { + + private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2"; + + @Test + public void testDynamicControllerSimple() throws InterruptedException { + verifyDynamicControllerSimple(true); + verifyDynamicControllerSimple(false); + } + + @Test + public void testDynamicControllerAllPermitsAcquired() throws InterruptedException { + verifyDynamicControllerAllPermitsAcquired(true); + verifyDynamicControllerAllPermitsAcquired(false); + } + + private void verifyDynamicControllerSimple(boolean manualRefresh) + throws InterruptedException { + // 3 permits each ns + DynamicRouterRpcFairnessPolicyController controller; + if (manualRefresh) { + controller = getFairnessPolicyController(9); + } else { + controller = getFairnessPolicyController(9, 4000); + } + for (int i = 0; i < 3; i++) { + Assert.assertTrue(controller.acquirePermit("ns1")); + Assert.assertTrue(controller.acquirePermit("ns2")); + Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS)); + } + Assert.assertFalse(controller.acquirePermit("ns1")); + Assert.assertFalse(controller.acquirePermit("ns2")); + 
Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS)); + + // Release all permits + for (int i = 0; i < 3; i++) { + controller.releasePermit("ns1"); + controller.releasePermit("ns2"); + controller.releasePermit(CONCURRENT_NS); + } + + // Inject dummy metrics + // Split half half for ns1 and concurrent + Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>(); + Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>(); + injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10); + injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0); + injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10); + controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); + + if (manualRefresh) { + controller.refreshPermitsCap(); + } else { + Thread.sleep(5000); Review Comment: Better to use GenericTestUtils.waitFor instead of sleep. ########## hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java: ########## @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.fairness; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.LongAdder; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; + +import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; + +/** + * Test functionality of {@link DynamicRouterRpcFairnessPolicyController). + */ +public class TestDynamicRouterRpcFairnessPolicyController { + + private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2"; + + @Test + public void testDynamicControllerSimple() throws InterruptedException { + verifyDynamicControllerSimple(true); + verifyDynamicControllerSimple(false); + } + + @Test + public void testDynamicControllerAllPermitsAcquired() throws InterruptedException { + verifyDynamicControllerAllPermitsAcquired(true); + verifyDynamicControllerAllPermitsAcquired(false); + } + + private void verifyDynamicControllerSimple(boolean manualRefresh) + throws InterruptedException { + // 3 permits each ns + DynamicRouterRpcFairnessPolicyController controller; + if (manualRefresh) { + controller = getFairnessPolicyController(9); + } else { + controller = getFairnessPolicyController(9, 4000); + } + for (int i = 0; i < 3; i++) { + Assert.assertTrue(controller.acquirePermit("ns1")); + Assert.assertTrue(controller.acquirePermit("ns2")); + Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS)); + } + Assert.assertFalse(controller.acquirePermit("ns1")); + Assert.assertFalse(controller.acquirePermit("ns2")); + 
Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS)); + + // Release all permits + for (int i = 0; i < 3; i++) { + controller.releasePermit("ns1"); + controller.releasePermit("ns2"); + controller.releasePermit(CONCURRENT_NS); + } + + // Inject dummy metrics + // Split half half for ns1 and concurrent + Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>(); + Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>(); + injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10); + injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0); + injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10); + controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs); + + if (manualRefresh) { + controller.refreshPermitsCap(); + } else { + Thread.sleep(5000); + } + + // Current permits count should be 5:1:5 Review Comment: The total number of old permits does not equal the total number of new permits (3 + 3 + 3 = 9 before the refresh vs. 5 + 1 + 5 = 11 after). Issue Time Tracking ------------------- Worklog Id: (was: 767107) Time Spent: 0.5h (was: 20m) > RBF: Improved isolation for downstream name nodes. {Dynamic} > ------------------------------------------------------------ > > Key: HDFS-14750 > URL: https://issues.apache.org/jira/browse/HDFS-14750 > Project: Hadoop HDFS > Issue Type: Improvement > Reporter: CR Hota > Assignee: CR Hota > Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > This Jira tracks the work around dynamic allocation of resources in routers > for downstream hdfs clusters. -- This message was sent by Atlassian Jira (v8.20.7#820007) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org