This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3cf96858c5 [INLONG-12071][Manager] Fix the incorrect filtering logic
that checks whether messages match (#12072)
3cf96858c5 is described below
commit 3cf96858c58afd7fe25dca3ce9e40ab98031e157
Author: healchow <[email protected]>
AuthorDate: Wed Feb 25 09:56:19 2026 +0800
[INLONG-12071][Manager] Fix the incorrect filtering logic that checks
whether messages match (#12072)
* [INLONG-12071][Manager] Fix the filtering logic that incorrectly
determines whether messages match
* [INLONG-12071][Manager] Fix the compile warning issues
---
.../manager/pojo/consume/BriefMQMessage.java | 2 +-
.../service/message/DeserializeOperator.java | 48 ++-
.../service/message/RawMsgDeserializeOperator.java | 8 +-
.../resource/queue/pulsar/PulsarOperator.java | 15 +-
.../service/resource/queue/pulsar/PulsarUtils.java | 143 ++++----
.../service/message/DeserializeOperatorTest.java | 407 +++++++++++++++++++++
6 files changed, 522 insertions(+), 101 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
index 6161684b43..d275d5f7ca 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java
@@ -72,7 +72,7 @@ public class BriefMQMessage {
private String fieldName;
- private String FieldValue;
+ private String fieldValue;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
index 1fb898da99..5d6076bba3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
@@ -70,27 +70,41 @@ public interface DeserializeOperator {
streamInfo.getWrapType()));
}
- default Boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo>
streamFieldList) {
- if (StringUtils.isBlank(request.getFieldName()) ||
StringUtils.isBlank(request.getOperationType())
- || StringUtils.isBlank(request.getTargetValue())) {
+ /**
+ * Check whether the message should be filtered by the filter condition
+ *
+ * @param request query message request
+ * @param streamFieldList stream field list
+ * @return true if the message should be filtered
+ */
+ default boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo>
streamFieldList) {
+ String fieldName = request.getFieldName();
+ if (StringUtils.isAnyBlank(fieldName, request.getOperationType())) {
return false;
}
- boolean ifFilter = false;
+
FieldInfo fieldInfo = streamFieldList.stream()
- .filter(v -> Objects.equals(v.getFieldName(),
request.getFieldName())).findFirst()
+ .filter(field -> fieldName.equals(field.getFieldName()))
+ .findFirst()
.orElse(null);
- if (fieldInfo != null) {
- switch (request.getOperationType()) {
- case "=":
- ifFilter = !Objects.equals(request.getTargetValue(),
fieldInfo.getFieldValue());
- break;
- case "!=":
- ifFilter = Objects.equals(request.getTargetValue(),
fieldInfo.getFieldValue());
- break;
- case "like":
- ifFilter = fieldInfo.getFieldValue() != null
- &&
!fieldInfo.getFieldValue().contains(request.getTargetValue());
- }
+ if (fieldInfo == null) {
+ return false;
+ }
+
+ boolean ifFilter = false;
+ String fieldValue = fieldInfo.getFieldValue();
+ // support targetValue == blank string (null or "") or targetValue !=
blank string
+ String targetValue = request.getTargetValue();
+ switch (request.getOperationType()) {
+ case "=":
+ ifFilter = !Objects.equals(targetValue, fieldValue);
+ break;
+ case "!=":
+ ifFilter = Objects.equals(targetValue, fieldValue);
+ break;
+ case "like":
+ // fieldValue or targetValue is null, like operation is
invalid, so this record should be filtered
+ ifFilter = fieldValue == null || targetValue == null ||
!fieldValue.contains(targetValue);
}
return ifFilter;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index d8ac5c8a9a..dcb44c1509 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -35,7 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.Charset;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -65,6 +64,7 @@ public class RawMsgDeserializeOperator implements
DeserializeOperator {
if (checkIfFilter(request, fieldList)) {
return briefMQMessages;
}
+
BriefMQMessage briefMQMessage = BriefMQMessage.builder()
.id(index)
.inlongGroupId(groupId)
@@ -75,12 +75,12 @@ public class RawMsgDeserializeOperator implements
DeserializeOperator {
.body(body)
.fieldList(fieldList)
.build();
- briefMQMessages.addAll(Collections.singletonList(briefMQMessage));
+ briefMQMessages.add(briefMQMessage);
return briefMQMessages;
} catch (Exception e) {
- String errMsg = String.format("decode msg failed for groupId=%s,
streamId=%s", groupId, streamId);
+ String errMsg = String.format("Failed to decode msg for
groupId=%s, streamId=%s", groupId, streamId);
log.error(errMsg, e);
- throw new BusinessException(errMsg);
+ throw new BusinessException(errMsg + ", message: " +
e.getMessage());
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 0e4983214c..95c817bc6a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -448,18 +448,21 @@ public class PulsarOperator {
int index, InlongStreamInfo streamInfo, int messagePosition,
QueryMessageRequest request) {
List<BriefMQMessage> briefMQMessages = new ArrayList<>();
try {
- ResponseEntity<byte[]> httpResponse =
- PulsarUtils.examineMessage(restTemplate,
pulsarClusterInfo, topicPartition, "latest",
- messagePosition);
+ ResponseEntity<byte[]> httpResponse =
PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo,
+ topicPartition, "latest", messagePosition);
PulsarMessageInfo messageInfo =
PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
+ if (messageInfo == null) {
+ return briefMQMessages;
+ }
+
Map<String, String> headers = messageInfo.getProperties();
if (headers == null) {
headers = new HashMap<>();
}
MessageWrapType messageWrapType =
MessageWrapType.forType(streamInfo.getWrapType());
- if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
- messageWrapType =
-
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
+ String encodeType = headers.get(InlongConstants.MSG_ENCODE_VER);
+ if (StringUtils.isNotBlank(encodeType)) {
+ messageWrapType =
MessageWrapType.valueOf(Integer.parseInt(encodeType));
}
DeserializeOperator deserializeOperator =
deserializeOperatorFactory.getInstance(messageWrapType);
deserializeOperator.decodeMsg(streamInfo, briefMQMessages,
messageInfo.getBody(), headers, index, request);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
index 1f60c201bf..0b01ee2709 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarUtils.java
@@ -19,6 +19,7 @@ package
org.apache.inlong.manager.service.resource.queue.pulsar;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarBrokerEntryMetadata;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarLookupTopicInfo;
@@ -30,8 +31,6 @@ import
org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
@@ -73,7 +72,6 @@ public class PulsarUtils {
public static final String LOOKUP_TOPIC_PATH = "/lookup/v2/topic";
private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(
ZoneId.systemDefault());
- private static final Gson GSON = new GsonBuilder().create(); // thread safe
private PulsarUtils() {
}
@@ -174,7 +172,7 @@ public class PulsarUtils {
MediaType type = MediaType.parseMediaType("application/json;
charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
- String param = GSON.toJson(tenantInfo);
+ String param = JsonUtils.toJsonString(tenantInfo);
HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_TENANTS_PATH + "/" + tenant), HttpMethod.PUT,
param, headers);
}
@@ -195,7 +193,7 @@ public class PulsarUtils {
MediaType type = MediaType.parseMediaType("application/json;
charset=UTF-8");
headers.setContentType(type);
headers.add("Accept", MediaType.APPLICATION_JSON.toString());
- String param = GSON.toJson(policies);
+ String param = JsonUtils.toJsonString(policies);
param = param.replaceAll("messageTtlInSeconds",
"message_ttl_in_seconds")
.replaceAll("retentionPolicies", "retention_policies");
HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_NAMESPACE_PATH + InlongConstants.SLASH + tenant
@@ -534,10 +532,9 @@ public class PulsarUtils {
throw new Exception(String.format("examine message failed for topic
partition=%s", topicPartition));
}
- public static PulsarMessageInfo
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic)
- throws Exception {
+ public static PulsarMessageInfo
getMessageFromHttpResponse(ResponseEntity<byte[]> response, String topic) {
List<PulsarMessageInfo> messages =
PulsarUtils.getMessagesFromHttpResponse(response, topic);
- if (messages.size() > 0) {
+ if (!messages.isEmpty()) {
return messages.get(0);
} else {
return null;
@@ -547,13 +544,11 @@ public class PulsarUtils {
/**
* Copy from getMessagesFromHttpResponse method of
org.apache.pulsar.client.admin.internal.TopicsImpl class.
*
- * @param response
- * @param topic
- * @return
- * @throws Exception
+ * @param response response info
+ * @param topic topic name
+ * @return list of pulsar message info
*/
- public static List<PulsarMessageInfo>
getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic)
- throws Exception {
+ public static List<PulsarMessageInfo>
getMessagesFromHttpResponse(ResponseEntity<byte[]> response, String topic) {
HttpHeaders headers = response.getHeaders();
String msgId = headers.getFirst("X-Pulsar-Message-ID");
String brokerEntryTimestamp =
headers.getFirst("X-Pulsar-Broker-Entry-METADATA-timestamp");
@@ -564,7 +559,7 @@ public class PulsarUtils {
} else {
brokerEntryMetadata = new PulsarBrokerEntryMetadata();
if (brokerEntryTimestamp != null) {
-
brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp.toString()));
+
brokerEntryMetadata.setBrokerTimestamp(parse(brokerEntryTimestamp));
}
if (brokerEntryIndex != null) {
brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex));
@@ -574,131 +569,131 @@ public class PulsarUtils {
PulsarMessageMetadata messageMetadata = new PulsarMessageMetadata();
Map<String, String> properties = Maps.newTreeMap();
- Object tmp = headers.getFirst("X-Pulsar-publish-time");
+ String tmp = headers.getFirst("X-Pulsar-publish-time");
if (tmp != null) {
- messageMetadata.setPublishTime(parse(tmp.toString()));
+ messageMetadata.setPublishTime(parse(tmp));
}
tmp = headers.getFirst("X-Pulsar-event-time");
if (tmp != null) {
- messageMetadata.setEventTime(parse(tmp.toString()));
+ messageMetadata.setEventTime(parse(tmp));
}
tmp = headers.getFirst("X-Pulsar-deliver-at-time");
if (tmp != null) {
- messageMetadata.setDeliverAtTime(parse(tmp.toString()));
+ messageMetadata.setDeliverAtTime(parse(tmp));
}
tmp = headers.getFirst("X-Pulsar-null-value");
if (tmp != null) {
- messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
+ messageMetadata.setNullValue(Boolean.parseBoolean(tmp));
}
tmp = headers.getFirst("X-Pulsar-producer-name");
if (tmp != null) {
- messageMetadata.setProducerName(tmp.toString());
+ messageMetadata.setProducerName(tmp);
}
tmp = headers.getFirst("X-Pulsar-sequence-id");
if (tmp != null) {
- messageMetadata.setSequenceId(Long.parseLong(tmp.toString()));
+ messageMetadata.setSequenceId(Long.parseLong(tmp));
}
tmp = headers.getFirst("X-Pulsar-replicated-from");
if (tmp != null) {
- messageMetadata.setReplicatedFrom(tmp.toString());
+ messageMetadata.setReplicatedFrom(tmp);
}
tmp = headers.getFirst("X-Pulsar-partition-key");
if (tmp != null) {
- messageMetadata.setPartitionKey(tmp.toString());
+ messageMetadata.setPartitionKey(tmp);
}
tmp = headers.getFirst("X-Pulsar-compression");
if (tmp != null) {
- messageMetadata.setCompression(tmp.toString());
+ messageMetadata.setCompression(tmp);
}
tmp = headers.getFirst("X-Pulsar-uncompressed-size");
if (tmp != null) {
-
messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString()));
+ messageMetadata.setUncompressedSize(Integer.parseInt(tmp));
}
tmp = headers.getFirst("X-Pulsar-encryption-algo");
if (tmp != null) {
- messageMetadata.setEncryptionAlgo(tmp.toString());
+ messageMetadata.setEncryptionAlgo(tmp);
}
tmp = headers.getFirst("X-Pulsar-partition-key-b64-encoded");
if (tmp != null) {
-
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString()));
+
messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp));
}
tmp = headers.getFirst("X-Pulsar-marker-type");
if (tmp != null) {
- messageMetadata.setMarkerType(Integer.parseInt(tmp.toString()));
+ messageMetadata.setMarkerType(Integer.parseInt(tmp));
}
tmp = headers.getFirst("X-Pulsar-txnid-least-bits");
if (tmp != null) {
- messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString()));
+ messageMetadata.setTxnidLeastBits(Long.parseLong(tmp));
}
tmp = headers.getFirst("X-Pulsar-txnid-most-bits");
if (tmp != null) {
- messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString()));
+ messageMetadata.setTxnidMostBits(Long.parseLong(tmp));
}
tmp = headers.getFirst("X-Pulsar-highest-sequence-id");
if (tmp != null) {
-
messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString()));
+ messageMetadata.setHighestSequenceId(Long.parseLong(tmp));
}
tmp = headers.getFirst("X-Pulsar-uuid");
if (tmp != null) {
- messageMetadata.setUuid(tmp.toString());
+ messageMetadata.setUuid(tmp);
}
tmp = headers.getFirst("X-Pulsar-num-chunks-from-msg");
if (tmp != null) {
-
messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString()));
+ messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp));
}
tmp = headers.getFirst("X-Pulsar-total-chunk-msg-size");
if (tmp != null) {
-
messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString()));
+ messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp));
}
tmp = headers.getFirst("X-Pulsar-chunk-id");
if (tmp != null) {
- messageMetadata.setChunkId(Integer.parseInt(tmp.toString()));
+ messageMetadata.setChunkId(Integer.parseInt(tmp));
}
tmp = headers.getFirst("X-Pulsar-null-partition-key");
if (tmp != null) {
-
messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString()));
+ messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp));
}
tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
if (tmp != null) {
-
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp));
}
tmp = headers.getFirst("X-Pulsar-Base64-ordering-key");
if (tmp != null) {
-
messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString()));
+ messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp));
}
tmp = headers.getFirst("X-Pulsar-Base64-schema-version-b64encoded");
if (tmp != null) {
-
messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString()));
+ messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp));
}
tmp = headers.getFirst("X-Pulsar-Base64-encryption-param");
if (tmp != null) {
-
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString()));
+
messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp));
}
- List<String> tmpList = (List) headers.get("X-Pulsar-replicated-to");
+ List<String> tmpList = headers.get("X-Pulsar-replicated-to");
if (ObjectUtils.isNotEmpty(tmpList)) {
if (ObjectUtils.isEmpty(messageMetadata.getReplicateTos())) {
messageMetadata.setReplicateTos(Lists.newArrayList(tmpList));
@@ -715,7 +710,7 @@ public class PulsarUtils {
for (Entry<String, List<String>> entry : headers.entrySet()) {
if (entry.getKey().contains("X-Pulsar-PROPERTY-")) {
String keyName =
entry.getKey().substring("X-Pulsar-PROPERTY-".length());
- properties.put(keyName, (String) ((List)
entry.getValue()).get(0));
+ properties.put(keyName, (String) ((List<?>)
entry.getValue()).get(0));
}
}
@@ -726,7 +721,7 @@ public class PulsarUtils {
boolean isEncrypted = false;
tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
if (tmp != null) {
- isEncrypted = Boolean.parseBoolean(tmp.toString());
+ isEncrypted = Boolean.parseBoolean(tmp);
}
if (!isEncrypted && headers.get("X-Pulsar-num-batch-message") != null)
{
@@ -754,13 +749,13 @@ public class PulsarUtils {
/**
* Copy from getIndividualMsgsFromBatch method of
org.apache.pulsar.client.admin.internal.TopicsImpl class.
*
- * @param topic
- * @param msgId
- * @param data
- * @param properties
- * @param metadata
- * @param brokerMetadata
- * @return
+ * @param topic topic name
+ * @param msgId message id
+ * @param data batch message data
+ * @param properties message properties
+ * @param metadata message metadata
+ * @param brokerMetadata broker metadata
+ * @return list of pulsar message info
*/
private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String
topic, String msgId, byte[] data,
Map<String, String> properties, PulsarMessageMetadata metadata,
PulsarBrokerEntryMetadata brokerMetadata) {
@@ -799,11 +794,11 @@ public class PulsarUtils {
/**
* Copy from deSerializeSingleMessageInBatch method of
org.apache.pulsar.common.protocol.Commands class.
*
- * @param uncompressedPayload
- * @param metadata
- * @param index
- * @param batchSize
- * @return
+ * @param uncompressedPayload byte buffer containing uncompressed payload
+ * @param metadata message metadata
+ * @param index index of the message in the batch
+ * @param batchSize batch size
+ * @return byte buffer
*/
private static ByteBuffer deSerializeSingleMessageInBatch(ByteBuffer
uncompressedPayload,
PulsarMessageMetadata metadata, int index, int batchSize) {
@@ -822,9 +817,9 @@ public class PulsarUtils {
/**
* Copy from parseFrom method of
org.apache.pulsar.common.api.proto.SingleMessageMetadata class.
*
- * @param metadata
- * @param buffer
- * @param size
+ * @param metadata message metadata
+ * @param buffer byte buffer
+ * @param size message size
*/
private static void metaDataParseFrom(PulsarMessageMetadata metadata,
ByteBuffer buffer, int size) {
int endIdx = size + buffer.position();
@@ -883,8 +878,8 @@ public class PulsarUtils {
/**
* Copy from readVarInt method of
org.apache.pulsar.common.api.proto.LightProtoCodec class.
*
- * @param buf
- * @return
+ * @param buf byte buffer
+ * @return value of int
*/
private static int readVarInt(ByteBuffer buf) {
byte tmp = buf.get();
@@ -923,8 +918,8 @@ public class PulsarUtils {
/**
* Copy from readVarInt64 method of
org.apache.pulsar.common.api.proto.LightProtoCodec class.
*
- * @param buf
- * @return
+ * @param buf byte buffer
+ * @return value of long
*/
private static long readVarInt64(ByteBuffer buf) {
int shift = 0;
@@ -941,8 +936,8 @@ public class PulsarUtils {
/**
* Copy from getTagType method of
org.apache.pulsar.common.api.proto.LightProtoCodec class.
*
- * @param tag
- * @return
+ * @param tag tag number
+ * @return tag type
*/
private static int getTagType(int tag) {
return tag & 7;
@@ -951,8 +946,8 @@ public class PulsarUtils {
/**
* Copy from skipUnknownField method of
org.apache.pulsar.common.api.proto.LightProtoCodec class.
*
- * @param tag
- * @param buffer
+ * @param tag tag number
+ * @param buffer byte buffer
*/
private static void skipUnknownField(int tag, ByteBuffer buffer) {
int tagType = getTagType(tag);
@@ -967,21 +962,23 @@ public class PulsarUtils {
int len = readVarInt(buffer);
buffer.get(new byte[len]);
break;
+ case 5:
+ buffer.get(new byte[4]);
+ break;
case 3:
case 4:
default:
- throw new IllegalArgumentException("Invalid unknonwn tag type:
" + tagType);
- case 5:
- buffer.get(new byte[4]);
+ throw new IllegalArgumentException("Invalid tag type: " +
tagType);
+
}
}
/**
* Copy from parseFrom method of
org.apache.pulsar.common.api.proto.KeyValue class.
*
- * @param metadata
- * @param buffer
- * @param size
+ * @param metadata message metadata
+ * @param buffer byte buffer
+ * @param size message size
*/
private static void parseFrom(PulsarMessageMetadata metadata, ByteBuffer
buffer, int size) {
if (ObjectUtils.isEmpty(metadata.getProperties())) {
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/message/DeserializeOperatorTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/message/DeserializeOperatorTest.java
new file mode 100644
index 0000000000..4908f57119
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/message/DeserializeOperatorTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.message;
+
+import org.apache.inlong.common.enums.MessageWrapType;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link
DeserializeOperator#checkIfFilter(QueryMessageRequest, List)} method.
+ * This test class verifies the filtering logic for different operation types:
=, !=, and like.
+ */
+public class DeserializeOperatorTest {
+
+ private DeserializeOperator operator;
+ private List<FieldInfo> fieldList;
+
+ @BeforeEach
+ void setUp() {
+ // Create a simple implementation for testing the default method
+ operator = new DeserializeOperator() {
+
+ @Override
+ public boolean accept(MessageWrapType type) {
+ return false;
+ }
+ };
+
+ // Initialize test field list
+ fieldList = Arrays.asList(
+
FieldInfo.builder().fieldName("name").fieldValue("John").build(),
+ FieldInfo.builder().fieldName("age").fieldValue("25").build(),
+
FieldInfo.builder().fieldName("email").fieldValue("[email protected]").build(),
+
FieldInfo.builder().fieldName("nullField").fieldValue(null).build());
+ }
+
+ // ==================== Tests for precondition checks ====================
+
+ @Test
+ void testCheckIfFilter_whenFieldNameIsNull_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName(null)
+ .operationType("=")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Should not filter when fieldName is null
+ assertFalse(result);
+ }
+
+ @Test
+ void testCheckIfFilter_whenFieldNameIsBlank_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName(" ")
+ .operationType("=")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Should not filter when fieldName is blank
+ assertFalse(result);
+ }
+
+ @Test
+ void testCheckIfFilter_whenOperationTypeIsNull_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType(null)
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Should not filter when operationType is null
+ assertFalse(result);
+ }
+
+ @Test
+ void testCheckIfFilter_whenOperationTypeIsBlank_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Should not filter when operationType is blank
+ assertFalse(result);
+ }
+
+ @Test
+ void testCheckIfFilter_whenFieldNotFound_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("nonExistentField")
+ .operationType("=")
+ .targetValue("value")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Should not filter when field is not found in the list
+ assertFalse(result);
+ }
+
+ @Test
+ void testCheckIfFilter_whenFieldListIsEmpty_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("=")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request,
Collections.emptyList());
+
+ // Should not filter when field list is empty
+ assertFalse(result);
+ }
+
+ // ==================== Tests for equals (=) operation ====================
+
+ @Test
+ void testCheckIfFilter_equalsOperation_whenValuesMatch_shouldReturnFalse()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("=")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue equals fieldValue, so record should NOT be filtered
+ assertFalse(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_equalsOperation_whenValuesNotMatch_shouldReturnTrue() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("=")
+ .targetValue("Jane")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue does not equal fieldValue, so record should be filtered
+ assertTrue(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_equalsOperation_whenTargetValueIsNull_andFieldValueIsNull_shouldReturnFalse()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("nullField")
+ .operationType("=")
+ .targetValue(null)
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Both are null, `!Objects.equals` returns false, so record should
NOT be filtered
+ assertFalse(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_equalsOperation_whenTargetValueIsNull_andFieldValueIsNotNull_shouldReturnTrue()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("=")
+ .targetValue(null)
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue is null but fieldValue is "John", so record should be
filtered
+ assertTrue(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_equalsOperation_whenTargetValueIsNotNull_andFieldValueIsNull_shouldReturnTrue()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("nullField")
+ .operationType("=")
+ .targetValue("someValue")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue is not null but fieldValue is null, so record should be
filtered
+ assertTrue(result);
+ }
+
+ // ==================== Tests for not equals (!=) operation
====================
+
+ @Test
+ void
testCheckIfFilter_notEqualsOperation_whenValuesMatch_shouldReturnTrue() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("!=")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue equals fieldValue, so record should be filtered (we
want records that are NOT equal)
+ assertTrue(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_notEqualsOperation_whenValuesNotMatch_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("!=")
+ .targetValue("Jane")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue does not equal fieldValue, so record should NOT be
filtered
+ assertFalse(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_notEqualsOperation_whenBothValuesAreNull_shouldReturnTrue() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("nullField")
+ .operationType("!=")
+ .targetValue(null)
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Both are null, Objects.equals returns true, so record should be
filtered
+ assertTrue(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_notEqualsOperation_whenTargetValueIsNull_andFieldValueIsNotNull_shouldReturnFalse()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("!=")
+ .targetValue(null)
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue is null but fieldValue is "John", they are not equal,
so record should NOT be filtered
+ assertFalse(result);
+ }
+
+ // ==================== Tests for like operation ====================
+
+ @Test
+ void
testCheckIfFilter_likeOperation_whenFieldValueContainsTargetValue_shouldReturnFalse()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("email")
+ .operationType("like")
+ .targetValue("@example")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // fieldValue contains targetValue, so record should NOT be filtered
+ assertFalse(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_likeOperation_whenFieldValueNotContainsTargetValue_shouldReturnTrue()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("email")
+ .operationType("like")
+ .targetValue("@gmail")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // fieldValue does not contain targetValue, so record should be
filtered
+ assertTrue(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_likeOperation_whenFieldValueIsNull_shouldReturnTrue() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("nullField")
+ .operationType("like")
+ .targetValue("anyValue")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // fieldValue is null, like operation is invalid, so record should be
filtered
+ assertTrue(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_likeOperation_whenTargetValueIsNull_shouldReturnTrue() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("like")
+ .targetValue(null)
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // targetValue is null, like operation is invalid, so record should be
filtered
+ assertTrue(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_likeOperation_whenTargetValueIsExactMatch_shouldReturnFalse()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("like")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // fieldValue exactly matches targetValue (contains it), so record
should NOT be filtered
+ assertFalse(result);
+ }
+
+ @Test
+ void
testCheckIfFilter_likeOperation_whenTargetValueIsPartialMatch_shouldReturnFalse()
{
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("like")
+ .targetValue("oh")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // fieldValue "John" contains "oh", so record should NOT be filtered
+ assertFalse(result);
+ }
+
+ @Test
+ void testCheckIfFilter_likeOperation_caseSensitive_shouldReturnTrue() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("like")
+ .targetValue("JOHN")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // String.contains is case-sensitive, "John" does not contain "JOHN",
so record should be filtered
+ assertTrue(result);
+ }
+
+ // ==================== Tests for unknown operation type
====================
+
+ @Test
+ void testCheckIfFilter_unknownOperation_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("name")
+ .operationType("unknown")
+ .targetValue("John")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Unknown operation type, ifFilter remains false (default value)
+ assertFalse(result);
+ }
+
+ @Test
+ void testCheckIfFilter_greaterThanOperation_shouldReturnFalse() {
+ QueryMessageRequest request = QueryMessageRequest.builder()
+ .fieldName("age")
+ .operationType(">")
+ .targetValue("20")
+ .build();
+
+ boolean result = operator.checkIfFilter(request, fieldList);
+
+ // Greater than operation is not implemented, ifFilter remains false
+ assertFalse(result);
+ }
+}