This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f2fb16d  unmarshalMsgID support ipv6 (#1203)
f2fb16d is described below

commit f2fb16d33e689a1c57ea7681314d147ad61a3704
Author: SHI <[email protected]>
AuthorDate: Wed Nov 12 15:17:07 2025 +0800

    unmarshalMsgID support ipv6 (#1203)
    
    Co-authored-by: shixiaoxiao <[email protected]>
---
 internal/utils/net.go     |  5 +++--
 primitive/message.go      | 39 +++++++++++++++++++++++++++++++++------
 primitive/message_test.go | 18 ++++++++++++++++++
 3 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/internal/utils/net.go b/internal/utils/net.go
index a4eeb56..1588148 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -20,10 +20,11 @@ package utils
 import (
        "bytes"
        "fmt"
-       "github.com/apache/rocketmq-client-go/v2/errors"
        "net"
        "strconv"
        "time"
+
+       "github.com/apache/rocketmq-client-go/v2/errors"
 )
 
 var (
@@ -66,5 +67,5 @@ func FakeIP() []byte {
 }
 
 func GetAddressByBytes(data []byte) string {
-       return net.IPv4(data[0], data[1], data[2], data[3]).String()
+       return net.IP(data).String()
 }
diff --git a/primitive/message.go b/primitive/message.go
index 8b390c4..123ded5 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -482,19 +482,46 @@ func UnmarshalMsgID(id []byte) (*MessageID, error) {
        if len(id) < 32 {
                return nil, fmt.Errorf("%s len < 32", string(id))
        }
+
        var (
                ipBytes     = make([]byte, 4)
                portBytes   = make([]byte, 4)
                offsetBytes = make([]byte, 8)
        )
-       hex.Decode(ipBytes, id[0:8])
-       hex.Decode(portBytes, id[8:16])
-       hex.Decode(offsetBytes, id[16:32])
+       if len(id) == 32 {
+               hex.Decode(ipBytes, id[0:8])
+               hex.Decode(portBytes, id[8:16])
+               hex.Decode(offsetBytes, id[16:32])
+       } else {
+               ipBytes = make([]byte, 16)
+               portBytes = make([]byte, 4)
+               offsetBytes = make([]byte, 8)
+               hex.Decode(ipBytes, id[0:32])
+               hex.Decode(portBytes, id[32:40])
+               hex.Decode(offsetBytes, id[40:56])
+       }
+
+       addr := utils.GetAddressByBytes(ipBytes)
+       port := int(binary.BigEndian.Uint32(portBytes))
+       offset := int64(binary.BigEndian.Uint64(offsetBytes))
+
+       if addr == "" {
+               return nil, fmt.Errorf("addr is empty")
+       }
+
+       if port < 0 || port > 65535 {
+               return nil, fmt.Errorf("port > 65535, acutal port is %d", port)
+       }
+
+       if len(id) != 32 {
+               // DialContext requires IPv6 addresses in the form [ipv6]:port
+               addr = fmt.Sprintf("[%s]", addr)
+       }
 
        return &MessageID{
-               Addr:   utils.GetAddressByBytes(ipBytes),
-               Port:   int(binary.BigEndian.Uint32(portBytes)),
-               Offset: int64(binary.BigEndian.Uint64(offsetBytes)),
+               Addr:   addr,
+               Port:   port,
+               Offset: offset,
        }, nil
 }
 
diff --git a/primitive/message_test.go b/primitive/message_test.go
index 7248df7..65cc03b 100644
--- a/primitive/message_test.go
+++ b/primitive/message_test.go
@@ -37,6 +37,24 @@ func TestMessageID(t *testing.T) {
        t.Log(msgID)
 }
 
+func TestIpv6MessageID(t *testing.T) {
+       id := []byte("FDBDDC4100010136C800000000000024000078BF0000000000004F45")
+       msgID, err := UnmarshalMsgID(id)
+       if err != nil {
+               t.Fatalf("unmarshal msg id error, ms is: %s", err.Error())
+       }
+       if msgID.Addr != "[fdbd:dc41:1:136:c800::24]" {
+               t.Fatalf("parse messageID %s error", id)
+       }
+       if msgID.Port != 30911 {
+               t.Fatalf("parse messageID %s error", id)
+       }
+       if msgID.Offset != 20293 {
+               t.Fatalf("parse messageID %s error", id)
+       }
+       t.Log(msgID)
+}
+
 func TestMessageKey(t *testing.T) {
        msg := &Message{}
        expected := "testKey"

Reply via email to