This is an automated email from the ASF dual-hosted git repository.
zhongjiajie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new cbd354d134 Add host/port in heartbeat (#14591)
cbd354d134 is described below
commit cbd354d134ca32194c91c85f8ea3ffe615bd5536
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jul 20 21:26:35 2023 +0800
Add host/port in heartbeat (#14591)
---
docs/docs/en/architecture/configuration.md | 1 +
docs/docs/zh/architecture/configuration.md | 2 +
.../alert/registry/AlertHeartbeatTask.java | 4 +-
.../common/model/AlertServerHeartBeat.java | 4 +-
.../dolphinscheduler/common/model/HeartBeat.java | 5 +
.../common/model/MasterHeartBeat.java | 3 +
.../common/model/WorkerHeartBeat.java | 3 +
.../dolphinscheduler/common/utils/NetUtils.java | 284 +++++++++++----------
.../common/utils/NetUtilsTest.java | 19 +-
.../server/master/task/MasterHeartBeatTask.java | 3 +
.../registry/api/RegistryClient.java | 14 +-
.../server/worker/task/WorkerHeartBeatTask.java | 3 +
12 files changed, 200 insertions(+), 145 deletions(-)
diff --git a/docs/docs/en/architecture/configuration.md
b/docs/docs/en/architecture/configuration.md
index a9316aa9f8..08b6ac4011 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -129,6 +129,7 @@ export DOLPHINSCHEDULER_OPTS="
```
> "-XX:DisableExplicitGC" is not recommended due to may lead to memory link
> (DolphinScheduler dependent on Netty to communicate).
+> If "-Djava.net.preferIPv6Addresses=true" is added, an IPv6 address will be used; if
"-Djava.net.preferIPv4Addresses=true" is added, an IPv4 address will be used; if neither of
the two parameters is set, either an IPv4 or an IPv6 address will be used.
### Database connection related configuration
diff --git a/docs/docs/zh/architecture/configuration.md
b/docs/docs/zh/architecture/configuration.md
index f50dae2f8b..5a39ff725c 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -129,6 +129,8 @@ export DOLPHINSCHEDULER_OPTS="
```
> 不建议设置"-XX:DisableExplicitGC" , DolphinScheduler使用Netty进行通讯,设置该参数,可能会导致内存泄漏.
+>
+>> 如果设置"-Djava.net.preferIPv6Addresses=true" 将会使用ipv6的IP地址,
如果设置"-Djava.net.preferIPv4Addresses=true"将会使用ipv4的IP地址, 如果都不设置,将会随机使用ipv4或者ipv6.
## 数据库连接相关配置
diff --git
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java
index 31b30ae42b..150b8dbe0c 100644
---
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/registry/AlertHeartbeatTask.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.common.model.AlertServerHeartBeat;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@@ -59,7 +60,8 @@ public class AlertHeartbeatTask extends
BaseHeartBeatTask<AlertServerHeartBeat>
.cpuUsage(OSUtils.cpuUsagePercentage())
.memoryUsage(OSUtils.memoryUsagePercentage())
.availablePhysicalMemorySize(OSUtils.availablePhysicalMemorySize())
- .alertServerAddress(alertConfig.getAlertServerAddress())
+ .host(NetUtils.getHost())
+ .port(alertConfig.getPort())
.build();
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java
index 4d3610b7fe..9273b77a1c 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/AlertServerHeartBeat.java
@@ -34,5 +34,7 @@ public class AlertServerHeartBeat implements HeartBeat {
private double cpuUsage;
private double memoryUsage;
private double availablePhysicalMemorySize;
- private String alertServerAddress;
+
+ private String host;
+ private int port;
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
index 1a1d1610e4..218afa245e 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/HeartBeat.java
@@ -18,4 +18,9 @@
package org.apache.dolphinscheduler.common.model;
public interface HeartBeat {
+
+ String getHost();
+
+ int getPort();
+
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
index b6086b9bbe..52c96defc6 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
@@ -36,4 +36,7 @@ public class MasterHeartBeat implements HeartBeat {
private double reservedMemory;
private double diskAvailable;
private int processId;
+
+ private String host;
+ private int port;
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
index dcacd75c6c..d3843d2783 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
@@ -39,6 +39,9 @@ public class WorkerHeartBeat implements HeartBeat {
private int serverStatus;
private int processId;
+ private String host;
+ private int port;
+
private int workerHostWeight; // worker host weight
private int workerWaitingTaskCount; // worker waiting task count
private int workerExecThreadCount; // worker thread pool thread count
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
index da265219d8..161ee4698d 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java
@@ -17,24 +17,25 @@
package org.apache.dolphinscheduler.common.utils;
-import static java.util.Collections.emptyList;
-
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.conn.util.InetAddressUtils;
import java.io.IOException;
+import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -106,7 +107,8 @@ public class NetUtils {
if (null != LOCAL_ADDRESS) {
return LOCAL_ADDRESS;
}
- return getLocalAddress0();
+ LOCAL_ADDRESS = getLocalAddress0();
+ return LOCAL_ADDRESS;
}
/**
@@ -115,55 +117,12 @@ public class NetUtils {
* @return first valid local IP
*/
private static synchronized InetAddress getLocalAddress0() {
- if (null != LOCAL_ADDRESS) {
- return LOCAL_ADDRESS;
+ List<NetworkInterface> suitableNetworkInterface =
findSuitableNetworkInterface();
+ List<InetAddress> suitableInetAddress =
findSuitableInetAddress(suitableNetworkInterface);
+ if (CollectionUtils.isEmpty(suitableInetAddress)) {
+ return null;
}
-
- InetAddress localAddress = null;
- try {
- Optional<NetworkInterface> networkInterface =
findNetworkInterface();
- if (networkInterface.isPresent()) {
- Enumeration<InetAddress> addresses =
networkInterface.get().getInetAddresses();
- while (addresses.hasMoreElements()) {
- Optional<InetAddress> addressOp =
toValidAddress(addresses.nextElement());
- if (addressOp.isPresent()) {
- try {
- if (addressOp.get().isReachable(200)) {
- LOCAL_ADDRESS = addressOp.get();
- return LOCAL_ADDRESS;
- }
- } catch (IOException e) {
- log.warn("test address id reachable io exception",
e);
- }
- }
- }
-
- }
- localAddress = InetAddress.getLocalHost();
- } catch (UnknownHostException e) {
- log.warn("InetAddress get LocalHost exception", e);
- }
- Optional<InetAddress> addressOp = toValidAddress(localAddress);
- if (addressOp.isPresent()) {
- LOCAL_ADDRESS = addressOp.get();
- }
- return LOCAL_ADDRESS;
- }
-
- private static Optional<InetAddress> toValidAddress(InetAddress address) {
- if (address instanceof Inet6Address) {
- Inet6Address v6Address = (Inet6Address) address;
- if (isPreferIPV6Address()) {
- InetAddress inetAddress = normalizeV6Address(v6Address);
- log.debug("The host prefer ipv6 address, will use ipv6
address: {} directly", inetAddress);
- return Optional.ofNullable(inetAddress);
- }
- }
- if (isValidV4Address(address)) {
- return Optional.of(address);
- }
- log.warn("The address of the host is invalid, address: {}", address);
- return Optional.empty();
+ return suitableInetAddress.get(0);
}
private static InetAddress normalizeV6Address(Inet6Address address) {
@@ -179,9 +138,8 @@ public class NetUtils {
return address;
}
- public static boolean isValidV4Address(InetAddress address) {
-
- if (address == null || address.isLoopbackAddress()) {
+ protected static boolean isValidV4Address(InetAddress address) {
+ if (!(address instanceof Inet4Address)) {
return false;
}
String name = address.getHostAddress();
@@ -191,6 +149,17 @@ public class NetUtils {
&& !address.isLoopbackAddress());
}
+ protected static boolean isValidV6Address(InetAddress address) {
+ if (!(address instanceof Inet6Address)) {
+ return false;
+ }
+ String name = address.getHostAddress();
+ return (name != null
+ && InetAddressUtils.isIPv6Address(name)
+ && !address.isAnyLocalAddress()
+ && !address.isLoopbackAddress());
+ }
+
/**
* Check if an ipv6 address
*
@@ -200,79 +169,142 @@ public class NetUtils {
return Boolean.getBoolean("java.net.preferIPv6Addresses");
}
+ private static boolean isPreferIPV4Address() {
+ return Boolean.getBoolean("java.net.preferIPv4Addresses");
+ }
+
/**
* Get the suitable {@link NetworkInterface}
- *
- * @return If no {@link NetworkInterface} is available , return
<code>null</code>
*/
- private static Optional<NetworkInterface> findNetworkInterface() {
-
- List<NetworkInterface> validNetworkInterfaces = emptyList();
+ private static List<NetworkInterface> findSuitableNetworkInterface() {
+ // Find all network interfaces
+ List<NetworkInterface> networkInterfaces = Collections.emptyList();
try {
- validNetworkInterfaces = getValidNetworkInterfaces();
+ networkInterfaces = getAllNetworkInterfaces();
} catch (SocketException e) {
log.warn("ValidNetworkInterfaces exception", e);
}
- if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
- log.warn("ValidNetworkInterfaces is empty");
- return Optional.empty();
- }
- // Try to specify config NetWork Interface
- Optional<NetworkInterface> specifyNetworkInterface =
-
validNetworkInterfaces.stream().filter(NetUtils::isSpecifyNetworkInterface).findFirst();
- if (specifyNetworkInterface.isPresent()) {
- log.info("Use specified NetworkInterface: {}",
specifyNetworkInterface.get());
- return specifyNetworkInterface;
+ // Filter the loopback/virtual/ network interfaces
+ List<NetworkInterface> validNetworkInterfaces = networkInterfaces
+ .stream()
+ .filter(networkInterface -> {
+ try {
+ return !(networkInterface == null
+ || networkInterface.isLoopback()
+ || networkInterface.isVirtual()
+ || !networkInterface.isUp());
+ } catch (SocketException e) {
+ log.warn("ValidNetworkInterfaces exception", e);
+ return false;
+ }
+ })
+ .collect(Collectors.toList());
+
+ // Use the specified network interface if set
+ if (StringUtils.isNotBlank(specifyNetworkInterfaceName())) {
+ String specifyNetworkInterfaceName = specifyNetworkInterfaceName();
+ validNetworkInterfaces = validNetworkInterfaces.stream()
+ .filter(networkInterface ->
specifyNetworkInterfaceName.equals(networkInterface.getDisplayName()))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
+ throw new IllegalArgumentException(
+ "The specified network interface: " +
specifyNetworkInterfaceName + " is not found");
+ }
}
- return findAddress(validNetworkInterfaces);
+ return filterByNetworkPriority(validNetworkInterfaces);
}
/**
- * Get the valid {@link NetworkInterface network interfaces}
- *
- * @throws SocketException SocketException if an I/O error occurs.
+ * Get the suitable {@link InetAddress}
*/
- private static List<NetworkInterface> getValidNetworkInterfaces() throws
SocketException {
+ private static List<InetAddress>
findSuitableInetAddress(List<NetworkInterface> networkInterfaces) {
+ if (CollectionUtils.isEmpty(networkInterfaces)) {
+ return Collections.emptyList();
+ }
+ List<InetAddress> allInetAddresses = new LinkedList<>();
+ for (NetworkInterface networkInterface : networkInterfaces) {
+ Enumeration<InetAddress> addresses =
networkInterface.getInetAddresses();
+ while (addresses.hasMoreElements()) {
+ allInetAddresses.add(addresses.nextElement());
+ }
+ }
+ // Get prefer addresses
+ List<InetAddress> preferInetAddress = new ArrayList<>();
+ if (!isPreferIPV6Address() && !isPreferIPV4Address()) {
+ // no prefer, will use all addresses
+ preferInetAddress.addAll(getIpv4Addresses(allInetAddresses));
+ preferInetAddress.addAll(getIpv6Addresses(allInetAddresses));
+ }
+ if (isPreferIPV4Address()) {
+ preferInetAddress.addAll(getIpv4Addresses(allInetAddresses));
+ }
+ if (isPreferIPV6Address()) {
+ preferInetAddress.addAll(getIpv6Addresses(allInetAddresses));
+ }
+ // Get reachable addresses
+ return preferInetAddress.stream()
+ .filter(inetAddress -> {
+ try {
+ return inetAddress.isReachable(100);
+ } catch (IOException e) {
+ log.warn("InetAddress isReachable exception", e);
+ return false;
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private static List<InetAddress> getIpv4Addresses(List<InetAddress>
allInetAddresses) {
+ if (CollectionUtils.isEmpty(allInetAddresses)) {
+ return Collections.emptyList();
+ }
+ List<InetAddress> validIpv4Addresses = new ArrayList<>();
+ for (InetAddress inetAddress : allInetAddresses) {
+ if (isValidV4Address(inetAddress)) {
+ validIpv4Addresses.add(inetAddress);
+ }
+ }
+ return validIpv4Addresses;
+ }
+
+ private static List<InetAddress> getIpv6Addresses(List<InetAddress>
allInetAddresses) {
+ if (CollectionUtils.isEmpty(allInetAddresses)) {
+ return Collections.emptyList();
+ }
+ List<InetAddress> validIpv6Addresses = new ArrayList<>();
+ for (InetAddress inetAddress : allInetAddresses) {
+ if (!isValidV6Address(inetAddress)) {
+ continue;
+ }
+ Inet6Address v6Address = (Inet6Address) inetAddress;
+ InetAddress normalizedV6Address = normalizeV6Address(v6Address);
+ validIpv6Addresses.add(normalizedV6Address);
+ }
+ return validIpv6Addresses;
+ }
+
+ private static List<NetworkInterface> getAllNetworkInterfaces() throws
SocketException {
List<NetworkInterface> validNetworkInterfaces = new LinkedList<>();
Enumeration<NetworkInterface> interfaces =
NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
- // ignore
- if (ignoreNetworkInterface(networkInterface)) {
- log.debug("Info NetworkInterface: {}", networkInterface);
- continue;
- }
- log.info("Found valid NetworkInterface: {}", networkInterface);
+ log.info("Found NetworkInterface: {}", networkInterface);
validNetworkInterfaces.add(networkInterface);
}
return validNetworkInterfaces;
}
- /**
- * @param networkInterface {@link NetworkInterface}
- * @return if the specified {@link NetworkInterface} should be ignored,
return <code>true</code>
- * @throws SocketException SocketException if an I/O error occurs.
- */
- public static boolean ignoreNetworkInterface(NetworkInterface
networkInterface) throws SocketException {
- return networkInterface == null
- || networkInterface.isLoopback()
- || networkInterface.isVirtual()
- || !networkInterface.isUp();
- }
-
- private static boolean isSpecifyNetworkInterface(NetworkInterface
networkInterface) {
- String preferredNetworkInterface =
-
PropertyUtils.getString(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED,
-
System.getProperty(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED));
- return Objects.equals(networkInterface.getDisplayName(),
preferredNetworkInterface);
+ private static String specifyNetworkInterfaceName() {
+ return PropertyUtils.getString(
+ Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED,
+
System.getProperty(Constants.DOLPHIN_SCHEDULER_NETWORK_INTERFACE_PREFERRED));
}
- private static Optional<NetworkInterface>
findAddress(List<NetworkInterface> validNetworkInterfaces) {
+ private static List<NetworkInterface>
filterByNetworkPriority(List<NetworkInterface> validNetworkInterfaces) {
if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
- return Optional.empty();
+ return Collections.emptyList();
}
String networkPriority =
PropertyUtils.getString(Constants.DOLPHIN_SCHEDULER_NETWORK_PRIORITY_STRATEGY,
NETWORK_PRIORITY_DEFAULT);
@@ -282,28 +314,21 @@ public class NetUtils {
return findAddressByDefaultPolicy(validNetworkInterfaces);
case NETWORK_PRIORITY_INNER:
log.debug("Use inner NetworkInterface acquisition policy");
- return findInnerAddress(validNetworkInterfaces);
+ return
findInnerAddressNetWorkInterface(validNetworkInterfaces);
case NETWORK_PRIORITY_OUTER:
log.debug("Use outer NetworkInterface acquisition policy");
- return findOuterAddress(validNetworkInterfaces);
+ return
findOuterAddressNetworkInterface(validNetworkInterfaces);
default:
log.error("There is no matching network card acquisition
policy!");
- return Optional.empty();
+ return Collections.emptyList();
}
}
- private static Optional<NetworkInterface>
findAddressByDefaultPolicy(List<NetworkInterface> validNetworkInterfaces) {
- Optional<NetworkInterface> innerAddress =
findInnerAddress(validNetworkInterfaces);
- if (innerAddress.isPresent()) {
- log.debug("Found inner NetworkInterface: {}", innerAddress.get());
- return innerAddress;
- }
- Optional<NetworkInterface> outerAddress =
findOuterAddress(validNetworkInterfaces);
- if (outerAddress.isPresent()) {
- log.debug("Found outer NetworkInterface: {}", outerAddress.get());
- return outerAddress;
- }
- return Optional.empty();
+ private static List<NetworkInterface>
findAddressByDefaultPolicy(List<NetworkInterface> validNetworkInterfaces) {
+ List<NetworkInterface> allAddress = new ArrayList<>();
+
allAddress.addAll(findInnerAddressNetWorkInterface(validNetworkInterfaces));
+
allAddress.addAll(findOuterAddressNetworkInterface(validNetworkInterfaces));
+ return allAddress;
}
/**
@@ -311,39 +336,40 @@ public class NetUtils {
*
* @return If no {@link NetworkInterface} is available , return
<code>null</code>
*/
- private static Optional<NetworkInterface>
findInnerAddress(List<NetworkInterface> validNetworkInterfaces) {
+ private static List<NetworkInterface>
findInnerAddressNetWorkInterface(List<NetworkInterface> validNetworkInterfaces)
{
if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
- return Optional.empty();
+ return Collections.emptyList();
}
+ List<NetworkInterface> innerNetworkInterfaces = new ArrayList<>();
for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) {
InetAddress ip = address.nextElement();
- if (ip.isSiteLocalAddress()
- && !ip.isLoopbackAddress()) {
- return Optional.of(ni);
+ if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress()) {
+ innerNetworkInterfaces.add(ni);
}
}
}
- return Optional.empty();
+ return innerNetworkInterfaces;
}
- private static Optional<NetworkInterface>
findOuterAddress(List<NetworkInterface> validNetworkInterfaces) {
+ private static List<NetworkInterface>
findOuterAddressNetworkInterface(List<NetworkInterface> validNetworkInterfaces)
{
if (CollectionUtils.isEmpty(validNetworkInterfaces)) {
- return Optional.empty();
+ return Collections.emptyList();
}
+
+ List<NetworkInterface> outerNetworkInterfaces = new ArrayList<>();
for (NetworkInterface ni : validNetworkInterfaces) {
Enumeration<InetAddress> address = ni.getInetAddresses();
while (address.hasMoreElements()) {
InetAddress ip = address.nextElement();
- if (!ip.isSiteLocalAddress()
- && !ip.isLoopbackAddress()) {
- return Optional.of(ni);
+ if (!ip.isSiteLocalAddress() && !ip.isLoopbackAddress()) {
+ outerNetworkInterfaces.add(ni);
}
}
}
- return Optional.empty();
+ return outerNetworkInterfaces;
}
}
diff --git
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
index 8a17395784..f3e759e585 100644
---
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java
@@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
+import java.net.Inet4Address;
import java.net.InetAddress;
import org.junit.jupiter.api.Assertions;
@@ -84,33 +85,33 @@ public class NetUtilsTest {
@Test
public void testIsValidAddress() {
Assertions.assertFalse(NetUtils.isValidV4Address(null));
- InetAddress address = mock(InetAddress.class);
+ Inet4Address address = mock(Inet4Address.class);
when(address.isLoopbackAddress()).thenReturn(true);
Assertions.assertFalse(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("localhost");
Assertions.assertFalse(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("0.0.0.0");
when(address.isAnyLocalAddress()).thenReturn(true);
Assertions.assertFalse(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("127.0.0.1");
when(address.isLoopbackAddress()).thenReturn(true);
Assertions.assertFalse(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("1.2.3.4");
Assertions.assertTrue(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("1.2.3.4:80");
Assertions.assertFalse(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("256.0.0.1");
Assertions.assertFalse(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("127.0.0.0.1");
Assertions.assertFalse(NetUtils.isValidV4Address(address));
- address = mock(InetAddress.class);
+ address = mock(Inet4Address.class);
when(address.getHostAddress()).thenReturn("-1.2.3.4");
Assertions.assertFalse(NetUtils.isValidV4Address(address));
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
index a28b026192..824d4778ab 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
@@ -21,6 +21,7 @@ import
org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@@ -59,6 +60,8 @@ public class MasterHeartBeatTask extends
BaseHeartBeatTask<MasterHeartBeat> {
.memoryUsage(OSUtils.memoryUsagePercentage())
.diskAvailable(OSUtils.diskAvailable())
.processId(processId)
+ .host(NetUtils.getHost())
+ .port(masterConfig.getListenPort())
.build();
}
diff --git
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
index 6b8046b6e2..ddd80d94a1 100644
---
a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
+++
b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/RegistryClient.java
@@ -90,12 +90,16 @@ public class RegistryClient {
server.setCreateTime(new
Date(masterHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new
Date(masterHeartBeat.getReportTime()));
server.setId(masterHeartBeat.getProcessId());
+ server.setHost(masterHeartBeat.getHost());
+ server.setPort(masterHeartBeat.getPort());
break;
case WORKER:
WorkerHeartBeat workerHeartBeat =
JSONUtils.parseObject(heartBeatJson, WorkerHeartBeat.class);
server.setCreateTime(new
Date(workerHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new
Date(workerHeartBeat.getReportTime()));
server.setId(workerHeartBeat.getProcessId());
+ server.setHost(workerHeartBeat.getHost());
+ server.setPort(workerHeartBeat.getPort());
break;
case ALERT_SERVER:
AlertServerHeartBeat alertServerHeartBeat =
@@ -103,16 +107,16 @@ public class RegistryClient {
server.setCreateTime(new
Date(alertServerHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new
Date(alertServerHeartBeat.getReportTime()));
server.setId(alertServerHeartBeat.getProcessId());
+ server.setHost(alertServerHeartBeat.getHost());
+ server.setPort(alertServerHeartBeat.getPort());
+ break;
+ default:
+ log.warn("unknown registry node type: {}",
registryNodeType);
}
server.setResInfo(heartBeatJson);
// todo: add host, port in heartBeat Info, so that we don't need
to parse this again
server.setZkDirectory(registryNodeType.getRegistryPath() + "/" +
serverPath);
- // set host and port
- int lastColonIndex = serverPath.lastIndexOf(":");
- // fetch the last one
- server.setHost(serverPath.substring(0, lastColonIndex));
-
server.setPort(Integer.parseInt(serverPath.substring(lastColonIndex + 1)));
serverList.add(server);
}
return serverList;
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index 7b0dbe60e7..bb9c77e849 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -22,6 +22,7 @@ import
org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -75,6 +76,8 @@ public class WorkerHeartBeatTask extends
BaseHeartBeatTask<WorkerHeartBeat> {
.workerWaitingTaskCount(this.workerWaitingTaskCount.get())
.workerExecThreadCount(workerConfig.getExecThreads())
.serverStatus(serverStatus)
+ .host(NetUtils.getHost())
+ .port(workerConfig.getListenPort())
.build();
}