fuweng11 commented on code in PR #12074:
URL: https://github.com/apache/inlong/pull/12074#discussion_r2844113665


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java:
##########
@@ -307,34 +372,150 @@ private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pu
      * Query latest message from pulsar
      */
     public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
-            QueryMessageRequest request) throws Exception {
-        List<ClusterInfo> pulsarClusterList = clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
-                ClusterType.PULSAR);
-        List<BriefMQMessage> briefMQMessages = Collections.synchronizedList(new ArrayList<>());
-        QueryCountDownLatch queryLatch = new QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size());
+            QueryMessageRequest request) {
+        String groupId = streamInfo.getInlongGroupId();
+        String clusterTag = groupInfo.getInlongClusterTag();
+        List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
+        if (CollectionUtils.isEmpty(clusterInfos)) {
+            log.warn("No pulsar cluster found for clusterTag={} for groupId={}", clusterTag, groupId);
+            return Collections.emptyList();
+        }
+
+        // Select clusters and calculate per-cluster query count
+        Integer requestCount = request.getMessageCount();
+        List<ClusterInfo> selectedClusters = randomSelectQueryClusters(clusterInfos, requestCount);

Review Comment:
   Data may be concentrated on a single Pulsar cluster, in which case querying
any other cluster returns nothing. Randomly selecting clusters to query is
therefore not viable, because the selection can miss the cluster that actually
holds the data. Instead, all Pulsar clusters need to be queried.
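
A minimal sketch of the suggestion, not the actual InLong implementation: every cluster is queried and the results are merged, then trimmed to the requested count. The class `QueryAllClustersSketch`, the method `queryAllClusters`, and the `fetcher` callback are hypothetical names used only for illustration; the callback stands in for the real per-cluster Pulsar query, and the loop is sequential for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

class QueryAllClustersSketch {

    /**
     * Queries every cluster (no random selection) and merges the results.
     * The fetcher is a hypothetical stand-in for the per-cluster query: it takes
     * a cluster and the requested count and returns the messages it found.
     */
    static <C, M> List<M> queryAllClusters(List<C> clusters, int requestCount,
            BiFunction<C, Integer, List<M>> fetcher) {
        List<M> merged = new ArrayList<>();
        if (clusters == null || clusters.isEmpty() || requestCount <= 0) {
            return merged;
        }
        // Visit every cluster, so data concentrated on one of them is never missed.
        for (C cluster : clusters) {
            List<M> found = fetcher.apply(cluster, requestCount);
            if (found != null) {
                merged.addAll(found);
            }
        }
        // Trim to the number of messages the caller asked for.
        if (merged.size() > requestCount) {
            return new ArrayList<>(merged.subList(0, requestCount));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Only cluster "b" holds data; "a" and "c" are empty.
        BiFunction<String, Integer, List<String>> fetcher = (cluster, count) ->
                "b".equals(cluster)
                        ? Arrays.asList("m1", "m2", "m3")
                        : Collections.<String>emptyList();
        List<String> result = queryAllClusters(Arrays.asList("a", "b", "c"), 2, fetcher);
        System.out.println(result);
    }
}
```

With a random selection, a draw of only "a" or "c" would return an empty list even though "b" holds messages; visiting all clusters avoids that at the cost of extra queries.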



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.