This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a272c5b76a0 [improve] [broker] Add overrideBrokerNics for adaptation
of heterogeneous network environments (#24883)
a272c5b76a0 is described below
commit a272c5b76a03631c0157d0771e34af8981a81874
Author: dong_zhong_hua <[email protected]>
AuthorDate: Tue Dec 2 14:18:12 2025 +0800
[improve] [broker] Add overrideBrokerNics for adaptation of heterogeneous
network environments (#24883)
Co-authored-by: dongzhonghua03 <[email protected]>
---
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +++-
.../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 6 +++-
.../pulsar/broker/tools/LoadReportCommand.java | 3 +-
.../impl/LinuxBrokerHostUsageImplTest.java | 40 ++++++++++++++++++++--
4 files changed, 50 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a3183316ccf..c6bedcfd01d 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2994,7 +2994,11 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "Option to override the auto-detected network interfaces max
speed"
)
private Optional<Double> loadBalancerOverrideBrokerNicSpeedGbps =
Optional.empty();
-
+ @FieldContext(
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Option to override the auto-detected network interfaces"
+ )
+ private List<String> loadBalancerOverrideBrokerNics = new ArrayList<>();
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
dynamic = true,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 6d0e6bb9073..86634c4fdb2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -56,23 +56,27 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
private OperatingSystemMXBean systemBean;
private SystemResourceUsage usage;
private final Optional<Double> overrideBrokerNicSpeedGbps;
+ private final List<String> overrideBrokerNics;
private final boolean isCGroupsEnabled;
public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
this(
pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(),
pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(),
+ pulsar.getConfiguration().getLoadBalancerOverrideBrokerNics(),
pulsar.getLoadManagerExecutor()
);
}
public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin,
Optional<Double>
overrideBrokerNicSpeedGbps,
+ List<String> overrideBrokerNics,
ScheduledExecutorService executorService) {
this.systemBean = (OperatingSystemMXBean)
ManagementFactory.getOperatingSystemMXBean();
this.lastCollection = 0L;
this.usage = new SystemResourceUsage();
this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
+ this.overrideBrokerNics = overrideBrokerNics;
this.isCGroupsEnabled = isCGroupEnabled();
// Call now to initialize values before the constructor returns
calculateBrokerHostUsage();
@@ -88,7 +92,7 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
@Override
public void calculateBrokerHostUsage() {
- List<String> nics = getUsablePhysicalNICs();
+ List<String> nics = !overrideBrokerNics.isEmpty() ? overrideBrokerNics
: getUsablePhysicalNICs();
double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX,
BitRateUnit.Kilobit);
double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX,
BitRateUnit.Kilobit);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
index f1f4a917571..bc240a05d29 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.tools;
+import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
@@ -67,7 +68,7 @@ public class LoadReportCommand implements Callable<Integer> {
try {
if (isLinux) {
hostUsage = new LinuxBrokerHostUsageImpl(
- Integer.MAX_VALUE, Optional.empty(), scheduler
+ Integer.MAX_VALUE, Optional.empty(), new ArrayList<>(),
scheduler
);
} else {
hostUsage = new GenericBrokerHostUsageImpl(
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
index 563f707c445..f6d0a53d8c2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -26,7 +29,11 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -38,7 +45,7 @@ public class LinuxBrokerHostUsageImplTest {
@Cleanup("shutdown")
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
- new LinuxBrokerHostUsageImpl(1, Optional.of(3.0),
executorService);
+ new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), new
ArrayList<>(), executorService);
List<String> nics = new ArrayList<>();
nics.add("1");
nics.add("2");
@@ -47,6 +54,34 @@ public class LinuxBrokerHostUsageImplTest {
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
}
+ @Test
+ public void checkOverrideBrokerNics() {
+ try (MockedStatic<LinuxInfoUtils> mockedUtils =
Mockito.mockStatic(LinuxInfoUtils.class)) {
+ mockedUtils.when(() -> LinuxInfoUtils.getTotalNicUsage(any(),
any(), any())).thenReturn(3.0d);
+
mockedUtils.when(LinuxInfoUtils::getCpuUsageForEntireHost).thenReturn(LinuxInfoUtils.ResourceUsage.empty());
+ List<String> nics = new ArrayList<>();
+ nics.add("1");
+ nics.add("2");
+ nics.add("3");
+ ServiceConfiguration config = new ServiceConfiguration();
+
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(3.0d));
+ config.setLoadBalancerOverrideBrokerNics(nics);
+ PulsarService pulsarService = mock(PulsarService.class);
+ when(pulsarService.getConfiguration()).thenReturn(config);
+ @Cleanup("shutdown")
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+
when(pulsarService.getLoadManagerExecutor()).thenReturn(executorService);
+ LinuxBrokerHostUsageImpl linuxBrokerHostUsage = new
LinuxBrokerHostUsageImpl(pulsarService);
+ linuxBrokerHostUsage.calculateBrokerHostUsage();
+ double totalLimit =
linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
+ Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
+ double totalNicLimitRx =
linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthIn().limit;
+ double totalNicLimitTx =
linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthOut().limit;
+ Assert.assertEquals(totalNicLimitRx, 3.0 * 1000 * 1000 * 3);
+ Assert.assertEquals(totalNicLimitTx, 3.0 * 1000 * 1000 * 3);
+ }
+ }
+
@Test
public void testCpuUsage() throws InterruptedException {
if (!LinuxInfoUtils.isLinux()) {
@@ -56,7 +91,8 @@ public class LinuxBrokerHostUsageImplTest {
@Cleanup("shutdown")
ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
- new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE,
Optional.empty(), executorService);
+ new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE,
Optional.empty(),
+ new ArrayList<>(), executorService);
linuxBrokerHostUsage.calculateBrokerHostUsage();
TimeUnit.SECONDS.sleep(1);