This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8956768406 [ISSUE #9497] Fix IndexOutOfBoundsException in
getEarliestMessageTime when running in IPv6 environment (#9498)
8956768406 is described below
commit 8956768406da0c9fc50dd36e030a5cf4de90a38b
Author: SHI <[email protected]>
AuthorDate: Mon Jun 30 13:54:07 2025 +0800
[ISSUE #9497] Fix IndexOutOfBoundsException in getEarliestMessageTime when
running in IPv6 environment (#9498)
Co-authored-by: shixiaoxiao <[email protected]>
---
.../apache/rocketmq/common/utils/NetworkUtil.java | 26 ++++++++++++++++++++++
.../rocketmq/common/utils/IPAddressUtilsTest.java | 17 ++++++++++++++
.../apache/rocketmq/store/DefaultMessageStore.java | 5 ++---
3 files changed, 45 insertions(+), 3 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/NetworkUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/NetworkUtil.java
index 2dc2a890e7..6d454925f6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/NetworkUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/NetworkUtil.java
@@ -179,6 +179,18 @@ public class NetworkUtil {
}
}
+ /**
+  * Removes one pair of enclosing square brackets from a host address, if present.
+  *
+  * @param bracketedAddress host address that may be written as {@code [addr]}; may be {@code null}
+  * @return {@code null} for {@code null} input; the text between the brackets when the input
+  *         both starts with {@code [} and ends with {@code ]}; otherwise the input unchanged
+  */
+ public static String denormalizeHostAddress(final String bracketedAddress) {
+     if (bracketedAddress == null) {
+         return null;
+     }
+
+     final boolean wrapped = bracketedAddress.startsWith("[") && bracketedAddress.endsWith("]");
+     if (!wrapped) {
+         return bracketedAddress;
+     }
+     final int lastIndex = bracketedAddress.length() - 1;
+     return bracketedAddress.substring(1, lastIndex);
+ }
+
public static SocketAddress string2SocketAddress(final String addr) {
int split = addr.lastIndexOf(":");
String host = addr.substring(0, split);
@@ -211,4 +223,18 @@ public class NetworkUtil {
}
return false;
}
+
+ /**
+  * Checks whether the given text is an IPv6 address, tolerating two common decorations:
+  * <ul>
+  *   <li>a scope suffix, e.g. {@code 2001:0db8:85a3:0000:0000:8a2e:0370:7334%eth0}</li>
+  *   <li>enclosing brackets, e.g. {@code [2001:0db8:85a3:0000:0000:8a2e:0370:7334]}</li>
+  * </ul>
+  * Brackets are removed first, then everything from the first {@code %} onwards is dropped,
+  * and the remainder is handed to {@link InetAddressValidator#isValidInet6Address(String)}.
+  *
+  * @param ipOrCidr candidate address text; may be {@code null} or empty
+  * @return {@code true} if the stripped text is accepted by the validator as an IPv6 address;
+  *         {@code false} otherwise, including for {@code null} or empty input
+  */
+ public static boolean validCommonInet6Address(String ipOrCidr) {
+     String ipWithoutBracketed = denormalizeHostAddress(ipOrCidr);
+     if (ipWithoutBracketed == null || ipWithoutBracketed.isEmpty()) {
+         return false;
+     }
+
+     // Cut the scope off by index rather than split("%")[0]: String.split drops trailing
+     // empty strings, so an input made only of '%' yields an empty array and [0] would throw
+     // ArrayIndexOutOfBoundsException.
+     int scopeIndex = ipWithoutBracketed.indexOf('%');
+     String ipWithoutScope = scopeIndex < 0
+         ? ipWithoutBracketed
+         : ipWithoutBracketed.substring(0, scopeIndex);
+
+     return InetAddressValidator.getInstance().isValidInet6Address(ipWithoutScope);
+ }
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/IPAddressUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/IPAddressUtilsTest.java
index 404250e6c0..7830bc3aed 100644
--- a/common/src/test/java/org/apache/rocketmq/common/utils/IPAddressUtilsTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/utils/IPAddressUtilsTest.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.common.utils;
import org.junit.Test;
+import static
org.apache.rocketmq.common.utils.NetworkUtil.validCommonInet6Address;
+
public class IPAddressUtilsTest {
@Test
@@ -72,4 +74,19 @@ public class IPAddressUtilsTest {
assert IPAddressUtils.isValidIPOrCidr(ipv4Cidr);
assert IPAddressUtils.isValidIPOrCidr(ipv6Cidr);
}
+
+ @Test
+ public void isValidIPv6Common() {
+     // Forms that must be accepted as IPv6: plain, with a scope, and bracketed with a scope.
+     assert validCommonInet6Address("2001:0db8:85a3:0000:0000:8a2e:0370:7334");
+     assert validCommonInet6Address("2001:0db8:85a3:0000:0000:8a2e:0370:7334%eth0");
+     assert validCommonInet6Address("[2001:0db8:85a3:0000:0000:8a2e:0370:7334%eth0]");
+
+     // IPv4 input, with or without a CIDR suffix, must be rejected.
+     assert !validCommonInet6Address("192.168.1.0");
+     assert !validCommonInet6Address("192.168.1.0/24");
+ }
+
}
\ No newline at end of file
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index cbf0150187..ea0a814caf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
@@ -80,6 +79,7 @@ import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
+import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
import org.apache.rocketmq.common.utils.ThreadUtils;
@@ -1174,8 +1174,7 @@ public class DefaultMessageStore implements MessageStore {
minPhyOffset += DLedgerEntry.BODY_OFFSET;
}
int size = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 8;
- InetAddressValidator validator = InetAddressValidator.getInstance();
- if (validator.isValidInet6Address(this.brokerConfig.getBrokerIP1())) {
+ if
(NetworkUtil.validCommonInet6Address(this.brokerConfig.getBrokerIP1())) {
size = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 20;
}
return this.getCommitLog().pickupStoreTimestamp(minPhyOffset, size);