This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6596da9551 [INLONG-11384][DataProxy] Adjust the message body check logic for MsgType=5 (#11385)
6596da9551 is described below
commit 6596da9551585c9421a425a84cb7d1860be7091d
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Oct 22 09:41:56 2024 +0800
[INLONG-11384][DataProxy] Adjust the message body check logic for MsgType=5 (#11385)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/common/enums/DataProxyErrCode.java | 1 +
.../inlong/dataproxy/consts/StatConstants.java | 1 +
.../dataproxy/source/v0msg/CodecTextMsg.java | 41 ++++++++++++++--------
3 files changed, 28 insertions(+), 15 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
index c4ef15ce86..9ea36ea935 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyErrCode.java
@@ -68,6 +68,7 @@ public enum DataProxyErrCode {
DUPLICATED_MESSAGE(120, "Duplicated message"),
GROUPID_OR_STREAMID_NOT_CONFIGURE(121, "GroupId or StreamId not found in
configure"),
GROUPID_OR_STREAMID_INCONSTANT(122, "GroupId or StreamId inconstant"),
+ MSG_BODY_ITEMS_INVALID(123, "Msg body items invalid"),
ATTR_ORDER_CONTROL_CONFLICT_ERROR(150, "Require order send but isAck is
false"),
ATTR_PROXY_CONTROL_CONFLICT_ERROR(151, "Require proxy send but isAck is
false"),
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 8617f74b8a..01db0527e9 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -76,6 +76,7 @@ public class StatConstants {
public static final java.lang.String EVENT_MSG_BIN_LEN_MALFORMED =
"msg.bin.len.malformed";
public static final java.lang.String EVENT_MSG_TXT_LEN_MALFORMED =
"msg.txt.len.malformed";
public static final java.lang.String EVENT_MSG_ITEM_LEN_MALFORMED =
"msg.item.len.malformed";
+ public static final java.lang.String EVENT_MSG_TYPE_5_LEN_MALFORMED =
"msg.type5.len.malformed";
public static final java.lang.String EVENT_MSG_ATTR_INVALID =
"msg.attr.invalid";
public static final java.lang.String EVENT_MSG_ORDER_ACK_INVALID =
"msg.attr.order.noack";
public static final java.lang.String EVENT_MSG_PROXY_ACK_INVALID =
"msg.attr.proxy.noack";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index fc83805900..746cac9ca0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -121,22 +121,33 @@ public class CodecTextMsg extends AbsV0MsgCodec {
}
// check body items
if (MsgType.MSG_MULTI_BODY.equals(MsgType.valueOf(msgType))) {
- int readPos = 0;
- int singleMsgLen;
- ByteBuffer bodyBuffer = ByteBuffer.wrap(bodyData);
- int remaining = bodyBuffer.remaining();
- while (remaining > 0) {
- singleMsgLen = bodyBuffer.getInt(readPos);
- if (singleMsgLen <= 0 || singleMsgLen > remaining) {
-
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_ITEM_LEN_MALFORMED);
- this.errCode = DataProxyErrCode.BODY_EXCEED_MAX_LEN;
- this.errMsg = String.format(
- "Malformed data len, singleMsgLen(%d), buffer
remaining(%d), attr: (%s)",
- singleMsgLen, remaining, origAttr);
- return false;
+ int totalCnt = 0;
+ int singleMsgLen = 0;
+ int nexPossition = 0;
+ ByteBuffer bodyBuffer = ByteBuffer.wrap(this.bodyData);
+ if (bodyBuffer.limit() <= 4) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TYPE_5_LEN_MALFORMED);
+ this.errCode = DataProxyErrCode.MSG_BODY_ITEMS_INVALID;
+ this.errMsg = String.format(
+ "Malformed data len, bodyLen(%d), attr: (%s)",
bodyBuffer.limit(), origAttr);
+ return false;
+ }
+ while (nexPossition < bodyBuffer.limit()) {
+ bodyBuffer.position(nexPossition);
+ singleMsgLen = bodyBuffer.getInt();
+ if (singleMsgLen <= 0 || singleMsgLen >
bodyBuffer.remaining()) {
+ break;
}
- readPos += 4 + singleMsgLen;
- remaining -= 4 + singleMsgLen;
+ totalCnt++;
+ nexPossition += 4 + singleMsgLen;
+ }
+ if (totalCnt == 0) {
+
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_TYPE_5_LEN_MALFORMED);
+ this.errCode = DataProxyErrCode.MSG_BODY_ITEMS_INVALID;
+ this.errMsg = String.format(
+ "Malformed data len, singleMsgLen(%d), buffer
remaining(%d), attr: (%s)",
+ singleMsgLen, bodyBuffer.remaining(), origAttr);
+ return false;
}
}
return true;