merlimat closed pull request #1157: Allow to override the auto-detected NIC speed limit URL: https://github.com/apache/incubator-pulsar/pull/1157
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/broker.conf b/conf/broker.conf index 7f9dd3396..979f4e2d9 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -355,6 +355,14 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100 # maximum number of bundles in a namespace loadBalancerNamespaceMaximumBundles=128 +# Override the auto-detection of the network interfaces max speed. +# This option is useful in some environments (eg: EC2 VMs) where the max speed +# reported by Linux is not reflecting the real bandwidth available to the broker. +# Since the network usage is employed by the load manager to decide when a broker +# is overloaded, it is important to make sure the info is correct or override it +# with the right value here. +loadBalancerOverrideBrokerNicSpeedGbps= + # Name of load manager to use loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 88117e077..0cc40011b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -18,10 +18,9 @@ */ package org.apache.pulsar.broker; -import java.lang.reflect.Field; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -349,6 +348,9 @@ @FieldContext(dynamic = true) private String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"; + // Option to override the auto-detected network interfaces max speed + private Integer loadBalancerOverrideBrokerNicSpeedGbps; + /**** --- Replication --- ****/ // Enable replication metrics private boolean replicationMetricsEnabled = false; @@ -1218,6 +1220,14 @@ public int getLoadBalancerNamespaceMaximumBundles() { return this.loadBalancerNamespaceMaximumBundles; } + public Optional<Integer> getLoadBalancerOverrideBrokerNicSpeedGbps() { + return Optional.ofNullable(loadBalancerOverrideBrokerNicSpeedGbps); + } + + public void setLoadBalancerOverrideBrokerNicSpeedGbps(int loadBalancerOverrideBrokerNicSpeedGbps) { + this.loadBalancerOverrideBrokerNicSpeedGbps = loadBalancerOverrideBrokerNicSpeedGbps; + } + public boolean isReplicationMetricsEnabled() { return replicationMetricsEnabled; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java index 4b62f84a7..7cdf84c30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java @@ -18,15 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import com.sun.management.OperatingSystemMXBean; - -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; -import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; -import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.file.Files; @@ -35,10 +26,20 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; +import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; +import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.management.OperatingSystemMXBean; + /** * Class that will return the broker host usage. * @@ -54,6 +55,8 @@ private OperatingSystemMXBean systemBean; private SystemResourceUsage usage; + private final Optional<Integer> overrideBrokerNicSpeedGbps; + private static final Logger LOG = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class); public LinuxBrokerHostUsageImpl(PulsarService pulsar) { @@ -61,6 +64,7 @@ public LinuxBrokerHostUsageImpl(PulsarService pulsar) { this.systemBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); this.lastCollection = 0L; this.usage = new SystemResourceUsage(); + this.overrideBrokerNicSpeedGbps = pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(); pulsar.getLoadManagerExecutor().scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckIntervalMin, TimeUnit.MINUTES); } @@ -117,12 +121,12 @@ private double getTotalCpuLimit() { /** * Reads first line of /proc/stat to get total cpu usage. - * + * * <pre> * cpu user nice system idle iowait irq softirq steal guest guest_nice * cpu 317808 128 58637 2503692 7634 0 13472 0 0 0 * </pre> - * + * * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal. */ @@ -175,6 +179,12 @@ private Path getNicSpeedPath(String nic) { } private double getTotalNicLimitKbps(List<String> nics) { + if (overrideBrokerNicSpeedGbps.isPresent()) { + // Use the override value as configured. Return the total max speed across all available NICs, converted + // from Gbps into Kbps + return ((double) overrideBrokerNicSpeedGbps.get()) * nics.size() * 1024 * 1024; + } + // Nic speed is in Mbits/s, return kbits/s return nics.stream().mapToDouble(s -> { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java new file mode 100644 index 000000000..7f8324731 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimit.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import org.apache.commons.lang3.SystemUtils; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import static org.testng.Assert.*; + +public class LoadReportNetworkLimit extends MockedPulsarServiceBaseTest { + + @BeforeClass + @Override + public void setup() throws Exception { + conf.setLoadBalancerEnabled(true); + conf.setLoadBalancerOverrideBrokerNicSpeedGbps(5); + super.internalSetup(); + } + + @AfterClass + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void checkLoadReportNicSpeed() throws Exception { + // Since we have overridden the NIC speed in the configuration, the load report for the broker should always + + LoadManagerReport report = admin.brokerStats().getLoadReport(); + + if (SystemUtils.IS_OS_LINUX) { + assertEquals(report.getBandwidthIn().limit, 5.0 * 1024 * 1024); + assertEquals(report.getBandwidthOut().limit, 5.0 * 1024 * 1024); + } else { + // On non-Linux system we don't report the network usage + assertEquals(report.getBandwidthIn().limit, -1.0); + assertEquals(report.getBandwidthOut().limit, -1.0); + } + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index f19803cbf..bad061c81 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -29,6 +29,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.nio.charset.StandardCharsets; +import java.util.Optional; import java.util.Properties; import org.apache.pulsar.broker.ServiceConfiguration; @@ -36,7 +37,7 @@ import org.testng.annotations.Test; /** - * + * * */ public class ServiceConfigurationTest { @@ -45,7 +46,7 @@ /** * test {@link ServiceConfiguration} initialization - * + * * @throws Exception */ @Test @@ -59,9 +60,25 @@ public void testInit() throws Exception { assertEquals(config.getBootstrapNamespaces().get(1), "ns2"); } + @Test + public void testOptionalSettingEmpty() throws Exception { + String confFile = "loadBalancerOverrideBrokerNicSpeedGbps=\n"; + InputStream stream = new ByteArrayInputStream(confFile.getBytes()); + final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class); + assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.empty()); + } + + @Test + public void testOptionalSettingPresent() throws Exception { + String confFile = "loadBalancerOverrideBrokerNicSpeedGbps=5\n"; + InputStream stream = new ByteArrayInputStream(confFile.getBytes()); + final ServiceConfiguration config = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class); + assertEquals(config.getLoadBalancerOverrideBrokerNicSpeedGbps(), Optional.of(5)); + } + /** * test {@link ServiceConfiguration} with incorrect values. - * + * * @throws Exception */ @Test(expectedExceptions = IllegalArgumentException.class) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java index c7bab5f3a..ebac11186 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.common.util; -import org.apache.pulsar.common.policies.data.AuthAction; - import static com.google.common.base.Preconditions.checkNotNull; import static java.lang.String.format; @@ -33,6 +31,8 @@ import java.util.Set; import java.util.stream.Collectors; +import io.netty.util.internal.StringUtil; + /** * * Generic value converter. @@ -192,7 +192,12 @@ private static void initWrappers() { * @return The converted Integer value. */ public static Integer stringToInteger(String val) { - return Integer.valueOf(trim(val)); + String v = trim(val); + if (StringUtil.isNullOrEmpty(v)) { + return null; + } else { + return Integer.valueOf(v); + } } /** ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services