This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cf96858c5 [INLONG-12071][Manager] Fix the incorrect filtering logic 
that checks whether messages match (#12072)
3cf96858c5 is described below

commit 3cf96858c58afd7fe25dca3ce9e40ab98031e157
Author: healchow <[email protected]>
AuthorDate: Wed Feb 25 09:56:19 2026 +0800

    [INLONG-12071][Manager] Fix the incorrect filtering logic that checks 
whether messages match (#12072)
    
    * [INLONG-12071][Manager] Fix the filtering logic that incorrectly 
determines whether messages match
    
    * [INLONG-12071][Manager] Fix the compile warning issues
---
 .../manager/pojo/consume/BriefMQMessage.java       |   2 +-
 .../service/message/DeserializeOperator.java       |  48 ++-
 .../service/message/RawMsgDeserializeOperator.java |   8 +-
 .../resource/queue/pulsar/PulsarOperator.java      |  15 +-
 .../service/resource/queue/pulsar/PulsarUtils.java | 143 ++++----
 .../service/message/DeserializeOperatorTest.java   | 407 +++++++++++++++++++++
 6 files changed, 522 insertions(+), 101 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
index 6161684b43..d275d5f7ca 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
@@ -72,7 +72,7 @@ public class BriefMQMessage {
 
         private String fieldName;
 
-        private String FieldValue;
+        private String fieldValue;
 
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
index 1fb898da99..5d6076bba3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
@@ -70,27 +70,41 @@ public interface DeserializeOperator {
                 streamInfo.getWrapType()));
     }
 
-    default Boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo> 
streamFieldList) {
-        if (StringUtils.isBlank(request.getFieldName()) || 
StringUtils.isBlank(request.getOperationType())
-                || StringUtils.isBlank(request.getTargetValue())) {
+    /**
+     * Check whether the message should be filtered by the filter condition
+     *
+     * @param request query message request
+     * @param streamFieldList stream field list
+     * @return true if the message should be filtered
+     */
+    default boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo> 
streamFieldList) {
+        String fieldName = request.getFieldName();
+        if (StringUtils.isAnyBlank(fieldName, request.getOperationType())) {
             return false;
         }
-        boolean ifFilter = false;
+
         FieldInfo fieldInfo = streamFieldList.stream()
-                .filter(v -> Objects.equals(v.getFieldName(), 
request.getFieldName())).findFirst()
+                .filter(field -> fieldName.equals(field.getFieldName()))
+                .findFirst()
                 .orElse(null);
-        if (fieldInfo != null) {
-            switch (request.getOperationType()) {
-                case "=":
-                    ifFilter = !Objects.equals(request.getTargetValue(), 
fieldInfo.getFieldValue());
-                    break;
-                case "!=":
-                    ifFilter = Objects.equals(request.getTargetValue(), 
fieldInfo.getFieldValue());
-                    break;
-                case "like":
-                    ifFilter = fieldInfo.getFieldValue() != null
-                            && 
!fieldInfo.getFieldValue().contains(request.getTargetValue());
-            }
+        if (fieldInfo == null) {
+            return false;
+        }
+
+        boolean ifFilter = false;
+        String fieldValue = fieldInfo.getFieldValue();
+        // support targetValue == blank string (null or "") or targetValue != 
blank string
+        String targetValue = request.getTargetValue();
+        switch (request.getOperationType()) {
+            case "=":
+                ifFilter = !Objects.equals(targetValue, fieldValue);
+                break;
+            case "!=":
+                ifFilter = Objects.equals(targetValue, fieldValue);
+                break;
+            case "like":
+                // fieldValue or targetValue is null, like operation is 
invalid, so this record should be filtered
+                ifFilter = fieldValue == null || targetValue == null || 
!fieldValue.contains(targetValue);
         }
         return ifFilter;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index d8ac5c8a9a..dcb44c1509 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -35,7 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.nio.charset.Charset;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -65,6 +64,7 @@ public class RawMsgDeserializeOperator implements 
DeserializeOperator {
             if (checkIfFilter(request, fieldList)) {
                 return briefMQMessages;
             }
+
             BriefMQMessage briefMQMessage = BriefMQMessage.builder()
                     .id(index)
                     .inlongGroupId(groupId)
@@ -75,12 +75,12 @@ public class RawMsgDeserializeOperator implements 
DeserializeOperator {
                     .body(body)
                     .fieldList(fieldList)
                     .build();
-            briefMQMessages.addAll(Collections.singletonList(briefMQMessage));
+            briefMQMessages.add(briefMQMessage);
             return briefMQMessages;
         } catch (Exception e) {
-            String errMsg = String.format("decode msg failed for groupId=%s, 
streamId=%s", groupId, streamId);
+            String errMsg = String.format("Failed to decode msg for 
groupId=%s, streamId=%s", groupId, streamId);
             log.error(errMsg, e);
-            throw new BusinessException(errMsg);
+            throw new BusinessException(errMsg + ", message: " + 
e.getMessage());
         }
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 0e4983214c..95c817bc6a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -448,18 +448,21 @@ public class PulsarOperator {
             int index, InlongStreamInfo streamInfo, int messagePosition, 
QueryMessageRequest request) {
         List<BriefMQMessage> briefMQMessages = new ArrayList<>();
         try {
-            ResponseEntity<byte[]> httpResponse =
-                    PulsarUtils.examineMessage(restTemplate, 
pulsarClusterInfo, topicPartition, "latest",
-                            messagePosition);
+            ResponseEntity<byte[]> httpResponse = 
PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo,
+                    topicPartition, "latest", messagePosition);
             PulsarMessageInfo messageInfo = 
PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
+            if (messageInfo == null) {
+                return briefMQMessages;
+            }
+
             Map<String, String> headers = messageInfo.getProperties();
             if (headers == null) {
                 headers = new HashMap<>();
             }
             MessageWrapType messageWrapType = 
MessageWrapType.forType(streamInfo.getWrapType());
-            if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
-                messageWrapType =
-                        
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
+            String encodeType = headers.get(InlongConstants.MSG_ENCODE_VER);
+            if (StringUtils.isNotBlank(encodeType)) {
+                messageWrapType = 
MessageWrapType.valueOf(Integer.parseInt(encodeType));
             }
             DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(messageWrapType);
             deserializeOperator.decodeMsg(streamInfo, briefMQMessages, 
messageInfo.getBody(), headers, index, request);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index 1f60c201bf..0b01ee2709 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -19,6 +19,7 @@ package 
org.apache.inlong.manager.service.resource.queue.pulsar;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarBrokerEntryMetadata;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarLookupTopicInfo;
@@ -30,8 +31,6 @@ import 
org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ObjectUtils;
@@ -73,7 +72,6 @@ public class PulsarUtils {
     public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
     private static final DateTimeFormatter DATE_FORMAT = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
             ZoneId.systemDefault());
-    private static final Gson GSON = new GsonBuilder().create(); // thread safe
 
     private PulsarUtils() {
     }
@@ -174,7 +172,7 @@ public class PulsarUtils {
         MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
         headers.setContentType(type);
         headers.add("Accept", MediaType.APPLICATION_JSON.toString());
-        String param = GSON.toJson(tenantInfo);
+        String param = JsonUtils.toJsonString(tenantInfo);
         HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_TENANTS_PATH + "/" + tenant), HttpMethod.PUT,
                 param, headers);
     }
@@ -195,7 +193,7 @@ public class PulsarUtils {
         MediaType type = MediaType.parseMediaType("application/json; 
charset=UTF-8");
         headers.setContentType(type);
         headers.add("Accept", MediaType.APPLICATION_JSON.toString());
-        String param = GSON.toJson(policies);
+        String param = JsonUtils.toJsonString(policies);
         param = param.replaceAll("messageTtlInSeconds", 
"message_ttl_in_seconds")
                 .replaceAll("retentionPolicies", "retention_policies");
         HttpUtils.request(restTemplate, 
clusterInfo.getAdminUrls(QUERY_NAMESPACE_PATH + InlongConstants.SLASH + tenant
@@ -534,10 +532,9 @@ public class PulsarUtils {
         throw new Exception(String.format("examine message failed for topic 
partition=%s", topicPartition));
     }
 
-    public static PulsarMessageInfo 
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
-            throws Exception {
+    public static PulsarMessageInfo 
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic) {
         List<PulsarMessageInfo> messages = 
PulsarUtils.getMessagesFromHttpResponse(response, topic);
-        if (messages.size() > 0) {
+        if (!messages.isEmpty()) {
             return messages.get(0);
         } else {
             return null;
@@ -547,13 +544,11 @@ public class PulsarUtils {
     /**
      * Copy from getMessagesFromHttpResponse method of 
org.apache.pulsar.client.admin.internal.TopicsImpl class.
      *
-     * @param response
-     * @param topic
-     * @return
-     * @throws Exception
+     * @param response response info
+     * @param topic topic name
+     * @return list of pulsar message info
      */
-    public static List<PulsarMessageInfo> 
getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
-            throws Exception {
+    public static List<PulsarMessageInfo> 
getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic) {
         HttpHeaders headers = response.getHeaders();
         String msgId = headers.getFirst("X-Pulsar-Message-ID");
         String brokerEntryTimestamp = 
headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
@@ -564,7 +559,7 @@ public class PulsarUtils {
         } else {
             brokerEntryMetadata = new PulsarBrokerEntryMetadata();
             if (brokerEntryTimestamp != null) {
-                
brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp.toString()));
+                
brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp));
             }
             if (brokerEntryIndex != null) {
                 brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
@@ -574,131 +569,131 @@ public class PulsarUtils {
         PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();
         Map<String, String> properties = Maps.newTreeMap();
 
-        Object tmp = headers.getFirst("X-Pulsar-publish-time");
+        String tmp = headers.getFirst("X-Pulsar-publish-time");
         if (tmp != null) {
-            messageMetadata.setPublishTime(parse(tmp.toString()));
+            messageMetadata.setPublishTime(parse(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-event-time");
         if (tmp != null) {
-            messageMetadata.setEventTime(parse(tmp.toString()));
+            messageMetadata.setEventTime(parse(tmp));
         }
         tmp = headers.getFirst("X-Pulsar-deliver-at-time");
         if (tmp != null) {
-            messageMetadata.setDeliverAtTime(parse(tmp.toString()));
+            messageMetadata.setDeliverAtTime(parse(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-null-value");
         if (tmp != null) {
-            messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
+            messageMetadata.setNullValue(Boolean.parseBoolean(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-producer-name");
         if (tmp != null) {
-            messageMetadata.setProducerName(tmp.toString());
+            messageMetadata.setProducerName(tmp);
         }
 
         tmp = headers.getFirst("X-Pulsar-sequence-id");
         if (tmp != null) {
-            messageMetadata.setSequenceId(Long.parseLong(tmp.toString()));
+            messageMetadata.setSequenceId(Long.parseLong(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-replicated-from");
         if (tmp != null) {
-            messageMetadata.setReplicatedFrom(tmp.toString());
+            messageMetadata.setReplicatedFrom(tmp);
         }
 
         tmp = headers.getFirst("X-Pulsar-partition-key");
         if (tmp != null) {
-            messageMetadata.setPartitionKey(tmp.toString());
+            messageMetadata.setPartitionKey(tmp);
         }
 
         tmp = headers.getFirst("X-Pulsar-compression");
         if (tmp != null) {
-            messageMetadata.setCompression(tmp.toString());
+            messageMetadata.setCompression(tmp);
         }
 
         tmp = headers.getFirst("X-Pulsar-uncompressed-size");
         if (tmp != null) {
-            
messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString()));
+            messageMetadata.setUncompressedSize(Integer.parseInt(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-encryption-algo");
         if (tmp != null) {
-            messageMetadata.setEncryptionAlgo(tmp.toString());
+            messageMetadata.setEncryptionAlgo(tmp);
         }
 
         tmp = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
         if (tmp != null) {
-            
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString()));
+            
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-marker-type");
         if (tmp != null) {
-            messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
+            messageMetadata.setMarkerType(Integer.parseInt(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-txnid-least-bits");
         if (tmp != null) {
-            messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString()));
+            messageMetadata.setTxnidLeastBits(Long.parseLong(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-txnid-most-bits");
         if (tmp != null) {
-            messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString()));
+            messageMetadata.setTxnidMostBits(Long.parseLong(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-highest-sequence-id");
         if (tmp != null) {
-            
messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString()));
+            messageMetadata.setHighestSequenceId(Long.parseLong(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-uuid");
         if (tmp != null) {
-            messageMetadata.setUuid(tmp.toString());
+            messageMetadata.setUuid(tmp);
         }
 
         tmp = headers.getFirst("X-Pulsar-num-chunks-from-msg");
         if (tmp != null) {
-            
messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString()));
+            messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-total-chunk-msg-size");
         if (tmp != null) {
-            
messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString()));
+            messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-chunk-id");
         if (tmp != null) {
-            messageMetadata.setChunkId(Integer.parseInt(tmp.toString()));
+            messageMetadata.setChunkId(Integer.parseInt(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-null-partition-key");
         if (tmp != null) {
-            
messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString()));
+            messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
         if (tmp != null) {
-            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-Base64-ordering-key");
         if (tmp != null) {
-            
messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString()));
+            messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
         if (tmp != null) {
-            
messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString()));
+            messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp));
         }
 
         tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
         if (tmp != null) {
-            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+            
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp));
         }
 
-        List<String> tmpList = (List) headers.get("X-Pulsar-replicated-to");
+        List<String> tmpList = headers.get("X-Pulsar-replicated-to");
         if (ObjectUtils.isNotEmpty(tmpList)) {
             if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
                 messageMetadata.setReplicateTos(Lists.newArrayList(tmpList));
@@ -715,7 +710,7 @@ public class PulsarUtils {
         for (Entry<String, List<String>> entry : headers.entrySet()) {
             if (entry.getKey().contains("X-Pulsar-PROPERTY-")) {
                 String keyName = 
entry.getKey().substring("X-Pulsar-PROPERTY-".length());
-                properties.put(keyName, (String) ((List) 
entry.getValue()).get(0));
+                properties.put(keyName, (String) ((List<?>) 
entry.getValue()).get(0));
             }
         }
 
@@ -726,7 +721,7 @@ public class PulsarUtils {
         boolean isEncrypted = false;
         tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
         if (tmp != null) {
-            isEncrypted = Boolean.parseBoolean(tmp.toString());
+            isEncrypted = Boolean.parseBoolean(tmp);
         }
 
         if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null) 
{
@@ -754,13 +749,13 @@ public class PulsarUtils {
     /**
      * Copy from getIndividualMsgsFromBatch method of 
org.apache.pulsar.client.admin.internal.TopicsImpl class.
      *
-     * @param topic
-     * @param msgId
-     * @param data
-     * @param properties
-     * @param metadata
-     * @param brokerMetadata
-     * @return
+     * @param topic topic name
+     * @param msgId message id
+     * @param data batch message data
+     * @param properties message properties
+     * @param metadata message metadata
+     * @param brokerMetadata broker metadata
+     * @return list of pulsar message info
      */
     private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String 
topic, String msgId, byte[] data,
             Map<String, String> properties, PulsarMessageMetadata metadata, 
PulsarBrokerEntryMetadata brokerMetadata) {
@@ -799,11 +794,11 @@ public class PulsarUtils {
     /**
      * Copy from deSerializeSingleMessageInBatch method of 
org.apache.pulsar.common.protocol.Commands class.
      *
-     * @param uncompressedPayload
-     * @param metadata
-     * @param index
-     * @param batchSize
-     * @return
+     * @param uncompressedPayload byte buffer containing uncompressed payload
+     * @param metadata message metadata
+     * @param index index of the message in the batch
+     * @param batchSize batch size
+     * @return byte buffer
      */
     private static ByteBuffer deSerializeSingleMessageInBatch(ByteBuffer 
uncompressedPayload,
             PulsarMessageMetadata metadata, int index, int batchSize) {
@@ -822,9 +817,9 @@ public class PulsarUtils {
     /**
      * Copy from parseFrom method of 
org.apache.pulsar.common.api.proto.SingleMessageMetadata class.
      *
-     * @param metadata
-     * @param buffer
-     * @param size
+     * @param metadata message metadata
+     * @param buffer byte buffer
+     * @param size message size
      */
     private static void metaDataParseFrom(PulsarMessageMetadata metadata, 
ByteBuffer buffer, int size) {
         int endIdx = size + buffer.position();
@@ -883,8 +878,8 @@ public class PulsarUtils {
     /**
      * Copy from readVarInt method of 
org.apache.pulsar.common.api.proto.LightProtoCodec class.
      *
-     * @param buf
-     * @return
+     * @param buf byte buffer
+     * @return value of int
      */
     private static int readVarInt(ByteBuffer buf) {
         byte tmp = buf.get();
@@ -923,8 +918,8 @@ public class PulsarUtils {
     /**
      * Copy from readVarInt64 method of 
org.apache.pulsar.common.api.proto.LightProtoCodec class.
      *
-     * @param buf
-     * @return
+     * @param buf byte buffer
+     * @return value of long
      */
     private static long readVarInt64(ByteBuffer buf) {
         int shift = 0;
@@ -941,8 +936,8 @@ public class PulsarUtils {
     /**
      * Copy from getTagType method of 
org.apache.pulsar.common.api.proto.LightProtoCodec class.
      *
-     * @param tag
-     * @return
+     * @param tag tag number
+     * @return tag type
      */
     private static int getTagType(int tag) {
         return tag & 7;
@@ -951,8 +946,8 @@ public class PulsarUtils {
     /**
      * Copy from skipUnknownField method of 
org.apache.pulsar.common.api.proto.LightProtoCodec class.
      *
-     * @param tag
-     * @param buffer
+     * @param tag tag number
+     * @param buffer byte buffer
      */
     private static void skipUnknownField(int tag, ByteBuffer buffer) {
         int tagType = getTagType(tag);
@@ -967,21 +962,23 @@ public class PulsarUtils {
                 int len = readVarInt(buffer);
                 buffer.get(new byte[len]);
                 break;
+            case 5:
+                buffer.get(new byte[4]);
+                break;
             case 3:
             case 4:
             default:
-                throw new IllegalArgumentException("Invalid unknonwn tag type: 
" + tagType);
-            case 5:
-                buffer.get(new byte[4]);
+                throw new IllegalArgumentException("Invalid tag type: " + 
tagType);
+
         }
     }
 
     /**
      * Copy from parseFrom method of 
org.apache.pulsar.common.api.proto.KeyValue class.
      *
-     * @param metadata
-     * @param buffer
-     * @param size
+     * @param metadata message metadata
+     * @param buffer byte buffer
+     * @param size message size
      */
     private static void parseFrom(PulsarMessageMetadata metadata, ByteBuffer 
buffer, int size) {
         if (ObjectUtils.isEmpty(metadata.getProperties())) {
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/message/DeserializeOperatorTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/message/DeserializeOperatorTest.java
new file mode 100644
index 0000000000..4908f57119
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/message/DeserializeOperatorTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.message;
+
+import org.apache.inlong.common.enums.MessageWrapType;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link 
DeserializeOperator#checkIfFilter(QueryMessageRequest, List)} method.
+ * This test class verifies the filtering logic for different operation types: 
=, !=, and like.
+ */
+public class DeserializeOperatorTest {
+
+    private DeserializeOperator operator;
+    private List<FieldInfo> fieldList;
+
+    @BeforeEach
+    void setUp() {
+        // Create a simple implementation for testing the default method
+        operator = new DeserializeOperator() {
+
+            @Override
+            public boolean accept(MessageWrapType type) {
+                return false;
+            }
+        };
+
+        // Initialize test field list
+        fieldList = Arrays.asList(
+                
FieldInfo.builder().fieldName("name").fieldValue("John").build(),
+                FieldInfo.builder().fieldName("age").fieldValue("25").build(),
+                
FieldInfo.builder().fieldName("email").fieldValue("[email protected]").build(),
+                
FieldInfo.builder().fieldName("nullField").fieldValue(null).build());
+    }
+
+    // ==================== Tests for precondition checks ====================
+
+    @Test
+    void testCheckIfFilter_whenFieldNameIsNull_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName(null)
+                .operationType("=")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Should not filter when fieldName is null
+        assertFalse(result);
+    }
+
+    @Test
+    void testCheckIfFilter_whenFieldNameIsBlank_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("  ")
+                .operationType("=")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Should not filter when fieldName is blank
+        assertFalse(result);
+    }
+
+    @Test
+    void testCheckIfFilter_whenOperationTypeIsNull_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType(null)
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Should not filter when operationType is null
+        assertFalse(result);
+    }
+
+    @Test
+    void testCheckIfFilter_whenOperationTypeIsBlank_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Should not filter when operationType is blank
+        assertFalse(result);
+    }
+
+    @Test
+    void testCheckIfFilter_whenFieldNotFound_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("nonExistentField")
+                .operationType("=")
+                .targetValue("value")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Should not filter when field is not found in the list
+        assertFalse(result);
+    }
+
+    @Test
+    void testCheckIfFilter_whenFieldListIsEmpty_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("=")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, 
Collections.emptyList());
+
+        // Should not filter when field list is empty
+        assertFalse(result);
+    }
+
+    // ==================== Tests for equals (=) operation ====================
+
+    @Test
+    void testCheckIfFilter_equalsOperation_whenValuesMatch_shouldReturnFalse() 
{
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("=")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue equals fieldValue, so record should NOT be filtered
+        assertFalse(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_equalsOperation_whenValuesNotMatch_shouldReturnTrue() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("=")
+                .targetValue("Jane")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue does not equal fieldValue, so record should be filtered
+        assertTrue(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_equalsOperation_whenTargetValueIsNull_andFieldValueIsNull_shouldReturnFalse()
 {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("nullField")
+                .operationType("=")
+                .targetValue(null)
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Both are null, `!Objects.equals` returns false, so record should 
NOT be filtered
+        assertFalse(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_equalsOperation_whenTargetValueIsNull_andFieldValueIsNotNull_shouldReturnTrue()
 {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("=")
+                .targetValue(null)
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue is null but fieldValue is "John", so record should be 
filtered
+        assertTrue(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_equalsOperation_whenTargetValueIsNotNull_andFieldValueIsNull_shouldReturnTrue()
 {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("nullField")
+                .operationType("=")
+                .targetValue("someValue")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue is not null but fieldValue is null, so record should be 
filtered
+        assertTrue(result);
+    }
+
+    // ==================== Tests for not equals (!=) operation 
====================
+
+    @Test
+    void 
testCheckIfFilter_notEqualsOperation_whenValuesMatch_shouldReturnTrue() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("!=")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue equals fieldValue, so record should be filtered (we 
want records that are NOT equal)
+        assertTrue(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_notEqualsOperation_whenValuesNotMatch_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("!=")
+                .targetValue("Jane")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue does not equal fieldValue, so record should NOT be 
filtered
+        assertFalse(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_notEqualsOperation_whenBothValuesAreNull_shouldReturnTrue() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("nullField")
+                .operationType("!=")
+                .targetValue(null)
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Both are null, Objects.equals returns true, so record should be 
filtered
+        assertTrue(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_notEqualsOperation_whenTargetValueIsNull_andFieldValueIsNotNull_shouldReturnFalse()
 {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("!=")
+                .targetValue(null)
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue is null but fieldValue is "John", they are not equal, 
so record should NOT be filtered
+        assertFalse(result);
+    }
+
+    // ==================== Tests for like operation ====================
+
+    @Test
+    void 
testCheckIfFilter_likeOperation_whenFieldValueContainsTargetValue_shouldReturnFalse()
 {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("email")
+                .operationType("like")
+                .targetValue("@example")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // fieldValue contains targetValue, so record should NOT be filtered
+        assertFalse(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_likeOperation_whenFieldValueNotContainsTargetValue_shouldReturnTrue()
 {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("email")
+                .operationType("like")
+                .targetValue("@gmail")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // fieldValue does not contain targetValue, so record should be 
filtered
+        assertTrue(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_likeOperation_whenFieldValueIsNull_shouldReturnTrue() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("nullField")
+                .operationType("like")
+                .targetValue("anyValue")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // fieldValue is null, like operation is invalid, so record should be 
filtered
+        assertTrue(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_likeOperation_whenTargetValueIsNull_shouldReturnTrue() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("like")
+                .targetValue(null)
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // targetValue is null, like operation is invalid, so record should be 
filtered
+        assertTrue(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_likeOperation_whenTargetValueIsExactMatch_shouldReturnFalse() 
{
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("like")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // fieldValue exactly matches targetValue (contains it), so record 
should NOT be filtered
+        assertFalse(result);
+    }
+
+    @Test
+    void 
testCheckIfFilter_likeOperation_whenTargetValueIsPartialMatch_shouldReturnFalse()
 {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("like")
+                .targetValue("oh")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // fieldValue "John" contains "oh", so record should NOT be filtered
+        assertFalse(result);
+    }
+
+    @Test
+    void testCheckIfFilter_likeOperation_caseSensitive_shouldReturnTrue() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("like")
+                .targetValue("JOHN")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // String.contains is case-sensitive, "John" does not contain "JOHN", 
so record should be filtered
+        assertTrue(result);
+    }
+
+    // ==================== Tests for unknown operation type 
====================
+
+    @Test
+    void testCheckIfFilter_unknownOperation_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("name")
+                .operationType("unknown")
+                .targetValue("John")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Unknown operation type, ifFilter remains false (default value)
+        assertFalse(result);
+    }
+
+    @Test
+    void testCheckIfFilter_greaterThanOperation_shouldReturnFalse() {
+        QueryMessageRequest request = QueryMessageRequest.builder()
+                .fieldName("age")
+                .operationType(">")
+                .targetValue("20")
+                .build();
+
+        boolean result = operator.checkIfFilter(request, fieldList);
+
+        // Greater than operation is not implemented, ifFilter remains false
+        assertFalse(result);
+    }
+}


Reply via email to