This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 888c6aab52 [INLONG-10184][SDK] The header in ProxyEvent need contain both 'rtms' and 'auditVersion'. (#10185)
888c6aab52 is described below

commit 888c6aab5210b06042a5ada254766e62123b3308
Author: Mingyu Bao <[email protected]>
AuthorDate: Sat May 11 21:52:53 2024 +0800

    [INLONG-10184][SDK] The header in ProxyEvent need contain both 'rtms' and 'auditVersion'. (#10185)
---
 .../org/apache/inlong/sdk/commons/protocol/ProxyEvent.java   | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
index c76ab8974c..dfe27d2a2b 100644
--- a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -17,10 +17,12 @@
 
 package org.apache.inlong.sdk.commons.protocol;
 
+import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 
 import org.apache.commons.lang3.math.NumberUtils;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -83,6 +85,16 @@ public class ProxyEvent extends SdkEvent {
         headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
         headers.put(EventConstants.HEADER_KEY_MSG_TIME, 
String.valueOf(msgTime));
         headers.put(EventConstants.HEADER_KEY_SOURCE_IP, sourceIp);
+        if (obj != null && obj.getParamsList() != null) {
+            List<ProxySdk.MapFieldEntry> list = obj.getParamsList();
+            for (ProxySdk.MapFieldEntry entry : list) {
+                if 
(AttributeConstants.MSG_RPT_TIME.equalsIgnoreCase(entry.getKey())) {
+                    headers.put(AttributeConstants.MSG_RPT_TIME, 
entry.getValue());
+                } else if 
(AttributeConstants.AUDIT_VERSION.equalsIgnoreCase(entry.getKey())) {
+                    headers.put(AttributeConstants.AUDIT_VERSION, 
entry.getValue());
+                }
+            }
+        }
 
         this.sourceTime = System.currentTimeMillis();
         this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_TIME, 
String.valueOf(sourceTime));

Reply via email to