This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 19464a33ac [INLONG-11954][SDK] Fix potential data race in Golang SDK
(#11955)
19464a33ac is described below
commit 19464a33ac35ab003b6e0663618c0ec6b37e646b
Author: gunli <[email protected]>
AuthorDate: Wed Jul 30 14:05:56 2025 +0800
[INLONG-11954][SDK] Fix potential data race in Golang SDK (#11955)
---
.../dataproxy-sdk-golang/dataproxy/client.go | 37 +++++++++++++++-------
.../dataproxy-sdk-golang/util/id.go | 2 +-
2 files changed, 26 insertions(+), 13 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index c85900222e..1046a10d01 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -330,16 +330,23 @@ func (c *client) OnTraffic(conn gnet.Conn) (action
gnet.Action) {
for {
total := conn.InboundBuffered()
if total < heartbeatRspLen {
- break
+ return gnet.None
+ }
+
+ buf, err := conn.Peek(total)
+ if err != nil {
+ c.metrics.incError(errConnReadFailed.getStrCode())
+ c.log.Error("read connection stream failed, close it,
err:", err)
+ // read failed, close the connection
+ return gnet.Close
}
- buf, _ := conn.Peek(total)
// if it is a heartbeat response, skip it and read the next
package
if bytes.Equal(buf[:heartbeatRspLen], heartbeatRsp) {
- _, err := conn.Discard(heartbeatRspLen)
+ _, err = conn.Discard(heartbeatRspLen)
if err != nil {
c.metrics.incError(errConnReadFailed.getStrCode())
- c.log.Error("discard connection stream failed,
err", err)
+ c.log.Error("discard connection stream failed,
close it, err:", err)
// read failed, close the connection
return gnet.Close
}
@@ -349,30 +356,36 @@ func (c *client) OnTraffic(conn gnet.Conn) (action
gnet.Action) {
}
length, payloadOffset, payloadOffsetEnd, err :=
c.framer.ReadFrame(buf)
- if errors.Is(err, framer.ErrIncompleteFrame) {
- break
- }
-
if err != nil {
+ if errors.Is(err, framer.ErrIncompleteFrame) {
+ return gnet.None
+ }
+
c.metrics.incError(errConnReadFailed.getStrCode())
c.log.Error("invalid packet from stream connection,
close it, err:", err)
// read failed, close the connection
return gnet.Close
}
- frame, _ := conn.Peek(length)
- _, err = conn.Discard(length)
+ frame, err := conn.Peek(length)
if err != nil {
c.metrics.incError(errConnReadFailed.getStrCode())
- c.log.Error("discard connection stream failed, err",
err)
+ c.log.Error("read connection stream failed, close it,
err:", err)
// read failed, close the connection
return gnet.Close
}
// handle response
c.onResponse(frame[payloadOffset:payloadOffsetEnd])
+
+ _, err = conn.Discard(length)
+ if err != nil {
+ c.metrics.incError(errConnReadFailed.getStrCode())
+ c.log.Error("discard connection stream failed, close
it, err:", err)
+ // read failed, close the connection
+ return gnet.Close
+ }
}
- return gnet.None
}
func (c *client) onResponse(frame []byte) {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go
index 6992141a17..43ce93d0f8 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/util/id.go
@@ -56,8 +56,8 @@ func UInt64UUID() (uint64, error) {
return cityhash.CityHash64WithSeeds(bytes, uint32(length),
13329145742295551469, 7926974186468552394), nil
}
-// Deprecated: Use SafeSnowFlakeID instead.
// SnowFlakeID generates a snowflake ID. If an error occurs, it logs it and
exits.
+// Deprecated: Use SafeSnowFlakeID instead.
func SnowFlakeID() string {
id, err := SafeSnowFlakeID()
if err != nil {