This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 19464a33ac [INLONG-11954][SDK] Fix potential data race in Golang SDK 
(#11955)
19464a33ac is described below

commit 19464a33ac35ab003b6e0663618c0ec6b37e646b
Author: gunli <[email protected]>
AuthorDate: Wed Jul 30 14:05:56 2025 +0800

    [INLONG-11954][SDK] Fix potential data race in Golang SDK (#11955)
---
 .../dataproxy-sdk-golang/dataproxy/client.go       | 37 +++++++++++++++-------
 .../dataproxy-sdk-golang/util/id.go                |  2 +-
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index c85900222e..1046a10d01 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -330,16 +330,23 @@ func (c *client) OnTraffic(conn gnet.Conn) (action 
gnet.Action) {
        for {
                total := conn.InboundBuffered()
                if total < heartbeatRspLen {
-                       break
+                       return gnet.None
+               }
+
+               buf, err := conn.Peek(total)
+               if err != nil {
+                       c.metrics.incError(errConnReadFailed.getStrCode())
+                       c.log.Error("read connection stream failed, close it, 
err:", err)
+                       // read failed, close the connection
+                       return gnet.Close
                }
 
-               buf, _ := conn.Peek(total)
                // if it is a heartbeat response, skip it and read the next 
package
                if bytes.Equal(buf[:heartbeatRspLen], heartbeatRsp) {
-                       _, err := conn.Discard(heartbeatRspLen)
+                       _, err = conn.Discard(heartbeatRspLen)
                        if err != nil {
                                
c.metrics.incError(errConnReadFailed.getStrCode())
-                               c.log.Error("discard connection stream failed, 
err", err)
+                               c.log.Error("discard connection stream failed, 
close it, err:", err)
                                // read failed, close the connection
                                return gnet.Close
                        }
@@ -349,30 +356,36 @@ func (c *client) OnTraffic(conn gnet.Conn) (action 
gnet.Action) {
                }
 
                length, payloadOffset, payloadOffsetEnd, err := 
c.framer.ReadFrame(buf)
-               if errors.Is(err, framer.ErrIncompleteFrame) {
-                       break
-               }
-
                if err != nil {
+                       if errors.Is(err, framer.ErrIncompleteFrame) {
+                               return gnet.None
+                       }
+
                        c.metrics.incError(errConnReadFailed.getStrCode())
                        c.log.Error("invalid packet from stream connection, 
close it, err:", err)
                        // read failed, close the connection
                        return gnet.Close
                }
 
-               frame, _ := conn.Peek(length)
-               _, err = conn.Discard(length)
+               frame, err := conn.Peek(length)
                if err != nil {
                        c.metrics.incError(errConnReadFailed.getStrCode())
-                       c.log.Error("discard connection stream failed, err", 
err)
+                       c.log.Error("read connection stream failed, close it, 
err:", err)
                        // read failed, close the connection
                        return gnet.Close
                }
 
                // handle response
                c.onResponse(frame[payloadOffset:payloadOffsetEnd])
+
+               _, err = conn.Discard(length)
+               if err != nil {
+                       c.metrics.incError(errConnReadFailed.getStrCode())
+                       c.log.Error("discard connection stream failed, close 
it, err:", err)
+                       // read failed, close the connection
+                       return gnet.Close
+               }
        }
-       return gnet.None
 }
 
 func (c *client) onResponse(frame []byte) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go
index 6992141a17..43ce93d0f8 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go
@@ -56,8 +56,8 @@ func UInt64UUID() (uint64, error) {
        return cityhash.CityHash64WithSeeds(bytes, uint32(length), 
13329145742295551469, 7926974186468552394), nil
 }
 
-// Deprecated: Use SafeSnowFlakeID instead.
 // SnowFlakeID generates a snowflake ID. If an error occurs, it logs it and 
exits.
+// Deprecated: Use SafeSnowFlakeID instead.
 func SnowFlakeID() string {
        id, err := SafeSnowFlakeID()
        if err != nil {

Reply via email to