This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6596da9551 [INLONG-11384][DataProxy] Adjust the message body check logic for MsgType=5 (#11385)
6596da9551 is described below

commit 6596da9551585c9421a425a84cb7d1860be7091d
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Oct 22 09:41:56 2024 +0800

    [INLONG-11384][DataProxy] Adjust the message body check logic for MsgType=5 (#11385)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/common/enums/DataProxyErrCode.java      |  1 +
 .../inlong/dataproxy/consts/StatConstants.java     |  1 +
 .../dataproxy/source/v0msg/CodecTextMsg.java       | 41 ++++++++++++++--------
 3 files changed, 28 insertions(+), 15 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index c4ef15ce86..9ea36ea935 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -68,6 +68,7 @@ public enum DataProxyErrCode {
     DUPLICATED_MESSAGE(120, "Duplicated message"),
     GROUPID_OR_STREAMID_NOT_CONFIGURE(121, "GroupId or StreamId not found in 
configure"),
     GROUPID_OR_STREAMID_INCONSTANT(122, "GroupId or StreamId inconstant"),
+    MSG_BODY_ITEMS_INVALID(123, "Msg body items invalid"),
 
     ATTR_ORDER_CONTROL_CONFLICT_ERROR(150, "Require order send but isAck is 
false"),
     ATTR_PROXY_CONTROL_CONFLICT_ERROR(151, "Require proxy send but isAck is 
false"),
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 8617f74b8a..01db0527e9 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -76,6 +76,7 @@ public class StatConstants {
     public static final java.lang.String EVENT_MSG_BIN_LEN_MALFORMED = 
"msg.bin.len.malformed";
     public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED = 
"msg.txt.len.malformed";
     public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED = 
"msg.item.len.malformed";
+    public static final java.lang.String EVENT_MSG_TYPE_5_LEN_MALFORMED = 
"msg.type5.len.malformed";
     public static final java.lang.String EVENT_MSG_ATTR_INVALID = 
"msg.attr.invalid";
     public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID = 
"msg.attr.order.noack";
     public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID = 
"msg.attr.proxy.noack";
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index fc83805900..746cac9ca0 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -121,22 +121,33 @@ public class CodecTextMsg extends AbsV0MsgCodec {
         }
         // check body items
         if (MsgType.MSG_MULTI_BODY.equals(MsgType.valueOf(msgType))) {
-            int readPos = 0;
-            int singleMsgLen;
-            ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyData);
-            int remaining = bodyBuffer.remaining();
-            while (remaining > 0) {
-                singleMsgLen = bodyBuffer.getInt(readPos);
-                if (singleMsgLen <= 0 || singleMsgLen > remaining) {
-                    
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ITEM_LEN_MALFORMED);
-                    this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
-                    this.errMsg = String.format(
-                            "Malformed data len, singleMsgLen(%d), buffer 
remaining(%d), attr: (%s)",
-                            singleMsgLen, remaining, origAttr);
-                    return false;
+            int totalCnt = 0;
+            int singleMsgLen = 0;
+            int nexPossition = 0;
+            ByteBuffer bodyBuffer = ByteBuffer.wrap(this.bodyData);
+            if (bodyBuffer.limit() <= 4) {
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TYPE_5_LEN_MALFORMED);
+                this.errCode = DataProxyErrCode.MSG_BODY_ITEMS_INVALID;
+                this.errMsg = String.format(
+                        "Malformed data len, bodyLen(%d), attr: (%s)", 
bodyBuffer.limit(), origAttr);
+                return false;
+            }
+            while (nexPossition < bodyBuffer.limit()) {
+                bodyBuffer.position(nexPossition);
+                singleMsgLen = bodyBuffer.getInt();
+                if (singleMsgLen <= 0 || singleMsgLen > 
bodyBuffer.remaining()) {
+                    break;
                 }
-                readPos += 4 + singleMsgLen;
-                remaining -= 4 + singleMsgLen;
+                totalCnt++;
+                nexPossition += 4 + singleMsgLen;
+            }
+            if (totalCnt == 0) {
+                
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TYPE_5_LEN_MALFORMED);
+                this.errCode = DataProxyErrCode.MSG_BODY_ITEMS_INVALID;
+                this.errMsg = String.format(
+                        "Malformed data len, singleMsgLen(%d), buffer 
remaining(%d), attr: (%s)",
+                        singleMsgLen, bodyBuffer.remaining(), origAttr);
+                return false;
             }
         }
         return true;

Reply via email to