This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a8779c0d4e [ISSUE #8961] Automatic recognition of address scheme in Topic Route by host (#8962)
a8779c0d4e is described below
commit a8779c0d4e815835bc17f708a0215cb5877b4004
Author: dingshuangxi888 <[email protected]>
AuthorDate: Fri Nov 22 15:34:36 2024 +0800
[ISSUE #8961] Automatic recognition of address scheme in Topic Route by host (#8962)
* automatic recognition of address scheme in topic route by host.
---
.../rocketmq/common/utils/IPAddressUtils.java | 8 +++
.../org/apache/rocketmq/proxy/common/Address.java | 20 ++++++++
.../remoting/activity/GetTopicRouteActivity.java | 2 +-
.../proxy/service/route/ProxyTopicRouteData.java | 4 +-
.../apache/rocketmq/proxy/common/AddressTest.java | 60 ++++++++++++++++++++++
5 files changed, 91 insertions(+), 3 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IPAddressUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IPAddressUtils.java
index ca66bc93be..5133219d9c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/IPAddressUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/IPAddressUtils.java
@@ -35,6 +35,14 @@ public class IPAddressUtils {
return VALIDATOR.isValid(ip);
}
+ public static boolean isValidIPv4(String ip) {
+ return VALIDATOR.isValidInet4Address(ip);
+ }
+
+ public static boolean isValidIPv6(String ip) {
+ return VALIDATOR.isValidInet6Address(ip);
+ }
+
public static boolean isValidCidr(String cidr) {
return isValidIPv4Cidr(cidr) || isValidIPv6Cidr(cidr);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/Address.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/Address.java
index 2fc1dab40e..1f247194e2 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/Address.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/Address.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.proxy.common;
import com.google.common.net.HostAndPort;
import java.util.Objects;
+import org.apache.rocketmq.common.utils.IPAddressUtils;
public class Address {
@@ -31,6 +32,11 @@ public class Address {
private AddressScheme addressScheme;
private HostAndPort hostAndPort;
+ public Address(HostAndPort hostAndPort) {
+ this.addressScheme = buildScheme(hostAndPort);
+ this.hostAndPort = hostAndPort;
+ }
+
public Address(AddressScheme addressScheme, HostAndPort hostAndPort) {
this.addressScheme = addressScheme;
this.hostAndPort = hostAndPort;
@@ -52,6 +58,20 @@ public class Address {
this.hostAndPort = hostAndPort;
}
+ private AddressScheme buildScheme(HostAndPort hostAndPort) {
+ if (hostAndPort == null) {
+ return AddressScheme.UNRECOGNIZED;
+ }
+ String address = hostAndPort.getHost();
+ if (IPAddressUtils.isValidIPv4(address)) {
+ return AddressScheme.IPv4;
+ }
+ if (IPAddressUtils.isValidIPv6(address)) {
+ return AddressScheme.IPv6;
+ }
+ return AddressScheme.DOMAIN_NAME;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
index 9972c26c99..56ec34fae6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
@@ -50,7 +50,7 @@ public class GetTopicRouteActivity extends AbstractRemotingActivity {
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
List<Address> addressList = new ArrayList<>();
// AddressScheme is just a placeholder and will not affect topic route result in this case.
- addressList.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromParts(proxyConfig.getRemotingAccessAddr(), proxyConfig.getRemotingListenPort())));
+ addressList.add(new Address(HostAndPort.fromParts(proxyConfig.getRemotingAccessAddr(), proxyConfig.getRemotingListenPort())));
ProxyTopicRouteData proxyTopicRouteData = messagingProcessor.getTopicRouteDataForProxy(context, addressList, requestHeader.getTopic());
TopicRouteData topicRouteData = proxyTopicRouteData.buildTopicRouteData();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java
index b5e65818ac..4c33580ada 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ProxyTopicRouteData.java
@@ -43,7 +43,7 @@ public class ProxyTopicRouteData {
brokerData.getBrokerAddrs().forEach((brokerId, brokerAddr) -> {
HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr);
- proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, brokerHostAndPort)));
+ proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(brokerHostAndPort)));
});
this.brokerDatas.add(proxyBrokerData);
}
@@ -61,7 +61,7 @@ public class ProxyTopicRouteData {
HostAndPort brokerHostAndPort = HostAndPort.fromString(brokerAddr);
HostAndPort proxyHostAndPort = HostAndPort.fromParts(brokerHostAndPort.getHost(), port);
- proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(Address.AddressScheme.IPv4, proxyHostAndPort)));
+ proxyBrokerData.getBrokerAddrs().put(brokerId, Lists.newArrayList(new Address(proxyHostAndPort)));
});
this.brokerDatas.add(proxyBrokerData);
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/AddressTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/AddressTest.java
new file mode 100644
index 0000000000..b0df5bafc1
--- /dev/null
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/AddressTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.common;
+
+import com.google.common.net.HostAndPort;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+public class AddressTest {
+
+ @Test
+ public void testConstructorWithIPv4() {
+ HostAndPort hostAndPort = HostAndPort.fromString("192.168.1.1:8080");
+ Address address = new Address(hostAndPort);
+
+ assertEquals(Address.AddressScheme.IPv4, address.getAddressScheme());
+ assertEquals(hostAndPort, address.getHostAndPort());
+ }
+
+ @Test
+ public void testConstructorWithIPv6() {
+ HostAndPort hostAndPort = HostAndPort.fromString("[2001:db8::1]:8080");
+ Address address = new Address(hostAndPort);
+
+ assertEquals(Address.AddressScheme.IPv6, address.getAddressScheme());
+ assertEquals(hostAndPort, address.getHostAndPort());
+ }
+
+ @Test
+ public void testConstructorWithDomainName() {
+ HostAndPort hostAndPort = HostAndPort.fromString("example.com:8080");
+ Address address = new Address(hostAndPort);
+
+ assertEquals(Address.AddressScheme.DOMAIN_NAME, address.getAddressScheme());
+ assertEquals(hostAndPort, address.getHostAndPort());
+ }
+
+ @Test
+ public void testConstructorWithNullHostAndPort() {
+ Address address = new Address(null);
+
+ assertEquals(Address.AddressScheme.UNRECOGNIZED, address.getAddressScheme());
+ assertNull(address.getHostAndPort());
+ }
+}
\ No newline at end of file