This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 49f141a46c [INLONG-11843][SDK] Validate input message (#11844)
49f141a46c is described below
commit 49f141a46c61722f8d271bf3ab8ecbb19120168e
Author: gunli <[email protected]>
AuthorDate: Sun Apr 27 21:28:28 2025 +0800
[INLONG-11843][SDK] Validate input message (#11844)
---
.../dataproxy-sdk-golang/dataproxy/client.go | 14 ++++++++++++++
.../dataproxy-sdk-golang/dataproxy/message.go | 13 +++++++++++++
2 files changed, 27 insertions(+)
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
index a4aec4c36b..c85900222e 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/client.go
@@ -39,6 +39,7 @@ var (
ErrInvalidURL = errors.New("invalid URL")
ErrNoEndpoint = errors.New("service has no endpoints")
ErrNoAvailableWorker = errors.New("no available worker")
+ ErrInvalidMessage = errors.New("invalid message, GroupID/StreamID/Payload is empty or contains illegal characters")
)
// Client is the interface of a DataProxy client
@@ -216,6 +217,11 @@ func (c *client) Dial(addr string, ctx any) (gnet.Conn, error) {
}
func (c *client) Send(ctx context.Context, msg Message) error {
+ if !msg.IsValid() {
+ c.log.Error("invalid message", ErrInvalidGroupID)
+ return ErrInvalidMessage
+ }
+
worker, err := c.getWorker()
if err != nil {
return ErrNoAvailableWorker
@@ -224,6 +230,14 @@ func (c *client) Send(ctx context.Context, msg Message) error {
}
func (c *client) SendAsync(ctx context.Context, msg Message, cb Callback) {
+ if !msg.IsValid() {
+ if cb != nil {
+ cb(msg, ErrInvalidMessage)
+ }
+ c.log.Error("invalid message", ErrInvalidGroupID)
+ return
+ }
+
worker, err := c.getWorker()
if err != nil {
if cb != nil {
diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/message.go b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/message.go
index 717544b9e9..b41cd46a36 100755
--- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/message.go
+++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-golang/dataproxy/message.go
@@ -16,6 +16,10 @@
package dataproxy
+import (
+ "strings"
+)
+
// Callback is the callback function signature of the DataProxy producer
type Callback func(message Message, err error)
@@ -27,3 +31,12 @@ type Message struct {
Headers map[string]string // message headers, won't be sent to the server right now
MetaData interface{} // any data you want, won't be sent to the server, but you can get it in the callback
}
+
+// IsValid checks if the message is valid
+func (m *Message) IsValid() bool {
+ if strings.ContainsAny(m.GroupID, " \t\n\r") || strings.ContainsAny(m.StreamID, " \t\n\r") || m.GroupID == "" || m.StreamID == "" || len(m.Payload) == 0 {
+ return false
+ }
+
+ return true
+}