xinyiZzz commented on code in PR #50939:
URL: https://github.com/apache/doris/pull/50939#discussion_r2094387996


##########
fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java:
##########
@@ -91,127 +86,78 @@ public boolean submit(ConnectContext context) {
         return true;
     }
 
-    // Register one connection with its connection id.
-    // Return -1 means register OK
-    // Return >=0 means register failed, and return value is current connection num.
-    public int registerConnection(ConnectContext ctx) {
-        if (numberConnection.incrementAndGet() > maxConnections) {
-            numberConnection.decrementAndGet();
-            return numberConnection.get();
-        }
-        // Check user
-        connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
-        AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
-        if (conns.incrementAndGet() > ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) {
-            conns.decrementAndGet();
-            numberConnection.decrementAndGet();
-            return numberConnection.get();
-        }
-        connectionMap.put(ctx.getConnectionId(), ctx);
-        if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-            flightToken2ConnectionId.put(ctx.getPeerIdentity(), ctx.getConnectionId());
-        }
-        return -1;
-    }
-
-    public void unregisterConnection(ConnectContext ctx) {
-        ctx.closeTxn();
-        if (connectionMap.remove(ctx.getConnectionId()) != null) {
-            AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
-            if (conns != null) {
-                conns.decrementAndGet();
-            }
-            numberConnection.decrementAndGet();
-            if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
-                flightToken2ConnectionId.remove(ctx.getPeerIdentity());
-            }
-        }
-    }
-
     public ConnectContext getContext(int connectionId) {
-        return connectionMap.get(connectionId);
-    }
-
-    public ConnectContext getContextWithQueryId(String queryId) {
-        for (ConnectContext context : connectionMap.values()) {
-            if (queryId.equals(DebugUtil.printId(context.queryId))) {
-                return context;
-            }
+        ConnectContext ctx = connectPoolMgr.getContext(connectionId);
+        if (ctx == null) {
+            ctx = flightSqlConnectPoolMgr.getContext(connectionId);
         }
-        return null;
+        return ctx;
     }
 
-    public ConnectContext getContext(String flightToken) {
-        if (flightToken2ConnectionId.containsKey(flightToken)) {
-            int connectionId = flightToken2ConnectionId.get(flightToken);
-            return getContext(connectionId);
+    public ConnectContext getContextWithQueryId(String queryId) {
+        ConnectContext ctx = connectPoolMgr.getContextWithQueryId(queryId);
+        if (ctx == null) {
+            ctx = flightSqlConnectPoolMgr.getContextWithQueryId(queryId);
         }
-        return null;
+        return ctx;
     }
 
-    public void cancelQuery(String queryId, Status cancelReason) {
-        for (ConnectContext ctx : connectionMap.values()) {
-            TUniqueId qid = ctx.queryId();
-            if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
-                ctx.cancelQuery(cancelReason);
-                break;
-            }
+    public boolean cancelQuery(String queryId, Status cancelReason) {
+        boolean ret = connectPoolMgr.cancelQuery(queryId, cancelReason);
+        if (!ret) {
+            ret = flightSqlConnectPoolMgr.cancelQuery(queryId, cancelReason);
         }
+        return ret;
     }
 
     public int getConnectionNum() {
-        return numberConnection.get();
+        return connectPoolMgr.getConnectionNum() + flightSqlConnectPoolMgr.getConnectionNum();
     }
 
-    public List<ConnectContext.ThreadInfo> listConnection(String user, boolean isFull) {
+    public List<ThreadInfo> listConnection(String user, boolean isFull) {
         List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
-        for (ConnectContext ctx : connectionMap.values()) {
-            // Check auth
-            if (!ctx.getQualifiedUser().equals(user) && !Env.getCurrentEnv().getAccessManager()
-                    .checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
-                continue;
-            }
-
-            infos.add(ctx.toThreadInfo(isFull));
-        }
+        infos.addAll(connectPoolMgr.listConnection(user, isFull));
+        infos.addAll(flightSqlConnectPoolMgr.listConnection(user, isFull));
         return infos;
     }
 
     // used for thrift
     public List<List<String>> listConnectionForRpc(UserIdentity userIdentity, boolean isShowFullSql,
             Optional<String> timeZone) {
         List<List<String>> list = new ArrayList<>();
-        long nowMs = System.currentTimeMillis();
-        for (ConnectContext ctx : connectionMap.values()) {
-            // Check auth
-            if (!ctx.getCurrentUserIdentity().equals(userIdentity) && !Env.getCurrentEnv()
-                    .getAccessManager()
-                    .checkGlobalPriv(userIdentity, PrivPredicate.GRANT)) {
-                continue;
-            }
-            list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, timeZone));
-        }
+        list.addAll(connectPoolMgr.listConnectionForRpc(userIdentity, isShowFullSql, timeZone));
+        list.addAll(flightSqlConnectPoolMgr.listConnectionForRpc(userIdentity, isShowFullSql, timeZone));
         return list;
     }
 
-    public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
-        traceId2QueryId.put(traceId, queryId);
-    }
-
     public String getQueryIdByTraceId(String traceId) {
-        TUniqueId queryId = traceId2QueryId.get(traceId);
-        return queryId == null ? "" : DebugUtil.printId(queryId);
+        String queryId = connectPoolMgr.getQueryIdByTraceId(traceId);
+        if (Objects.equals(queryId, "")) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to