This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b2876ad335 [INLONG-12073][Manager] Optimize Pulsar message query with thread pool and proper error handling (#12074)
b2876ad335 is described below

commit b2876ad335b99792ef94894a38f4e9b94242adb8
Author: healchow <[email protected]>
AuthorDate: Wed Feb 25 15:29:10 2026 +0800

    [INLONG-12073][Manager] Optimize Pulsar message query with thread pool and proper error handling (#12074)
    
    * [INLONG-12073][Manager] Optimize Pulsar message query with thread pool and proper error handling
    
    * [INLONG-12073][Manager] Throw exception for query caller
    
    * [INLONG-12073][Manager] Add unit tests
    
    * [INLONG-12073][Manager] Fix the code formats
    
    * [INLONG-12073][Manager] Remove the randomly select method
    
    * [INLONG-12073][Manager] Use ConcurrentLinkedQueue to improve performance
    
    * [INLONG-12073][Manager] Add name label for manager-schedule module
    
    * [INLONG-12073][Manager] Remove unused properties
---
 inlong-manager/manager-schedule/pom.xml            |   1 +
 .../resource/queue/QueueResourceOperator.java      |   3 +-
 .../queue/pulsar/PulsarQueueResourceOperator.java  | 188 +++++++--
 .../resource/queue/pulsar/QueryCountDownLatch.java |  41 +-
 .../queue/pulsar/QueryLatestMessagesRunnable.java  |  86 ++--
 .../service/stream/InlongStreamServiceImpl.java    |   9 +-
 .../queue/QueryLatestMessagesRunnableTest.java     | 439 +++++++++++++++++++++
 .../src/main/resources/application.properties      |   6 +
 8 files changed, 706 insertions(+), 67 deletions(-)

diff --git a/inlong-manager/manager-schedule/pom.xml b/inlong-manager/manager-schedule/pom.xml
index b975dedbaf..444d314ab4 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -25,6 +25,7 @@
     </parent>
 
     <artifactId>manager-schedule</artifactId>
+    <name>Apache InLong - Manager Schedule</name>
 
     <properties>
         <quartz.version>2.3.2</quartz.version>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
index 528884c996..389cb24167 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
@@ -83,11 +83,10 @@ public interface QueueResourceOperator {
      * @param groupInfo inlong group info
      * @param streamInfo inlong stream info
      * @param request query message request
-     * @throws Exception any exception if occurred
      * @return query brief mq message info
      */
     default List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            QueryMessageRequest request) throws Exception {
+            QueryMessageRequest request) {
         return null;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 67a1b39f7a..66cbee4fc3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -44,17 +44,26 @@ import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 
 import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -80,7 +89,59 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
     @Autowired
     private PulsarOperator pulsarOperator;
 
-    private ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
+    @Value("${pulsar.query.poolSize:10}")
+    private int poolSize;
+
+    @Value("${pulsar.query.keepAliveSeconds:60}")
+    private long keepAliveSeconds;
+
+    @Value("${pulsar.query.queueCapacity:100}")
+    private int queueCapacity;
+
+    @Value("${pulsar.query.queryTimeoutSeconds:10}")
+    private int queryTimeoutSeconds;
+
+    /**
+     * Thread pool for querying messages from multiple Pulsar clusters 
concurrently.
+     * Configuration is loaded from application properties with prefix 
'pulsar.query'.
+     */
+    private ExecutorService messageQueryExecutor;
+
+    /**
+     * Initialize the executor service after bean creation.
+     */
+    @PostConstruct
+    public void init() {
+        // Initialize the executor service with same core pool size and max 
core pool size
+        this.messageQueryExecutor = new ThreadPoolExecutor(
+                poolSize,
+                poolSize,
+                keepAliveSeconds,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(queueCapacity),
+                new 
ThreadFactoryBuilder().setNameFormat("pulsar-message-query-%d").build(),
+                // Use AbortPolicy to throw exception when the queue is full
+                new ThreadPoolExecutor.AbortPolicy());
+        log.info("Init message query executor, poolSize={}, 
keepAliveSeconds={}, queueCapacity={}",
+                poolSize, keepAliveSeconds, queueCapacity);
+    }
+
+    /**
+     * Shutdown the executor service when the bean is destroyed.
+     */
+    @PreDestroy
+    public void shutdown() {
+        log.info("Shutting down pulsar message query executor");
+        messageQueryExecutor.shutdown();
+        try {
+            if (!messageQueryExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                messageQueryExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            messageQueryExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
 
     @Override
     public boolean accept(String mqType) {
@@ -307,34 +368,117 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
      * Query latest message from pulsar
      */
     public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo,
-            QueryMessageRequest request) throws Exception {
-        List<ClusterInfo> pulsarClusterList = 
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
-                ClusterType.PULSAR);
-        List<BriefMQMessage> briefMQMessages = 
Collections.synchronizedList(new ArrayList<>());
-        QueryCountDownLatch queryLatch = new 
QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size());
+            QueryMessageRequest request) {
+        String groupId = streamInfo.getInlongGroupId();
+        String clusterTag = groupInfo.getInlongClusterTag();
+        List<ClusterInfo> clusterInfos = 
clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        if (CollectionUtils.isEmpty(clusterInfos)) {
+            log.warn("No pulsar cluster found for clusterTag={} for 
groupId={}", clusterTag, groupId);
+            return Collections.emptyList();
+        }
+
+        // Select clusters and calculate per-cluster query count
+        Integer requestCount = request.getMessageCount();
+        int clusterSize = clusterInfos.size();
+        QueryCountDownLatch queryLatch = new QueryCountDownLatch(requestCount, 
clusterSize);
+        log.debug("Query pulsar message in {} clusters, each cluster query {} 
messages", clusterSize, requestCount);
+
+        // Extract common parameters
         InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
-        for (ClusterInfo clusterInfo : pulsarClusterList) {
-            QueryLatestMessagesRunnable task = new 
QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
-                    (PulsarClusterInfo) clusterInfo, pulsarOperator, request, 
briefMQMessages, queryLatch);
-            this.executor.execute(task);
+        String tenant = inlongPulsarInfo.getPulsarTenant();
+        String namespace = inlongPulsarInfo.getMqResource();
+        String topicName = streamInfo.getMqResource();
+        boolean serialQueue = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
+
+        // Submit query tasks to thread pool, each task queries from one 
cluster
+        // Use submit() instead of execute() to get Future for cancellation 
support
+        List<Future<?>> submittedTasks = new ArrayList<>();
+        // Use ConcurrentLinkedQueue for thread-safe message collection,
+        // its performance is better than Collections.synchronizedList
+        ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue = new 
ConcurrentLinkedQueue<>();
+        for (ClusterInfo clusterInfo : clusterInfos) {
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+            if (StringUtils.isBlank(tenant)) {
+                tenant = pulsarCluster.getPulsarTenant();
+            }
+            String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+            // Create a copy of request with adjusted message count for this 
cluster
+            QueryMessageRequest currentRequest = 
buildRequestForSingleCluster(request, requestCount);
+            QueryLatestMessagesRunnable task = new 
QueryLatestMessagesRunnable(pulsarOperator, streamInfo,
+                    pulsarCluster, serialQueue, fullTopicName, currentRequest, 
messageResultQueue, queryLatch);
+            try {
+                Future<?> future = this.messageQueryExecutor.submit(task);
+                submittedTasks.add(future);
+            } catch (RejectedExecutionException e) {
+                // Cancel all previously submitted tasks before throwing 
exception
+                log.error("Failed to submit query task for groupId={}, 
cancelling {} submitted tasks",
+                        groupId, submittedTasks.size(), e);
+                cancelSubmittedTasks(submittedTasks);
+                throw new BusinessException("Query messages task rejected: too 
many concurrent requests");
+            }
+        }
+
+        // Wait for tasks to complete with a configurable timeout
+        String streamId = streamInfo.getInlongStreamId();
+        try {
+            boolean completed = queryLatch.await(queryTimeoutSeconds, 
TimeUnit.SECONDS);
+            if (!completed) {
+                log.warn("Query messages timeout for groupId={}, streamId={}, 
collected {} messages",
+                        groupId, streamId, messageResultQueue.size());
+            }
+        } catch (InterruptedException e) {
+            throw new BusinessException(String.format("Query messages task 
interrupted for groupId=%s, streamId=%s",
+                    groupId, streamId));
+        }
+
+        log.info("Success query pulsar message for groupId={}, streamId={}", 
groupId, streamId);
+        List<BriefMQMessage> messageResultList = new 
ArrayList<>(messageResultQueue);
+
+        // if query result size is less than request count, return all, 
otherwise truncate to request count
+        if (messageResultList.isEmpty() || messageResultList.size() <= 
requestCount) {
+            return messageResultList;
         }
-        queryLatch.await(30, TimeUnit.SECONDS);
-        log.info("success query pulsar message for groupId={}, streamId={}", 
streamInfo.getInlongGroupId(),
-                streamInfo.getInlongStreamId());
-
-        int finalMsgCount = Math.min(request.getMessageCount(), 
briefMQMessages.size());
-        if (finalMsgCount > 0) {
-            return new ArrayList<>(briefMQMessages.subList(0, finalMsgCount));
-        } else {
-            return new ArrayList<>();
+
+        return new ArrayList<>(messageResultList.subList(0, requestCount));
+    }
+
+    /**
+     * Build a new QueryMessageRequest with adjusted message count for a 
specific cluster.
+     */
+    private QueryMessageRequest 
buildRequestForSingleCluster(QueryMessageRequest original, int messageCount) {
+        return QueryMessageRequest.builder()
+                .groupId(original.getGroupId())
+                .streamId(original.getStreamId())
+                .messageCount(messageCount)
+                .fieldName(original.getFieldName())
+                .operationType(original.getOperationType())
+                .targetValue(original.getTargetValue())
+                .build();
+    }
+
+    /**
+     * Cancel all submitted tasks when an error occurs.
+     * This method attempts to cancel tasks with interrupt flag set to true,
+     * allowing running tasks to be interrupted if they check for interruption.
+     *
+     * @param submittedTasks list of Future objects representing submitted 
tasks
+     */
+    private void cancelSubmittedTasks(List<java.util.concurrent.Future<?>> 
submittedTasks) {
+        int cancelledCount = 0;
+        for (java.util.concurrent.Future<?> future : submittedTasks) {
+            // mayInterruptIfRunning=true allows interrupting running tasks
+            if (future.cancel(true)) {
+                cancelledCount++;
+            }
         }
+        log.info("Cancelled {}/{} submitted tasks", cancelledCount, 
submittedTasks.size());
     }
 
     /**
      * Reset cursor for consumer group
      */
     public void resetCursor(InlongGroupInfo groupInfo, InlongStreamEntity 
streamEntity, StreamSinkEntity sinkEntity,
-            Long resetTime) throws Exception {
+            Long resetTime) {
         log.info("begin to reset cursor for sinkId={}", sinkEntity.getId());
         InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
         List<ClusterInfo> clusterInfos = 
clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
index 08970b39d7..a76dc14f32 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
@@ -21,13 +21,20 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 /**
- * QueryCountDownLatch
+ * QueryCountDownLatch for managing query task and data completion.
+ * <p>
+ * This class provides two types of countdown:
+ * <ul>
+ *   <li>Task countdown: tracks the number of tasks completed (regardless of 
success or failure)</li>
+ *   <li>Data countdown: tracks the number of data items retrieved</li>
+ * </ul>
+ * The flagLatch is released when either all tasks complete or enough data is 
collected.
  */
 public class QueryCountDownLatch {
 
-    private CountDownLatch dataLatch;
-    private CountDownLatch taskLatch;
-    private CountDownLatch flagLatch;
+    private final CountDownLatch dataLatch;
+    private final CountDownLatch taskLatch;
+    private final CountDownLatch flagLatch;
 
     public QueryCountDownLatch(int dataSize, int taskSize) {
         this.dataLatch = new CountDownLatch(dataSize);
@@ -35,12 +42,34 @@ public class QueryCountDownLatch {
         this.flagLatch = new CountDownLatch(1);
     }
 
-    public void countDown(int dataDownSize) {
+    /**
+     * Called when a task completes (regardless of success or failure).
+     * This should be called in a finally block to ensure it's always executed.
+     */
+    public void taskCountDown() {
         this.taskLatch.countDown();
+        checkAndRelease();
+    }
+
+    /**
+     * Called when data items are successfully retrieved.
+     *
+     * @param dataDownSize the number of data items retrieved
+     */
+    public void dataCountDown(int dataDownSize) {
         for (int i = 0; i < dataDownSize; i++) {
             this.dataLatch.countDown();
         }
-        if (this.taskLatch.getCount() == 0 || this.dataLatch.getCount() == 0) {
+        checkAndRelease();
+    }
+
+    /**
+     * Check if the flagLatch should be released.
+     * Release when all tasks complete or enough data is collected.
+     */
+    private synchronized void checkAndRelease() {
+        if (this.flagLatch.getCount() > 0
+                && (this.taskLatch.getCount() == 0 || 
this.dataLatch.getCount() == 0)) {
             this.flagLatch.countDown();
         }
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
index 4fb6b58e49..37eabf65f3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -17,63 +17,89 @@
 
 package org.apache.inlong.manager.service.resource.queue.pulsar;
 
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
-import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
- * QueryLatestMessagesRunnable
+ * Runnable task for querying latest messages from a Pulsar cluster.
  */
 public class QueryLatestMessagesRunnable implements Runnable {
 
-    private InlongPulsarInfo inlongPulsarInfo;
-    private InlongStreamInfo streamInfo;
-    private PulsarClusterInfo clusterInfo;
-    private PulsarOperator pulsarOperator;
-    private QueryMessageRequest queryMessageRequest;
-    private List<BriefMQMessage> briefMQMessages;
-    private QueryCountDownLatch latch;
+    private static final Logger LOG = 
LoggerFactory.getLogger(QueryLatestMessagesRunnable.class);
 
-    public QueryLatestMessagesRunnable(InlongPulsarInfo inlongPulsarInfo,
+    private final PulsarOperator pulsarOperator;
+    private final InlongStreamInfo streamInfo;
+    private final PulsarClusterInfo clusterInfo;
+    private final boolean serialQueue;
+    private final String fullTopicName;
+    private final QueryMessageRequest queryMessageRequest;
+    private final ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue;
+    private final QueryCountDownLatch latch;
+
+    public QueryLatestMessagesRunnable(
+            PulsarOperator pulsarOperator,
             InlongStreamInfo streamInfo,
             PulsarClusterInfo clusterInfo,
-            PulsarOperator pulsarOperator,
+            boolean serialQueue,
+            String fullTopicName,
             QueryMessageRequest queryMessageRequest,
-            List<BriefMQMessage> briefMQMessages,
+            ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue,
             QueryCountDownLatch latch) {
-        this.inlongPulsarInfo = inlongPulsarInfo;
+        this.pulsarOperator = pulsarOperator;
         this.streamInfo = streamInfo;
         this.clusterInfo = clusterInfo;
-        this.pulsarOperator = pulsarOperator;
+        this.serialQueue = serialQueue;
+        this.fullTopicName = fullTopicName;
         this.queryMessageRequest = queryMessageRequest;
-        this.briefMQMessages = briefMQMessages;
+        this.messageResultQueue = messageResultQueue;
         this.latch = latch;
     }
 
     @Override
     public void run() {
-        String tenant = inlongPulsarInfo.getPulsarTenant();
-        if (StringUtils.isBlank(tenant)) {
-            tenant = clusterInfo.getPulsarTenant();
-        }
+        String clusterName = clusterInfo.getName();
+        LOG.debug("Begin to query messages from cluster={}, topic={}", 
clusterName, fullTopicName);
+        try {
+            // Check for interruption before starting the query
+            if (Thread.currentThread().isInterrupted()) {
+                LOG.info("Task interrupted before query, cluster={}, 
topic={}", clusterName, fullTopicName);
+                return;
+            }
+
+            List<BriefMQMessage> messages = 
pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName,
+                    queryMessageRequest, streamInfo, serialQueue);
+
+            // Check for interruption after query completes
+            // (IO operations not support interruption, so we check the flag 
manually after the blocking call returns)
+            if (Thread.currentThread().isInterrupted()) {
+                LOG.info("Task interrupted after query, discarding results, 
cluster={}, topic={}",
+                        clusterName, fullTopicName);
+                return;
+            }
 
-        String namespace = inlongPulsarInfo.getMqResource();
-        String topicName = streamInfo.getMqResource();
-        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
-        boolean serial = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
-        List<BriefMQMessage> messages =
-                pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, 
queryMessageRequest, streamInfo, serial);
-        if (CollectionUtils.isNotEmpty(messages)) {
-            briefMQMessages.addAll(messages);
-            this.latch.countDown(messages.size());
+            if (CollectionUtils.isNotEmpty(messages)) {
+                messageResultQueue.addAll(messages);
+                this.latch.dataCountDown(messages.size());
+                LOG.debug("Successfully queried {} messages from cluster={}, 
topic={}",
+                        messages.size(), clusterName, fullTopicName);
+            } else {
+                LOG.debug("No messages found from cluster={}, topic={}", 
clusterName, fullTopicName);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to query messages from cluster={}, groupId={}, 
streamId={}",
+                    clusterName, queryMessageRequest.getGroupId(), 
queryMessageRequest.getStreamId(), e);
+        } finally {
+            // Ensure taskCountDown is always called, regardless of success or 
failure
+            this.latch.taskCountDown();
         }
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 46f8fe5e94..8b11651d2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -981,13 +981,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         InlongGroupOperator instance = groupOperatorFactory.getInstance(groupEntity.getMqType());
         InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity);
         InlongStreamInfo inlongStreamInfo = get(request.getGroupId(), request.getStreamId());
-        List<BriefMQMessage> messageList = new ArrayList<>();
         QueueResourceOperator queueOperator = queueOperatorFactory.getInstance(groupEntity.getMqType());
-        try {
-            messageList = queueOperator.queryLatestMessages(groupInfo, inlongStreamInfo, request);
-        } catch (Exception e) {
-            LOGGER.error("query message error ", e);
-        }
-        return messageList;
+        // Do not catch exception, throws it to caller
+        return queueOperator.queryLatestMessages(groupInfo, inlongStreamInfo, request);
     }
 }
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java
new file mode 100644
index 0000000000..fe90083ba3
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.queue;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
+import 
org.apache.inlong.manager.service.resource.queue.pulsar.QueryCountDownLatch;
+import 
org.apache.inlong.manager.service.resource.queue.pulsar.QueryLatestMessagesRunnable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link QueryLatestMessagesRunnable}.
+ * Tests focus on interrupt handling behavior.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class QueryLatestMessagesRunnableTest {
+
+    @Mock
+    private PulsarOperator pulsarOperator;
+
+    @Mock
+    private InlongStreamInfo streamInfo;
+
+    @Mock
+    private PulsarClusterInfo clusterInfo;
+
+    private QueryMessageRequest queryMessageRequest;
+    private ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue;
+    private QueryCountDownLatch queryLatch;
+
+    private static final String CLUSTER_NAME = "test-cluster";
+    private static final String FULL_TOPIC_NAME = 
"public/test-namespace/test-topic";
+    private static final String GROUP_ID = "test-group";
+    private static final String STREAM_ID = "test-stream";
+
+    @BeforeEach
+    public void setUp() { // runs before every test: stubs the cluster name and rebuilds the shared fixtures
+        when(clusterInfo.getName()).thenReturn(CLUSTER_NAME);
+
+        queryMessageRequest = QueryMessageRequest.builder()
+                .groupId(GROUP_ID)
+                .streamId(STREAM_ID)
+                .messageCount(10)
+                .build();
+
+        messageResultQueue = new ConcurrentLinkedQueue<>();
+        queryLatch = new QueryCountDownLatch(10, 1); // presumably (message count, task count) - TODO confirm
+    }
+
+    /**
+     * Test: Task executes successfully when not interrupted.
+     * Verifies that the query is invoked exactly once and that the returned
+     * message ends up in the result queue.
+     * NOTE(review): the latch count-down is not asserted by this test.
+     */
+    @Test
+    public void testSuccessfulQueryWithoutInterruption() {
+        // Prepare the data returned by the mock
+        List<BriefMQMessage> mockMessages = new ArrayList<>();
+        BriefMQMessage message = new BriefMQMessage();
+        message.setBody("test message");
+        mockMessages.add(message);
+
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), 
any(), anyBoolean()))
+                .thenReturn(mockMessages);
+
+        QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+
+        // Run the task synchronously
+        task.run();
+
+        // Verify: the query was executed
+        verify(pulsarOperator, times(1)).queryLatestMessage(any(), 
anyString(), any(), any(), anyBoolean());
+        // Verify: the result was added to the result queue
+        assertEquals(1, messageResultQueue.size());
+    }
+
+    /**
+     * Test: Task exits immediately when thread is interrupted before query
+     * starts.
+     * Verifies that no query is performed and that the result queue stays
+     * empty.
+     * NOTE(review): the latch count-down is not asserted, and the test does
+     * not check that the helper thread actually finished within the 5 s join.
+     */
+    @Test
+    public void testInterruptionBeforeQuery() throws InterruptedException {
+        QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+
+        // Create a thread that is interrupted before it runs the task
+        Thread testThread = new Thread(() -> {
+            // Interrupt the thread before running the task
+            Thread.currentThread().interrupt();
+            task.run();
+        });
+
+        testThread.start();
+        testThread.join(5000);
+
+        // Verify: the query was not executed (the interrupt was detected before the query)
+        verify(pulsarOperator, never()).queryLatestMessage(any(), anyString(), 
any(), any(), anyBoolean());
+        // Verify: the result queue is empty
+        assertTrue(messageResultQueue.isEmpty());
+    }
+
+    /**
+     * Test: Task discards results when interrupted after query completes.
+     * The mocked query blocks until the main thread has called
+     * {@code future.cancel(true)}; afterwards the test asserts that the query
+     * ran once and that the result queue stayed empty.
+     * NOTE(review): the mock waits in {@code CountDownLatch.await}, which is
+     * interruptible, so the cancel probably surfaces as an
+     * InterruptedException from the mock rather than as a returned list -
+     * confirm this really covers the "interrupt detected after the query" path.
+     * NOTE(review): completion is awaited with a fixed 500 ms sleep, which is
+     * timing dependent.
+     */
+    @Test
+    public void testInterruptionAfterQuery() throws InterruptedException {
+        // Mock a query that is still in flight when the interrupt flag gets set
+        List<BriefMQMessage> mockMessages = new ArrayList<>();
+        BriefMQMessage message = new BriefMQMessage();
+        message.setBody("test message");
+        mockMessages.add(message);
+
+        CountDownLatch queryStartedLatch = new CountDownLatch(1);
+        CountDownLatch interruptSetLatch = new CountDownLatch(1);
+
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), 
any(), anyBoolean()))
+                .thenAnswer(invocation -> {
+                    // Signal the main thread that the query has started
+                    queryStartedLatch.countDown();
+                    // Wait for the main thread to set the interrupt flag
+                    interruptSetLatch.await(5, TimeUnit.SECONDS);
+                    // Return the result (meant to simulate an IO operation that ignores interruption)
+                    return mockMessages;
+                });
+
+        QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+
+        // Run the task in a thread pool
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<?> future = executor.submit(task);
+
+        // Wait for the query to start
+        assertTrue(queryStartedLatch.await(5, TimeUnit.SECONDS));
+
+        // Cancel the task (sets the interrupt flag)
+        future.cancel(true);
+        // Let the task continue
+        interruptSetLatch.countDown();
+
+        // Wait for the task to finish
+        Thread.sleep(500);
+
+        // Verify: the query was executed
+        verify(pulsarOperator, times(1)).queryLatestMessage(any(), 
anyString(), any(), any(), anyBoolean());
+        // Verify: the result was discarded (intended reason: interrupt flag detected after the query)
+        assertTrue(messageResultQueue.isEmpty());
+
+        executor.shutdownNow();
+    }
+
+    /**
+     * Test: Task handles empty query results correctly.
+     * Verifies that {@code run()} completes without throwing, that the query
+     * is invoked once and that the result queue stays empty.
+     * NOTE(review): the latch count-down is not asserted by this test.
+     */
+    @Test
+    public void testEmptyQueryResults() {
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), 
any(), anyBoolean()))
+                .thenReturn(Collections.emptyList());
+
+        QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+
+        task.run();
+
+        // Verify: the query was executed
+        verify(pulsarOperator, times(1)).queryLatestMessage(any(), 
anyString(), any(), any(), anyBoolean());
+        // Verify: the result queue is empty
+        assertTrue(messageResultQueue.isEmpty());
+    }
+
+    /**
+     * Test: Task handles null query results correctly.
+     * Verifies that {@code run()} completes without throwing when the query
+     * returns {@code null}, that the query is invoked once and that the
+     * result queue stays empty.
+     * NOTE(review): the latch count-down is not asserted by this test.
+     */
+    @Test
+    public void testNullQueryResults() {
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), 
any(), anyBoolean()))
+                .thenReturn(null);
+
+        QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+
+        task.run();
+
+        // Verify: the query was executed
+        verify(pulsarOperator, times(1)).queryLatestMessage(any(), 
anyString(), any(), any(), anyBoolean());
+        // Verify: the result queue is empty
+        assertTrue(messageResultQueue.isEmpty());
+    }
+
+    /**
+     * Test: Task handles query exception gracefully.
+     * Verifies that {@code run()} does not propagate the RuntimeException
+     * thrown by the mocked query, that the query was attempted once and that
+     * the result queue stays empty.
+     * NOTE(review): the latch count-down is not asserted by this test.
+     */
+    @Test
+    public void testQueryException() {
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), 
any(), anyBoolean()))
+                .thenThrow(new RuntimeException("Simulated query failure"));
+
+        QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+
+        // Must not throw
+        task.run();
+
+        // Verify: the query was executed
+        verify(pulsarOperator, times(1)).queryLatestMessage(any(), 
anyString(), any(), any(), anyBoolean());
+        // Verify: the result queue is empty
+        assertTrue(messageResultQueue.isEmpty());
+    }
+
+    /**
+     * Test: Multiple tasks can be cancelled together.
+     * Simulates the scenario where RejectedExecutionException occurs and all
+     * submitted tasks need to be cancelled: five tasks are started on a fixed
+     * pool, each is cancelled with {@code cancel(true)} while its mocked query
+     * is blocked, and the shared result queue must stay empty.
+     * NOTE(review): {@code cancelledCount} is tallied but never asserted.
+     * NOTE(review): the mock blocks in the interruptible
+     * {@code CountDownLatch.await}, so cancellation probably makes the mock
+     * throw instead of returning data - confirm the intended path is covered.
+     */
+    @Test
+    public void testMultipleTaskCancellation() throws InterruptedException {
+        int taskCount = 5;
+        ConcurrentLinkedQueue<BriefMQMessage> sharedResultList = new 
ConcurrentLinkedQueue<>();
+        QueryCountDownLatch sharedLatch = new QueryCountDownLatch(50, 
taskCount);
+
+        // Simulate a slow query
+        CountDownLatch allTasksStarted = new CountDownLatch(taskCount);
+        CountDownLatch proceedLatch = new CountDownLatch(1);
+
+        List<BriefMQMessage> mockMessages = new ArrayList<>();
+        BriefMQMessage message = new BriefMQMessage();
+        message.setBody("test message");
+        mockMessages.add(message);
+
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), 
any(), anyBoolean()))
+                .thenAnswer(invocation -> {
+                    allTasksStarted.countDown();
+                    // Wait for the signal to continue
+                    proceedLatch.await(10, TimeUnit.SECONDS);
+                    return mockMessages;
+                });
+
+        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
+        List<Future<?>> futures = new ArrayList<>();
+
+        // Submit multiple tasks
+        for (int i = 0; i < taskCount; i++) {
+            QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                    pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                    queryMessageRequest, sharedResultList, sharedLatch);
+            futures.add(executor.submit(task));
+        }
+
+        // Wait until all tasks have started executing
+        assertTrue(allTasksStarted.await(5, TimeUnit.SECONDS));
+
+        // Cancel all tasks
+        int cancelledCount = 0;
+        for (Future<?> future : futures) {
+            if (future.cancel(true)) {
+                cancelledCount++;
+            }
+        }
+
+        // Allow the tasks to continue
+        proceedLatch.countDown();
+
+        // Shut down the thread pool
+        executor.shutdown();
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+
+        // Verify: the results were discarded (the interrupt flag had been set)
+        assertTrue(sharedResultList.isEmpty());
+    }
+
+    /**
+     * Test: Verifies interrupt flag is checked at the right points.
+     * Scenario 1 runs the task normally and expects one query and one result;
+     * scenario 2 interrupts the thread before running the task and expects no
+     * query and no result.
+     * NOTE(review): the "interrupted after the query" check point is not
+     * exercised by this method.
+     */
+    @Test
+    public void testInterruptCheckPoints() throws InterruptedException {
+        List<BriefMQMessage> mockMessages = new ArrayList<>();
+        BriefMQMessage message = new BriefMQMessage();
+        message.setBody("test message");
+        mockMessages.add(message);
+
+        // Count how many times the query is invoked
+        final int[] queryCallCount = {0};
+
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), 
any(), anyBoolean()))
+                .thenAnswer(invocation -> {
+                    queryCallCount[0]++;
+                    return mockMessages;
+                });
+
+        // Scenario 1: normal execution (no interruption)
+        QueryLatestMessagesRunnable normalTask = new 
QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+        normalTask.run();
+
+        assertEquals(1, queryCallCount[0], "Query should be called once in 
normal execution");
+        assertEquals(1, messageResultQueue.size(), "Result should be added in 
normal execution");
+
+        // Reset the shared state
+        queryCallCount[0] = 0;
+        messageResultQueue.clear();
+        queryLatch = new QueryCountDownLatch(10, 1);
+
+        // Scenario 2: interrupted before the query
+        Thread preInterruptThread = new Thread(() -> {
+            Thread.currentThread().interrupt();
+            QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                    pulsarOperator, streamInfo, clusterInfo, false, 
FULL_TOPIC_NAME,
+                    queryMessageRequest, messageResultQueue, queryLatch);
+            task.run();
+        });
+        preInterruptThread.start();
+        preInterruptThread.join(5000);
+
+        assertEquals(0, queryCallCount[0], "Query should not be called when 
interrupted before");
+        assertTrue(messageResultQueue.isEmpty(), "Result should not be added 
when interrupted before");
+    }
+
+    /**
+     * Test: Verifies that RejectedExecutionException message contains
+     * 'reject' keyword (compared case-insensitively).
+     * A ThreadPoolExecutor with one thread, a one-slot LinkedBlockingQueue and
+     * AbortPolicy is filled with two blocking tasks, so the third submission
+     * is expected to be rejected; the test fails if no exception is thrown.
+     * Only JDK executor classes are exercised here; no InLong class is
+     * invoked. The finally block releases the blocked tasks and shuts the
+     * executor down.
+     */
+    @Test
+    public void testRejectedExecutionExceptionContainsRejectKeyword() {
+        // Create a thread pool with minimal capacity to trigger rejection
+        ExecutorService tinyExecutor = new 
java.util.concurrent.ThreadPoolExecutor(
+                1, 1, 0L, TimeUnit.MILLISECONDS,
+                new java.util.concurrent.LinkedBlockingQueue<>(1),
+                new java.util.concurrent.ThreadPoolExecutor.AbortPolicy());
+
+        CountDownLatch blockingLatch = new CountDownLatch(1);
+        try {
+            // Submit blocking tasks to fill the pool and queue
+            tinyExecutor.submit(() -> {
+                try {
+                    blockingLatch.await(30, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+
+            tinyExecutor.submit(() -> {
+                try {
+                    blockingLatch.await(30, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            });
+
+            // This submission should be rejected (pool full, queue full)
+            tinyExecutor.submit(() -> {
+                // This task should never execute
+            });
+
+            // If we reach here, the test failed
+            org.junit.jupiter.api.Assertions.fail("Expected 
RejectedExecutionException was not thrown");
+        } catch (java.util.concurrent.RejectedExecutionException e) {
+            // Verify the exception message contains 'reject' keyword 
(case-insensitive)
+            String message = e.getMessage();
+            assertTrue(message != null && 
message.toLowerCase().contains("reject"),
+                    "RejectedExecutionException message should contain 
'reject' keyword, but was: " + message);
+        } finally {
+            blockingLatch.countDown();
+            tinyExecutor.shutdownNow();
+        }
+    }
+
+    /**
+     * Test: Checks that a BusinessException created with the message
+     * "Query messages task rejected: too many concurrent requests" exposes the
+     * 'reject' keyword through {@code getMessage()}.
+     * NOTE(review): the exception is constructed locally from a hard-coded
+     * string; PulsarQueueResourceOperator is never invoked, so this does not
+     * verify the message that the operator actually produces when the queue
+     * is full.
+     */
+    @Test
+    public void testBusinessExceptionContainsRejectKeywordWhenQueueFull() {
+        // Simulate the BusinessException that would be thrown when 
RejectedExecutionException occurs
+        String expectedMessage = "Query messages task rejected: too many 
concurrent requests";
+        BusinessException exception = new BusinessException(expectedMessage);
+
+        // Verify the exception message contains 'reject' keyword
+        assertTrue(exception.getMessage().contains("reject"),
+                "BusinessException message should contain 'reject' keyword");
+    }
+}
diff --git 
a/inlong-manager/manager-web/src/main/resources/application.properties 
b/inlong-manager/manager-web/src/main/resources/application.properties
index a498ac4d15..07a1e53806 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -64,5 +64,11 @@ openapi.auth.enabled=false
 audit.admin.ids=3,4,5,6
 audit.user.ids=3,4,5,6
 
+# Pulsar message query thread pool configuration
+pulsar.query.poolSize=10
+pulsar.query.keepAliveSeconds=60
+pulsar.query.queueCapacity=100
+pulsar.query.queryTimeoutSeconds=10
+
 # tencent cloud log service endpoint, The Operator cls resource by it
 cls.manager.endpoint=127.0.0.1
\ No newline at end of file


Reply via email to