This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b2876ad335 [INLONG-12073][Manager] Optimize Pulsar message query with
thread pool and proper error handling (#12074)
b2876ad335 is described below
commit b2876ad335b99792ef94894a38f4e9b94242adb8
Author: healchow <[email protected]>
AuthorDate: Wed Feb 25 15:29:10 2026 +0800
[INLONG-12073][Manager] Optimize Pulsar message query with thread pool and
proper error handling (#12074)
* [INLONG-12073][Manager] Optimize Pulsar message query with thread pool
and proper error handling
* [INLONG-12073][Manager] Propagate query exceptions to the caller
* [INLONG-12073][Manager] Add unit tests
* [INLONG-12073][Manager] Fix the code formats
* [INLONG-12073][Manager] Remove the random selection method
* [INLONG-12073][Manager] Use ConcurrentLinkedQueue to improve performance
* [INLONG-12073][Manager] Add name label for manager-schedule module
* [INLONG-12073][Manager] Remove unused properties
---
inlong-manager/manager-schedule/pom.xml | 1 +
.../resource/queue/QueueResourceOperator.java | 3 +-
.../queue/pulsar/PulsarQueueResourceOperator.java | 188 +++++++--
.../resource/queue/pulsar/QueryCountDownLatch.java | 41 +-
.../queue/pulsar/QueryLatestMessagesRunnable.java | 86 ++--
.../service/stream/InlongStreamServiceImpl.java | 9 +-
.../queue/QueryLatestMessagesRunnableTest.java | 439 +++++++++++++++++++++
.../src/main/resources/application.properties | 6 +
8 files changed, 706 insertions(+), 67 deletions(-)
diff --git a/inlong-manager/manager-schedule/pom.xml
b/inlong-manager/manager-schedule/pom.xml
index b975dedbaf..444d314ab4 100644
--- a/inlong-manager/manager-schedule/pom.xml
+++ b/inlong-manager/manager-schedule/pom.xml
@@ -25,6 +25,7 @@
</parent>
<artifactId>manager-schedule</artifactId>
+ <name>Apache InLong - Manager Schedule</name>
<properties>
<quartz.version>2.3.2</quartz.version>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
index 528884c996..389cb24167 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
@@ -83,11 +83,10 @@ public interface QueueResourceOperator {
* @param groupInfo inlong group info
* @param streamInfo inlong stream info
* @param request query message request
- * @throws Exception any exception if occurred
* @return query brief mq message info
*/
default List<BriefMQMessage> queryLatestMessages(InlongGroupInfo
groupInfo, InlongStreamInfo streamInfo,
- QueryMessageRequest request) throws Exception {
+ QueryMessageRequest request) {
+ // Default for operators that do not override this method: returns null, not an empty list
return null;
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 67a1b39f7a..66cbee4fc3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -44,17 +44,26 @@ import
org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@@ -80,7 +89,59 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
@Autowired
private PulsarOperator pulsarOperator;
- private ScheduledExecutorService executor =
Executors.newScheduledThreadPool(10);
+ @Value("${pulsar.query.poolSize:10}")
+ private int poolSize;
+
+ @Value("${pulsar.query.keepAliveSeconds:60}")
+ private long keepAliveSeconds;
+
+ @Value("${pulsar.query.queueCapacity:100}")
+ private int queueCapacity;
+
+ @Value("${pulsar.query.queryTimeoutSeconds:10}")
+ private int queryTimeoutSeconds;
+
+ /**
+ * Thread pool for querying messages from multiple Pulsar clusters
concurrently.
+ * Configuration is loaded from application properties with prefix
'pulsar.query'.
+ */
+ private ExecutorService messageQueryExecutor;
+
+ /**
+ * Initialize the message query executor after bean creation.
+ * <p>
+ * The pool has a fixed number of threads ({@code poolSize}) and a bounded queue ({@code queueCapacity}).
+ * When both are saturated, submissions fail with {@link RejectedExecutionException} (AbortPolicy).
+ * Because core size equals max size, {@code keepAliveSeconds} only takes effect when core threads
+ * are allowed to time out, which is enabled below for a positive keep-alive.
+ */
+ @PostConstruct
+ public void init() {
+ // Core pool size and max pool size are the same: a fixed-size pool
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ poolSize,
+ poolSize,
+ keepAliveSeconds,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueCapacity),
+ new ThreadFactoryBuilder().setNameFormat("pulsar-message-query-%d").build(),
+ // Use AbortPolicy to throw exception when the queue is full
+ new ThreadPoolExecutor.AbortPolicy());
+ // Without this the keep-alive setting is ignored, since there are never threads beyond the core size;
+ // allowCoreThreadTimeOut(true) requires a positive keep-alive, hence the guard
+ if (keepAliveSeconds > 0) {
+ executor.allowCoreThreadTimeOut(true);
+ }
+ this.messageQueryExecutor = executor;
+ log.info("Init message query executor, poolSize={}, keepAliveSeconds={}, queueCapacity={}",
+ poolSize, keepAliveSeconds, queueCapacity);
+ }
+
+ /**
+ * Shutdown the executor service when the bean is destroyed.
+ * <p>
+ * Stops accepting new tasks and waits up to 5 seconds for running queries, then interrupts
+ * whatever is still running. If the waiting thread is itself interrupted, the pool is forced
+ * down and the interrupt flag is restored.
+ */
+ @PreDestroy
+ public void shutdown() {
+ log.info("Shutting down pulsar message query executor");
+ messageQueryExecutor.shutdown();
+ try {
+ // Give in-flight queries a bounded grace period before interrupting them
+ if (!messageQueryExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ messageQueryExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ messageQueryExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
@Override
public boolean accept(String mqType) {
@@ -307,34 +368,117 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
* Query latest message from pulsar
*/
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
- QueryMessageRequest request) throws Exception {
- List<ClusterInfo> pulsarClusterList = clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
- ClusterType.PULSAR);
- List<BriefMQMessage> briefMQMessages = Collections.synchronizedList(new ArrayList<>());
- QueryCountDownLatch queryLatch = new QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size());
+ QueryMessageRequest request) {
+ String groupId = streamInfo.getInlongGroupId();
+ String streamId = streamInfo.getInlongStreamId();
+ // Validate the count up front: it is unboxed below and becomes a CountDownLatch count,
+ // so null would throw an NPE and a negative value an IllegalArgumentException
+ Integer messageCount = request.getMessageCount();
+ if (messageCount == null) {
+ throw new BusinessException(String.format("Message count is required for groupId=%s, streamId=%s",
+ groupId, streamId));
+ }
+ if (messageCount <= 0) {
+ return Collections.emptyList();
+ }
+ int requestCount = messageCount;
+
+ String clusterTag = groupInfo.getInlongClusterTag();
+ List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+ if (CollectionUtils.isEmpty(clusterInfos)) {
+ log.warn("No pulsar cluster found for clusterTag={} for groupId={}", clusterTag, groupId);
+ return Collections.emptyList();
+ }
+
+ // Every cluster is queried for the full request count; the merged result is truncated at the end.
+ // The latch is released once all tasks have finished or requestCount messages have been collected.
+ int clusterSize = clusterInfos.size();
+ QueryCountDownLatch queryLatch = new QueryCountDownLatch(requestCount, clusterSize);
+ log.debug("Query pulsar message in {} clusters, each cluster query {} messages", clusterSize, requestCount);
+
+ // Extract common parameters
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
- for (ClusterInfo clusterInfo : pulsarClusterList) {
- QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
- (PulsarClusterInfo) clusterInfo, pulsarOperator, request, briefMQMessages, queryLatch);
- this.executor.execute(task);
+ // Group-level tenant; when blank, each cluster falls back to its own tenant inside the loop
+ String groupTenant = inlongPulsarInfo.getPulsarTenant();
+ String namespace = inlongPulsarInfo.getMqResource();
+ String topicName = streamInfo.getMqResource();
+ boolean serialQueue = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
+
+ // Submit query tasks to thread pool, each task queries from one cluster.
+ // Use submit() instead of execute() to get a Future for cancellation support.
+ List<Future<?>> submittedTasks = new ArrayList<>(clusterSize);
+ // ConcurrentLinkedQueue collects results from all tasks without external locking
+ ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue = new ConcurrentLinkedQueue<>();
+ for (ClusterInfo clusterInfo : clusterInfos) {
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ // Resolve the tenant per cluster. Do NOT overwrite groupTenant, otherwise the first
+ // cluster's tenant would be reused for every following cluster.
+ String tenant = StringUtils.isBlank(groupTenant) ? pulsarCluster.getPulsarTenant() : groupTenant;
+ String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+ // Give each task its own copy of the request
+ QueryMessageRequest currentRequest = buildRequestForSingleCluster(request, requestCount);
+ QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(pulsarOperator, streamInfo,
+ pulsarCluster, serialQueue, fullTopicName, currentRequest, messageResultQueue, queryLatch);
+ try {
+ submittedTasks.add(this.messageQueryExecutor.submit(task));
+ } catch (RejectedExecutionException e) {
+ // Cancel all previously submitted tasks before throwing exception
+ log.error("Failed to submit query task for groupId={}, cancelling {} submitted tasks",
+ groupId, submittedTasks.size(), e);
+ cancelSubmittedTasks(submittedTasks);
+ throw new BusinessException("Query messages task rejected: too many concurrent requests");
+ }
+ }
+
+ // Wait for tasks to complete with a configurable timeout
+ boolean completed;
+ try {
+ completed = queryLatch.await(queryTimeoutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag for our caller and stop the tasks that are still running
+ Thread.currentThread().interrupt();
+ cancelSubmittedTasks(submittedTasks);
+ throw new BusinessException(String.format("Query messages task interrupted for groupId=%s, streamId=%s",
+ groupId, streamId));
+ }
+ if (!completed) {
+ log.warn("Query messages timeout for groupId={}, streamId={}, collected {} messages",
+ groupId, streamId, messageResultQueue.size());
+ // Stop the tasks that are still running so they do not keep occupying pool threads
+ cancelSubmittedTasks(submittedTasks);
+ }
+
+ List<BriefMQMessage> messageResultList = new ArrayList<>(messageResultQueue);
+ log.info("Finished query pulsar message for groupId={}, streamId={}, completed={}, collected={}",
+ groupId, streamId, completed, messageResultList.size());
+
+ // If the result size does not exceed the request count return all, otherwise truncate to request count
+ if (messageResultList.size() <= requestCount) {
+ return messageResultList;
}
- queryLatch.await(30, TimeUnit.SECONDS);
- log.info("success query pulsar message for groupId={}, streamId={}", streamInfo.getInlongGroupId(),
- streamInfo.getInlongStreamId());
-
- int finalMsgCount = Math.min(request.getMessageCount(), briefMQMessages.size());
- if (finalMsgCount > 0) {
- return new ArrayList<>(briefMQMessages.subList(0, finalMsgCount));
- } else {
- return new ArrayList<>();
+
+ return new ArrayList<>(messageResultList.subList(0, requestCount));
+ }
+
+ /**
+ * Build a new QueryMessageRequest for a single cluster query task.
+ * <p>
+ * Only groupId, streamId, fieldName, operationType and targetValue are copied from
+ * {@code original}; the message count is taken from {@code messageCount}.
+ *
+ * @param original the caller's request
+ * @param messageCount the message count to set on the new request
+ * @return a new request instance, independent of {@code original}
+ */
+ private QueryMessageRequest
buildRequestForSingleCluster(QueryMessageRequest original, int messageCount) {
+ return QueryMessageRequest.builder()
+ .groupId(original.getGroupId())
+ .streamId(original.getStreamId())
+ .messageCount(messageCount)
+ .fieldName(original.getFieldName())
+ .operationType(original.getOperationType())
+ .targetValue(original.getTargetValue())
+ .build();
+ }
+
+ /**
+ * Cancel all submitted tasks when an error occurs.
+ * <p>
+ * Each task is cancelled with {@code mayInterruptIfRunning=true}, so running tasks are interrupted
+ * and can stop if they check for interruption. Tasks that already completed cannot be cancelled
+ * and are not counted.
+ *
+ * @param submittedTasks list of Future objects representing submitted tasks
+ */
+ private void cancelSubmittedTasks(List<Future<?>> submittedTasks) {
+ int cancelledCount = 0;
+ for (Future<?> future : submittedTasks) {
+ // mayInterruptIfRunning=true allows interrupting running tasks
+ if (future.cancel(true)) {
+ cancelledCount++;
+ }
}
+ log.info("Cancelled {}/{} submitted tasks", cancelledCount, submittedTasks.size());
}
/**
* Reset cursor for consumer group
*/
public void resetCursor(InlongGroupInfo groupInfo, InlongStreamEntity
streamEntity, StreamSinkEntity sinkEntity,
- Long resetTime) throws Exception {
+ Long resetTime) {
log.info("begin to reset cursor for sinkId={}", sinkEntity.getId());
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(),
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
index 08970b39d7..a76dc14f32 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java
@@ -21,13 +21,20 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
- * QueryCountDownLatch
+ * QueryCountDownLatch for managing query task and data completion.
+ * <p>
+ * This class provides two types of countdown:
+ * <ul>
+ * <li>Task countdown: tracks the number of tasks completed (regardless of
success or failure)</li>
+ * <li>Data countdown: tracks the number of data items retrieved</li>
+ * </ul>
+ * The flagLatch is released when either all tasks complete or enough data is
collected.
*/
public class QueryCountDownLatch {
- private CountDownLatch dataLatch;
- private CountDownLatch taskLatch;
- private CountDownLatch flagLatch;
+ private final CountDownLatch dataLatch;
+ private final CountDownLatch taskLatch;
+ private final CountDownLatch flagLatch;
public QueryCountDownLatch(int dataSize, int taskSize) {
this.dataLatch = new CountDownLatch(dataSize);
@@ -35,12 +42,34 @@ public class QueryCountDownLatch {
this.flagLatch = new CountDownLatch(1);
}
- public void countDown(int dataDownSize) {
+ /**
+ * Called when a task completes (regardless of success or failure).
+ * This should be called in a finally block to ensure it's always executed.
+ * Each call also re-checks whether the flag latch can be released.
+ */
+ public void taskCountDown() {
this.taskLatch.countDown();
+ checkAndRelease();
+ }
+
+ /**
+ * Called when data items are successfully retrieved.
+ * <p>
+ * Counts the data latch down once per item. Once the data latch reaches zero, further
+ * count-downs have no effect, so reporting more items than requested is harmless.
+ *
+ * @param dataDownSize the number of data items retrieved
+ */
+ public void dataCountDown(int dataDownSize) {
for (int i = 0; i < dataDownSize; i++) {
this.dataLatch.countDown();
}
- if (this.taskLatch.getCount() == 0 || this.dataLatch.getCount() == 0) {
+ checkAndRelease();
+ }
+
+ /**
+ * Check if the flagLatch should be released.
+ * Release when all tasks complete or enough data is collected.
+ * <p>
+ * Synchronized so the check and the release happen as one step; the getCount() guard makes
+ * sure the flag latch (created with count 1) is counted down at most once.
+ */
+ private synchronized void checkAndRelease() {
+ if (this.flagLatch.getCount() > 0
+ && (this.taskLatch.getCount() == 0 ||
this.dataLatch.getCount() == 0)) {
this.flagLatch.countDown();
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
index 4fb6b58e49..37eabf65f3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -17,63 +17,89 @@
package org.apache.inlong.manager.service.resource.queue.pulsar;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
-import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
- * QueryLatestMessagesRunnable
+ * Runnable task for querying latest messages from a Pulsar cluster.
+ * <p>
+ * Each instance queries one cluster and one topic. Results are appended to the shared queue and
+ * reported via {@link QueryCountDownLatch#dataCountDown(int)} only if the thread was not interrupted.
+ * {@link QueryCountDownLatch#taskCountDown()} is always called in a finally block, so failed or
+ * interrupted queries still count as finished tasks.
*/
public class QueryLatestMessagesRunnable implements Runnable {
- private InlongPulsarInfo inlongPulsarInfo;
- private InlongStreamInfo streamInfo;
- private PulsarClusterInfo clusterInfo;
- private PulsarOperator pulsarOperator;
- private QueryMessageRequest queryMessageRequest;
- private List<BriefMQMessage> briefMQMessages;
- private QueryCountDownLatch latch;
+ private static final Logger LOG =
LoggerFactory.getLogger(QueryLatestMessagesRunnable.class);
- public QueryLatestMessagesRunnable(InlongPulsarInfo inlongPulsarInfo,
+ private final PulsarOperator pulsarOperator;
+ private final InlongStreamInfo streamInfo;
+ private final PulsarClusterInfo clusterInfo;
+ private final boolean serialQueue;
+ private final String fullTopicName;
+ private final QueryMessageRequest queryMessageRequest;
+ private final ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue;
+ private final QueryCountDownLatch latch;
+
+ /**
+ * @param pulsarOperator operator used to run the query
+ * @param streamInfo inlong stream info passed through to the query
+ * @param clusterInfo the Pulsar cluster to query
+ * @param serialQueue whether the group uses the serial queue module
+ * @param fullTopicName topic in the form tenant/namespace/topic
+ * @param queryMessageRequest request for this task only
+ * @param messageResultQueue queue shared by all tasks of one query, receives the results
+ * @param latch latch shared by all tasks of one query
+ */
+ public QueryLatestMessagesRunnable(
+ PulsarOperator pulsarOperator,
InlongStreamInfo streamInfo,
PulsarClusterInfo clusterInfo,
- PulsarOperator pulsarOperator,
+ boolean serialQueue,
+ String fullTopicName,
QueryMessageRequest queryMessageRequest,
- List<BriefMQMessage> briefMQMessages,
+ ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue,
QueryCountDownLatch latch) {
- this.inlongPulsarInfo = inlongPulsarInfo;
+ this.pulsarOperator = pulsarOperator;
this.streamInfo = streamInfo;
this.clusterInfo = clusterInfo;
- this.pulsarOperator = pulsarOperator;
+ this.serialQueue = serialQueue;
+ this.fullTopicName = fullTopicName;
this.queryMessageRequest = queryMessageRequest;
- this.briefMQMessages = briefMQMessages;
+ this.messageResultQueue = messageResultQueue;
this.latch = latch;
}
@Override
public void run() {
- String tenant = inlongPulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant)) {
- tenant = clusterInfo.getPulsarTenant();
- }
+ String clusterName = clusterInfo.getName();
+ LOG.debug("Begin to query messages from cluster={}, topic={}",
clusterName, fullTopicName);
+ try {
+ // Check for interruption before starting the query
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("Task interrupted before query, cluster={},
topic={}", clusterName, fullTopicName);
+ return;
+ }
+
+ List<BriefMQMessage> messages =
pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName,
+ queryMessageRequest, streamInfo, serialQueue);
+
+ // Check for interruption after query completes
+ // (the blocking IO call may not respond to interruption, so the flag is checked manually after it returns)
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("Task interrupted after query, discarding results,
cluster={}, topic={}",
+ clusterName, fullTopicName);
+ return;
+ }
- String namespace = inlongPulsarInfo.getMqResource();
- String topicName = streamInfo.getMqResource();
- String fullTopicName = tenant + "/" + namespace + "/" + topicName;
- boolean serial =
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
- List<BriefMQMessage> messages =
- pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName,
queryMessageRequest, streamInfo, serial);
- if (CollectionUtils.isNotEmpty(messages)) {
- briefMQMessages.addAll(messages);
- this.latch.countDown(messages.size());
+ if (CollectionUtils.isNotEmpty(messages)) {
+ messageResultQueue.addAll(messages);
+ this.latch.dataCountDown(messages.size());
+ LOG.debug("Successfully queried {} messages from cluster={},
topic={}",
+ messages.size(), clusterName, fullTopicName);
+ } else {
+ LOG.debug("No messages found from cluster={}, topic={}",
clusterName, fullTopicName);
+ }
+ } catch (Exception e) {
+ // A failing cluster must not break the whole query: log it and let the finally block count the task down
+ LOG.error("Failed to query messages from cluster={}, groupId={},
streamId={}",
+ clusterName, queryMessageRequest.getGroupId(),
queryMessageRequest.getStreamId(), e);
+ } finally {
+ // Ensure taskCountDown is always called, regardless of success or failure
+ this.latch.taskCountDown();
}
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 46f8fe5e94..8b11651d2a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -981,13 +981,8 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
InlongGroupOperator instance =
groupOperatorFactory.getInstance(groupEntity.getMqType());
InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity);
InlongStreamInfo inlongStreamInfo = get(request.getGroupId(),
request.getStreamId());
- List<BriefMQMessage> messageList = new ArrayList<>();
QueueResourceOperator queueOperator =
queueOperatorFactory.getInstance(groupEntity.getMqType());
- try {
- messageList = queueOperator.queryLatestMessages(groupInfo,
inlongStreamInfo, request);
- } catch (Exception e) {
- LOGGER.error("query message error ", e);
- }
- return messageList;
+ // Do not catch exception, throws it to caller
+ return queueOperator.queryLatestMessages(groupInfo, inlongStreamInfo,
request);
}
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java
new file mode 100644
index 0000000000..fe90083ba3
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.queue;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
+import
org.apache.inlong.manager.service.resource.queue.pulsar.QueryCountDownLatch;
+import
org.apache.inlong.manager.service.resource.queue.pulsar.QueryLatestMessagesRunnable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link QueryLatestMessagesRunnable}.
+ * Tests focus on interrupt handling behavior.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class QueryLatestMessagesRunnableTest {
+
+ @Mock
+ private PulsarOperator pulsarOperator;
+
+ @Mock
+ private InlongStreamInfo streamInfo;
+
+ @Mock
+ private PulsarClusterInfo clusterInfo;
+
+ private QueryMessageRequest queryMessageRequest;
+ private ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue;
+ private QueryCountDownLatch queryLatch;
+
+ private static final String CLUSTER_NAME = "test-cluster";
+ private static final String FULL_TOPIC_NAME =
"public/test-namespace/test-topic";
+ private static final String GROUP_ID = "test-group";
+ private static final String STREAM_ID = "test-stream";
+
+ @BeforeEach
+ public void setUp() {
+ // The runnable reads the cluster name for its log messages
+ when(clusterInfo.getName()).thenReturn(CLUSTER_NAME);
+
+ queryMessageRequest = QueryMessageRequest.builder()
+ .groupId(GROUP_ID)
+ .streamId(STREAM_ID)
+ .messageCount(10)
+ .build();
+
+ messageResultQueue = new ConcurrentLinkedQueue<>();
+ // Latch expecting up to 10 messages from a single task
+ queryLatch = new QueryCountDownLatch(10, 1);
+ }
+
+ /**
+ * Test: Task executes successfully when not interrupted.
+ * Verifies that messages are added to the result queue and the latch is released.
+ */
+ @Test
+ public void testSuccessfulQueryWithoutInterruption() throws InterruptedException {
+ // Prepare the mocked query result
+ List<BriefMQMessage> mockMessages = new ArrayList<>();
+ BriefMQMessage message = new BriefMQMessage();
+ message.setBody("test message");
+ mockMessages.add(message);
+
+ when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()))
+ .thenReturn(mockMessages);
+
+ QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+ pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+ queryMessageRequest, messageResultQueue, queryLatch);
+
+ // Run the task synchronously
+ task.run();
+
+ // The query was executed
+ verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean());
+ // The result was added to the queue
+ assertEquals(1, messageResultQueue.size());
+ // The only task has finished, so the latch must already be released
+ assertTrue(queryLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Test: Task exits immediately when thread is interrupted before query starts.
+ * Verifies that no query is performed and the latch is still released.
+ */
+ @Test
+ public void testInterruptionBeforeQuery() throws InterruptedException {
+ QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+ pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+ queryMessageRequest, messageResultQueue, queryLatch);
+
+ // Create a thread that is already interrupted when the task starts
+ Thread testThread = new Thread(() -> {
+ // Interrupt the thread before running the task
+ Thread.currentThread().interrupt();
+ task.run();
+ });
+
+ testThread.start();
+ testThread.join(5000);
+
+ // The query was not executed (the interruption was detected before querying)
+ verify(pulsarOperator, never()).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean());
+ // The result queue is empty
+ assertTrue(messageResultQueue.isEmpty());
+ // The latch is still released because taskCountDown runs in the finally block
+ assertTrue(queryLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Test: Task discards results when interrupted after query completes.
+ * Verifies that results are not added to the queue even if the query returned data,
+ * and that the latch is still released.
+ */
+ @Test
+ public void testInterruptionAfterQuery() throws InterruptedException {
+ // Data the mocked query returns after the task has been cancelled
+ List<BriefMQMessage> mockMessages = new ArrayList<>();
+ BriefMQMessage message = new BriefMQMessage();
+ message.setBody("test message");
+ mockMessages.add(message);
+
+ CountDownLatch queryStartedLatch = new CountDownLatch(1);
+ CountDownLatch interruptSetLatch = new CountDownLatch(1);
+
+ when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()))
+ .thenAnswer(invocation -> {
+ // Tell the main thread that the query has started
+ queryStartedLatch.countDown();
+ // Simulate blocking IO that ignores interruption: keep waiting for the main thread even if
+ // cancel(true) interrupts us, then restore the interrupt flag and return data. A plain await()
+ // would throw InterruptedException here, and the "after query" check would never be reached.
+ boolean interrupted = false;
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
+ while (interruptSetLatch.getCount() > 0 && System.nanoTime() < deadline) {
+ try {
+ interruptSetLatch.await(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ return mockMessages;
+ });
+
+ QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+ pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+ queryMessageRequest, messageResultQueue, queryLatch);
+
+ // Run the task in a thread pool so it can be cancelled through its Future
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ Future<?> future = executor.submit(task);
+
+ // Wait until the query has started
+ assertTrue(queryStartedLatch.await(5, TimeUnit.SECONDS));
+
+ // Cancel the task (interrupts the worker thread)
+ future.cancel(true);
+ // Let the simulated query return
+ interruptSetLatch.countDown();
+
+ // Wait for the task to finish instead of sleeping for a fixed time
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+ } finally {
+ executor.shutdownNow();
+ }
+
+ // The query was executed
+ verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean());
+ // The results were discarded because the interrupt flag was detected after the query
+ assertTrue(messageResultQueue.isEmpty());
+ // The latch is still released because taskCountDown runs in the finally block
+ assertTrue(queryLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Test: Task handles empty query results correctly.
+ * Verifies that no exception is thrown and the latch is released.
+ */
+ @Test
+ public void testEmptyQueryResults() throws InterruptedException {
+ when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()))
+ .thenReturn(Collections.emptyList());
+
+ QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+ pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+ queryMessageRequest, messageResultQueue, queryLatch);
+
+ task.run();
+
+ // The query was executed
+ verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean());
+ // The result queue is empty
+ assertTrue(messageResultQueue.isEmpty());
+ // The task finished, so the latch is released even without data
+ assertTrue(queryLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Test: Task handles null query results correctly.
+ * Verifies that no exception is thrown and the latch is released.
+ */
+ @Test
+ public void testNullQueryResults() throws InterruptedException {
+ when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()))
+ .thenReturn(null);
+
+ QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+ pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+ queryMessageRequest, messageResultQueue, queryLatch);
+
+ task.run();
+
+ // The query was executed
+ verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean());
+ // The result queue is empty
+ assertTrue(messageResultQueue.isEmpty());
+ // The task finished, so the latch is released even without data
+ assertTrue(queryLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Test: Task handles query exception gracefully.
+ * Verifies that the exception is caught and the latch is still released.
+ */
+ @Test
+ public void testQueryException() throws InterruptedException {
+ when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()))
+ .thenThrow(new RuntimeException("Simulated query failure"));
+
+ QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+ pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+ queryMessageRequest, messageResultQueue, queryLatch);
+
+ // Must not throw
+ task.run();
+
+ // The query was executed
+ verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean());
+ // The result queue is empty
+ assertTrue(messageResultQueue.isEmpty());
+ // The failed task is still counted as finished, so the latch is released
+ assertTrue(queryLatch.await(1, TimeUnit.SECONDS));
+ }
+
+    /**
+     * Test: multiple in-flight tasks can be cancelled together.
+     * Simulates the cleanup path where a RejectedExecutionException forces every already
+     * submitted task to be cancelled. All tasks are parked inside a slow query, cancelled
+     * with interruption, and must not publish any result.
+     */
+    @Test
+    public void testMultipleTaskCancellation() throws InterruptedException {
+        int taskCount = 5;
+        ConcurrentLinkedQueue<BriefMQMessage> sharedResultList = new ConcurrentLinkedQueue<>();
+        QueryCountDownLatch sharedLatch = new QueryCountDownLatch(50, taskCount);
+
+        // Simulate a slow query: each task signals that it started, then blocks until released.
+        CountDownLatch allTasksStarted = new CountDownLatch(taskCount);
+        CountDownLatch proceedLatch = new CountDownLatch(1);
+
+        List<BriefMQMessage> mockMessages = new ArrayList<>();
+        BriefMQMessage message = new BriefMQMessage();
+        message.setBody("test message");
+        mockMessages.add(message);
+
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()))
+                .thenAnswer(invocation -> {
+                    allTasksStarted.countDown();
+                    // Block until the test releases the tasks, or the task is interrupted.
+                    proceedLatch.await(10, TimeUnit.SECONDS);
+                    return mockMessages;
+                });
+
+        ExecutorService executor = Executors.newFixedThreadPool(taskCount);
+        try {
+            List<Future<?>> futures = new ArrayList<>();
+
+            // Submit every task against the shared result queue and latch.
+            for (int i = 0; i < taskCount; i++) {
+                QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                        pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+                        queryMessageRequest, sharedResultList, sharedLatch);
+                futures.add(executor.submit(task));
+            }
+
+            // Wait until every task is parked inside the query.
+            assertTrue(allTasksStarted.await(5, TimeUnit.SECONDS));
+
+            // Cancel all tasks. None has completed yet (all are blocked on proceedLatch),
+            // so every cancel(true) call is expected to succeed.
+            int cancelledCount = 0;
+            for (Future<?> future : futures) {
+                if (future.cancel(true)) {
+                    cancelledCount++;
+                }
+            }
+            assertEquals(taskCount, cancelledCount, "All in-flight tasks should be cancellable");
+
+            // Release any task that is still waiting.
+            proceedLatch.countDown();
+
+            // Drain the pool. Without confirmed termination a task could still publish a
+            // result after the assertion below, making that assertion pass vacuously.
+            executor.shutdown();
+            assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS),
+                    "Executor should terminate after cancellation");
+
+            // Results must be discarded because the interrupt flag was set.
+            assertTrue(sharedResultList.isEmpty());
+        } finally {
+            // Never leave worker threads behind, even if an assertion above failed.
+            proceedLatch.countDown();
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Test: a task started on an already-interrupted thread skips the query.
+     * Scenario 1 runs the task normally and expects exactly one query and one published result.
+     * Scenario 2 runs the task on a thread whose interrupt flag is set before run() is called,
+     * and expects no query and no published result.
+     * NOTE(review): the interrupt check performed after the query is not exercised by this method.
+     */
+    @Test
+    public void testInterruptCheckPoints() throws InterruptedException {
+        List<BriefMQMessage> mockMessages = new ArrayList<>();
+        BriefMQMessage message = new BriefMQMessage();
+        message.setBody("test message");
+        mockMessages.add(message);
+
+        // Number of times the operator was queried. Scenario 2 updates it from a worker
+        // thread; it is read only after that thread has been joined and confirmed finished.
+        final int[] queryCallCount = {0};
+
+        when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()))
+                .thenAnswer(invocation -> {
+                    queryCallCount[0]++;
+                    return mockMessages;
+                });
+
+        // Scenario 1: normal execution (no interrupt).
+        QueryLatestMessagesRunnable normalTask = new QueryLatestMessagesRunnable(
+                pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+                queryMessageRequest, messageResultQueue, queryLatch);
+        normalTask.run();
+
+        assertEquals(1, queryCallCount[0], "Query should be called once in normal execution");
+        assertEquals(1, messageResultQueue.size(), "Result should be added in normal execution");
+
+        // Reset shared state before the next scenario.
+        queryCallCount[0] = 0;
+        messageResultQueue.clear();
+        queryLatch = new QueryCountDownLatch(10, 1);
+
+        // Scenario 2: the worker thread is interrupted before the task runs.
+        // Any throwable from run() is captured, because an uncaught exception on a
+        // non-test thread would otherwise be invisible to JUnit.
+        final Throwable[] workerFailure = {null};
+        Thread preInterruptThread = new Thread(() -> {
+            Thread.currentThread().interrupt();
+            QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(
+                    pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME,
+                    queryMessageRequest, messageResultQueue, queryLatch);
+            try {
+                task.run();
+            } catch (Throwable t) {
+                workerFailure[0] = t;
+            }
+        });
+        preInterruptThread.start();
+        preInterruptThread.join(5000);
+
+        // A timed-out join would let the assertions below pass vacuously.
+        assertTrue(!preInterruptThread.isAlive(), "Pre-interrupted task should finish within the timeout");
+        assertTrue(workerFailure[0] == null, "Pre-interrupted task should not throw: " + workerFailure[0]);
+        assertEquals(0, queryCallCount[0], "Query should not be called when interrupted before");
+        assertTrue(messageResultQueue.isEmpty(), "Result should not be added when interrupted before");
+    }
+
+    /**
+     * Test: a RejectedExecutionException raised by a saturated pool mentions "reject".
+     * The pool has one worker, a queue of capacity one and AbortPolicy. The first task
+     * occupies the worker, the second fills the queue, and the third must be rejected.
+     * Checking the keyword ensures a full queue can be recognised from the exception.
+     */
+    @Test
+    public void testRejectedExecutionExceptionContainsRejectKeyword() {
+        // One worker thread, one queue slot, abort on overflow.
+        ExecutorService tinyExecutor = new java.util.concurrent.ThreadPoolExecutor(
+                1, 1, 0L, TimeUnit.MILLISECONDS,
+                new java.util.concurrent.LinkedBlockingQueue<>(1),
+                new java.util.concurrent.ThreadPoolExecutor.AbortPolicy());
+
+        CountDownLatch blockingLatch = new CountDownLatch(1);
+        // Parks its thread until the test releases the latch (bounded at 30 seconds).
+        Runnable blocker = () -> {
+            try {
+                blockingLatch.await(30, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        };
+        try {
+            // Occupy the single worker, then the single queue slot.
+            tinyExecutor.submit(blocker);
+            tinyExecutor.submit(blocker);
+
+            // Pool and queue are both full, so this submission must be rejected.
+            java.util.concurrent.RejectedExecutionException rejection =
+                    org.junit.jupiter.api.Assertions.assertThrows(
+                            java.util.concurrent.RejectedExecutionException.class,
+                            () -> tinyExecutor.submit(() -> {
+                                // This task should never execute
+                            }),
+                            "Expected RejectedExecutionException was not thrown");
+
+            // The message must mention 'reject' (case-insensitive).
+            String message = rejection.getMessage();
+            assertTrue(message != null && message.toLowerCase().contains("reject"),
+                    "RejectedExecutionException message should contain 'reject' keyword, but was: " + message);
+        } finally {
+            blockingLatch.countDown();
+            tinyExecutor.shutdownNow();
+        }
+    }
+
+    /**
+     * Test: the rejection message used for BusinessException contains "reject".
+     * NOTE(review): this only checks the message literal below. It does not drive
+     * PulsarQueueResourceOperator, so it stays green even if the operator's real message
+     * changes. TODO: keep the literal in sync with the operator, or replace this with a
+     * test that triggers the rejection through the operator itself.
+     */
+    @Test
+    public void testBusinessExceptionContainsRejectKeywordWhenQueueFull() {
+        // Mirrors the BusinessException expected when a RejectedExecutionException occurs.
+        BusinessException rejection = new BusinessException(
+                "Query messages task rejected: too many concurrent requests");
+
+        String actualMessage = rejection.getMessage();
+        assertTrue(actualMessage.contains("reject"),
+                "BusinessException message should contain 'reject' keyword");
+    }
+}
diff --git
a/inlong-manager/manager-web/src/main/resources/application.properties
b/inlong-manager/manager-web/src/main/resources/application.properties
index a498ac4d15..07a1e53806 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -64,5 +64,11 @@ openapi.auth.enabled=false
audit.admin.ids=3,4,5,6
audit.user.ids=3,4,5,6
+# Pulsar message query thread pool configuration
+pulsar.query.poolSize=10
+pulsar.query.keepAliveSeconds=60
+pulsar.query.queueCapacity=100
+pulsar.query.queryTimeoutSeconds=10
+
# tencent cloud log service endpoint, The Operator cls resource by it
cls.manager.endpoint=127.0.0.1
\ No newline at end of file