This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new ef268fb33c Optimize monitor module (#5216)
ef268fb33c is described below

commit ef268fb33c0f1d1c4f8c15a139628e443fb02998
Author: Liboyistu <[email protected]>
AuthorDate: Mon Sep 22 17:14:51 2025 +0800

    Optimize monitor module (#5216)
    
    Co-authored-by: peacewong <[email protected]>
---
 .../linkis/monitor/config/ListenerConfig.java      |   4 +-
 .../department/dao/UserDepartmentInfoMapper.java}  |  34 ++--
 .../department/entity/UserDepartmentInfo.java      | 150 +++++++++++++++++
 .../ClientSingleton.java}                          |  92 ++---------
 .../linkis/monitor/jobhistory/QueryUtils.java      |   2 +-
 .../monitor/scheduled/EntranceTaskMonitor.java     |   7 +-
 .../monitor/scheduled/JobHistoryMonitor.java       |  67 +++++++-
 .../linkis/monitor/scheduled/ResourceMonitor.java  |   2 +-
 .../monitor/scheduled/UserDepartmentInfoSync.java  | 146 +++++++++++++++++
 .../linkis/monitor/scheduled/UserModeMonitor.java  |   7 +-
 .../apache/linkis/monitor/until/HttpsUntils.java   | 181 ++++++++++-----------
 .../linkis/monitor/until/JobMonitorUtils.java      |   8 +-
 .../apache/linkis/monitor/until/ThreadUtils.java   |   9 +
 .../resources/mapper/common/JobHistoryMapper.xml   |   4 +-
 .../mapper/common/UserDepartmentInfoMapper.xml     |  57 +++++++
 .../LinkisJobHistoryScanSpringConfiguration.scala  |   5 +
 .../linkis/monitor/client/MonitorHTTPClient.scala  |  42 ++++-
 .../client/MonitorHTTPClientClientImpl.scala       |   4 +-
 .../monitor/client/MonitorResourceClient.scala     |   4 +-
 .../monitor/client/MonitorResourceClientImpl.scala |   6 +-
 .../linkis/monitor/constants/Constants.scala       |  15 ++
 .../linkis/monitor/factory/MapperFactory.scala     |  11 ++
 .../monitor/jobhistory/JobHistoryDataFetcher.scala |  94 +++++------
 .../analyze/JobHistoryAnalyzeAlertSender.scala}    |  21 ++-
 .../analyze/JobHistoryAnalyzeHitEvent.scala}       |   6 +-
 .../jobhistory/analyze/JobHistoryAnalyzeRule.scala |  60 +++++++
 .../jobhistory/jobtime/JobTimeExceedRule.scala     |   3 +-
 .../jobtime/StarrocksTimeExceedAlertSender.scala   |  84 ++++++++++
 .../jobtime/StarrocksTimeExceedHitEvent.scala}     |   6 +-
 .../jobtime/StarrocksTimeExceedRule.scala          | 128 +++++++++++++++
 .../jobtime/StarrocksTimeKillAlertSender.scala}    |  14 +-
 .../jobhistory/jobtime/StarrocksTimeKillRule.scala | 125 ++++++++++++++
 ...{EmsListAction.scala => AnalyzeJobAction.scala} |  44 ++---
 .../monitor/request/DataSourceParamsAction.scala   |  92 +++++++++++
 .../linkis/monitor/request/EmsListAction.scala     |   2 +-
 .../monitor/request/EntranceTaskAction.scala       |   2 +-
 .../linkis/monitor/request/KeyvalueAction.scala    |  80 +++++++++
 .../linkis/monitor/request/KillJobAction.scala     |  80 +++++++++
 .../monitor/request/MonitorResourceAction.scala    |   3 +-
 .../apache/linkis/monitor/request/UserAction.scala |  10 +-
 .../monitor/response/AnalyzeJobResultAction.scala} |  20 ++-
 .../linkis/monitor/response/KeyvalueResult.scala}  |  23 ++-
 .../monitor/response/KillJobResultAction.scala}    |  20 ++-
 .../utils/alert/ims/MonitorAlertUtils.scala        |  31 ++--
 .../linkis/monitor/utils/job/JohistoryUtils.scala} |  28 +++-
 45 files changed, 1458 insertions(+), 375 deletions(-)

diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java
index eb5c11af87..98aec85f00 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java
@@ -17,7 +17,7 @@
 
 package org.apache.linkis.monitor.config;
 
-import org.apache.linkis.monitor.until.HttpsUntils;
+import org.apache.linkis.monitor.entity.ClientSingleton;
 import org.apache.linkis.monitor.until.ThreadUtils;
 import org.apache.linkis.monitor.utils.log.LogUtils;
 
@@ -38,7 +38,7 @@ public class ListenerConfig {
   private void shutdownEntrance(ContextClosedEvent event) {
     try {
       ThreadUtils.executors.shutdown();
-      HttpsUntils.client.close();
+      ClientSingleton.getInstance().close();
     } catch (IOException e) {
       logger.error("ListenerConfig error msg  {}", e.getMessage());
     }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java
similarity index 51%
copy from 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
copy to 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java
index 5554701571..1b37d30cb5 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java
@@ -15,25 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.client
+package org.apache.linkis.monitor.department.dao;
 
-import org.apache.linkis.httpclient.dws.DWSHttpClient
-import org.apache.linkis.httpclient.dws.config.DWSClientConfig
-import org.apache.linkis.httpclient.request.Action
-import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.MonitorResourceAction
+import org.apache.linkis.monitor.department.entity.UserDepartmentInfo;
 
-class MonitorHTTPClientClientImpl(clientConfig: DWSClientConfig) extends 
MonitorHTTPClient {
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
 
-  private val dwsHttpClient =
-    new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread")
+import org.springframework.transaction.annotation.Transactional;
 
-  override protected[client] def executeJob(ujesJobAction: 
MonitorResourceAction): Result =
-    ujesJobAction match {
+import java.util.List;
 
-      case action: Action => dwsHttpClient.execute(action)
+@Mapper
+public interface UserDepartmentInfoMapper {
 
-    }
+  void insertUser(UserDepartmentInfo user);
 
-  override def close(): Unit = dwsHttpClient.close()
+  @Transactional(rollbackFor = Exception.class)
+  int batchInsertUsers(@Param("userDepartmentInfos") List<UserDepartmentInfo> 
userDepartmentInfos);
+
+  void updateUser(UserDepartmentInfo user);
+
+  UserDepartmentInfo selectUser(@Param("userName") String userName);
+
+  @Transactional(rollbackFor = Exception.class)
+  void deleteUser();
+
+  List<UserDepartmentInfo> selectAllUsers();
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java
new file mode 100644
index 0000000000..c5ea27e7ce
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.department.entity;
+
+import java.util.Date;
+
+public class UserDepartmentInfo {
+
+  private String clusterCode;
+
+  private String userType;
+  private String userName;
+  private String orgId;
+  private String orgName;
+  private String queueName;
+  private String dbName;
+  private String interfaceUser;
+  private String isUnionAnalyse;
+  private Date createTime;
+  private String userItsmNo;
+
+  // 构造函数、getter和setter方法
+  public UserDepartmentInfo(
+      String clusterCode,
+      String userType,
+      String userName,
+      String orgId,
+      String orgName,
+      String queueName,
+      String dbName,
+      String interfaceUser,
+      String isUnionAnalyse,
+      Date createTime,
+      String userItsmNo) {
+    this.clusterCode = clusterCode;
+    this.userType = userType;
+    this.userName = userName;
+    this.orgId = orgId;
+    this.orgName = orgName;
+    this.queueName = queueName;
+    this.dbName = dbName;
+    this.interfaceUser = interfaceUser;
+    this.isUnionAnalyse = isUnionAnalyse;
+    this.createTime = createTime;
+    this.userItsmNo = userItsmNo;
+  }
+
+  public String getClusterCode() {
+    return clusterCode;
+  }
+
+  public void setClusterCode(String clusterCode) {
+    this.clusterCode = clusterCode;
+  }
+
+  public String getUserType() {
+    return userType;
+  }
+
+  public void setUserType(String userType) {
+    this.userType = userType;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
+  public String getOrgId() {
+    return orgId;
+  }
+
+  public void setOrgId(String orgId) {
+    this.orgId = orgId;
+  }
+
+  public String getOrgName() {
+    return orgName;
+  }
+
+  public void setOrgName(String orgName) {
+    this.orgName = orgName;
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
+  public void setQueueName(String queueName) {
+    this.queueName = queueName;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public void setDbName(String dbName) {
+    this.dbName = dbName;
+  }
+
+  public String getInterfaceUser() {
+    return interfaceUser;
+  }
+
+  public void setInterfaceUser(String interfaceUser) {
+    this.interfaceUser = interfaceUser;
+  }
+
+  public String getIsUnionAnalyse() {
+    return isUnionAnalyse;
+  }
+
+  public void setIsUnionAnalyse(String isUnionAnalyse) {
+    this.isUnionAnalyse = isUnionAnalyse;
+  }
+
+  public Date getCreateTime() {
+    return createTime;
+  }
+
+  public void setCreateTime(Date createTime) {
+    this.createTime = createTime;
+  }
+
+  public String getUserItsmNo() {
+    return userItsmNo;
+  }
+
+  public void setUserItsmNo(String userItsmNo) {
+    this.userItsmNo = userItsmNo;
+  }
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java
similarity index 50%
copy from 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
copy to 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java
index a504a9d41d..3c660426b3 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java
@@ -15,65 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.until;
+package org.apache.linkis.monitor.entity;
 
 import org.apache.linkis.bml.conf.BmlConfiguration;
 import org.apache.linkis.common.conf.Configuration;
-import org.apache.linkis.common.utils.Utils;
 import 
org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
 import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
 import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
 import org.apache.linkis.monitor.client.MonitorHTTPClient;
 import org.apache.linkis.monitor.client.MonitorHTTPClientClientImpl;
-import org.apache.linkis.monitor.config.MonitorConfig;
-import org.apache.linkis.monitor.entity.IndexEntity;
-import org.apache.linkis.monitor.request.EmsListAction;
-import org.apache.linkis.monitor.request.EntranceTaskAction;
-import org.apache.linkis.monitor.response.EntranceTaskResult;
-import org.apache.linkis.server.BDPJettyServerHelper;
-import org.apache.linkis.ujes.client.response.EmsListResult;
 
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 
-import org.springframework.util.Assert;
-
-import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HttpsUntils {
-  private static final Logger logger = 
LoggerFactory.getLogger(HttpsUntils.class);
+public class ClientSingleton {
+  private static MonitorHTTPClient instance;
+  private static DWSClientConfig dwsClientConfig;
 
-  public static DWSClientConfig dwsClientConfig = createClientConfig(null, 
null);
-  //        IOUtils.closeQuietly(client);
-  public static MonitorHTTPClient client = new 
MonitorHTTPClientClientImpl(dwsClientConfig);
-  public static final String localHost = Utils.getLocalHostname();
+  private ClientSingleton() {}
 
-  public static Map<String, Object> sendHttp(String url, Map<String, Object> 
properties)
-      throws IOException {
-    if (null == dwsClientConfig) {
-      dwsClientConfig = createClientConfig(url, properties);
+  public static synchronized MonitorHTTPClient getInstance() {
+    if (instance == null) {
+      if (dwsClientConfig == null) {
+        dwsClientConfig = createClientConfig(null, null); // NOSONAR
+      }
+      instance = new MonitorHTTPClientClientImpl(dwsClientConfig);
     }
-    if (null == client) {
-      client = new MonitorHTTPClientClientImpl(dwsClientConfig);
-    }
-    EmsListAction build = EmsListAction.newBuilder().setUser("hadoop").build();
-    EmsListResult result = client.list(build);
-    return result.getResultMap();
+    return instance;
   }
 
   public static DWSClientConfig createClientConfig(String url, Map<String, 
Object> properties) {
@@ -89,7 +61,7 @@ public class HttpsUntils {
     }
     int maxConnection =
         (int)
-            parms.getOrDefault(
+            parms.getOrDefault( // NOSONAR
                 BmlConfiguration.CONNECTION_MAX_SIZE_SHORT_NAME(),
                 BmlConfiguration.CONNECTION_MAX_SIZE().getValue());
     int connectTimeout =
@@ -132,42 +104,4 @@ public class HttpsUntils {
 
     return clientConfig;
   }
-
-  public static Map<String, Object> getEntranceTask(String url, String user, 
String Instance)
-      throws IOException {
-    if (null == dwsClientConfig) {
-      dwsClientConfig = createClientConfig(null, null);
-    }
-    if (null == client) {
-      client = new MonitorHTTPClientClientImpl(dwsClientConfig);
-    }
-    EntranceTaskAction build =
-        
EntranceTaskAction.newBuilder().setUser(user).setInstance(Instance).build();
-    EntranceTaskResult result = client.entranList(build);
-    return result.getResultMap();
-  }
-
-  public static void sendIndex(List<IndexEntity> list) throws IOException {
-    Map<String, Object> parm = new HashMap<>();
-    parm.put("userAuthKey", MonitorConfig.ECM_TASK_USER_AUTHKEY.getValue());
-    parm.put("metricDataList", list);
-    String json = BDPJettyServerHelper.gson().toJson(parm);
-
-    RequestConfig requestConfig = RequestConfig.DEFAULT;
-    StringEntity entity =
-        new StringEntity(
-            json, 
ContentType.create(ContentType.APPLICATION_JSON.getMimeType(), "UTF-8"));
-    entity.setContentEncoding("UTF-8");
-
-    HttpPost httpPost = new HttpPost(MonitorConfig.ECM_TASK_IMURL.getValue());
-    httpPost.setConfig(requestConfig);
-    httpPost.setEntity(entity);
-
-    CloseableHttpClient httpClient = HttpClients.createDefault();
-    CloseableHttpResponse execute = httpClient.execute(httpPost);
-    String responseStr = EntityUtils.toString(execute.getEntity(), "UTF-8");
-    Map<String, String> map = 
BDPJettyServerHelper.gson().fromJson(responseStr, Map.class);
-    logger.info("send index response :{}", map);
-    Assert.isTrue(!"0".equals(map.get("resultCode")), map.get("resultMsg"));
-  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
index aa73471c49..688fda5afb 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
@@ -23,7 +23,7 @@ import java.util.Date;
 
 public class QueryUtils {
 
-  private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+  private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS"); // NOSONAR
 
   public static String dateToString(Date date) {
     return dateFormat.format(date);
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java
index edce194481..02ea5322e9 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java
@@ -121,7 +121,8 @@ public class EntranceTaskMonitor {
         });
     Map<String, Object> likisData = null;
     try {
-      likisData = MapUtils.getMap(HttpsUntils.getEntranceTask(null, "hadoop", 
null), "data");
+      likisData =
+          MapUtils.getMap(HttpsUntils.getEntranceTask(null, 
Constants.ADMIN_USER(), null), "data");
       logger.info("TaskMonitor hadoop response  {}:", likisData);
     } catch (IOException e) {
       logger.warn("failed to get EntranceTask data");
@@ -163,7 +164,9 @@ public class EntranceTaskMonitor {
         try {
           // 通过serviceInstance 获取entrance中任务数量信息
           Map<String, Object> entranceData =
-              MapUtils.getMap(HttpsUntils.getEntranceTask(null, "hadoop", 
entranceService), "data");
+              MapUtils.getMap(
+                  HttpsUntils.getEntranceTask(null, Constants.ADMIN_USER(), 
entranceService),
+                  "data");
           int runningTaskNumber = 0;
           int queuedTaskNumber = 0;
           int totalTaskNumber = 0;
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
index be7758f7a0..c1a7a16d06 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
@@ -23,12 +23,13 @@ import org.apache.linkis.monitor.core.pac.DataFetcher;
 import org.apache.linkis.monitor.core.scanner.AnomalyScanner;
 import org.apache.linkis.monitor.core.scanner.DefaultScanner;
 import org.apache.linkis.monitor.factory.MapperFactory;
+import 
org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeAlertSender;
+import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeRule;
 import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrCodeRule;
 import 
org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender;
 import org.apache.linkis.monitor.jobhistory.index.JobIndexRule;
 import org.apache.linkis.monitor.jobhistory.index.JobIndexSender;
-import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedAlertSender;
-import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedRule;
+import org.apache.linkis.monitor.jobhistory.jobtime.*;
 import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsAlertSender;
 import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsRule;
 import org.apache.linkis.monitor.jobhistory.runtime.CommonJobRunTimeRule;
@@ -75,7 +76,7 @@ public class JobHistoryMonitor {
   @Scheduled(cron = "${linkis.monitor.jobHistory.finished.cron}")
   public void jobHistoryFinishedScan() {
     logger.info("Start scan jobHistoryFinishedScan");
-    long intervalMs = 20 * 60 * 1000;
+    long intervalMs = 20 * 60 * 1000L;
     long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
     long endTime = System.currentTimeMillis();
     long startTime = endTime - intervalMs;
@@ -98,7 +99,7 @@ public class JobHistoryMonitor {
       logger.info("Get JobHistoryId from cache ID:" + id);
     }
     List<DataFetcher> fetchers =
-        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, 
"updated_time");
+        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, 
"finished_job");
     if (fetchers.isEmpty()) {
       logger.warn("generated 0 dataFetchers, plz check input");
       return;
@@ -169,6 +170,14 @@ public class JobHistoryMonitor {
     } catch (Exception e) {
       logger.warn("CommonJobRunTimeRule Scan Error msg: " + e.getMessage());
     }
+    // 新增失败任务分析扫描
+    try {
+      JobHistoryAnalyzeRule jobHistoryAnalyzeRule =
+          new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender());
+      scanner.addScanRule(jobHistoryAnalyzeRule);
+    } catch (Exception e) {
+      logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage());
+    }
     // 执行任务扫描
     JobMonitorUtils.run(scanner, fetchers, true);
 
@@ -176,7 +185,7 @@ public class JobHistoryMonitor {
     JobIndexRule jobIndexRule = new JobIndexRule(new JobIndexSender());
     scannerIndex.addScanRule(jobIndexRule);
     List<DataFetcher> createFetcher =
-        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, 
"department");
+        JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "");
     JobMonitorUtils.run(scannerIndex, createFetcher, true);
   }
 
@@ -193,7 +202,7 @@ public class JobHistoryMonitor {
     AnomalyScanner scanner = new DefaultScanner();
     boolean shouldStart = false;
     List<DataFetcher> fetchers =
-        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, 
id, "created_time");
+        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, 
id, "unfinished_job");
     if (fetchers.isEmpty()) {
       logger.warn("generated 0 dataFetchers, plz check input");
       return;
@@ -215,4 +224,50 @@ public class JobHistoryMonitor {
     }
     JobMonitorUtils.run(scanner, fetchers, shouldStart);
   }
+
+  /** * 每10分钟扫描一次,扫描两个小时之内的任务,告警要求:管理台配置告警相关参数 */
+  @Scheduled(cron = "${linkis.monitor.jdbc.timeout.alert.cron:0 0/10 0 * * ?}")
+  public void jdbcUnfinishedAlertScan() {
+    long id =
+        
Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedAlertScan"))
+            .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
+    long intervalMs = 7200 * 1000L;
+    long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
+    long endTime = System.currentTimeMillis();
+    long startTime = endTime - intervalMs;
+    AnomalyScanner scanner = new DefaultScanner();
+    List<DataFetcher> fetchers =
+        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, 
id, "");
+    if (fetchers.isEmpty()) {
+      logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check 
input");
+      return;
+    }
+    StarrocksTimeExceedRule starrocksTimeExceedRule =
+        new StarrocksTimeExceedRule(new StarrocksTimeExceedAlertSender());
+    scanner.addScanRule(starrocksTimeExceedRule);
+    JobMonitorUtils.run(scanner, fetchers, true);
+  }
+
+  /** * 每10分钟扫描一次,扫描两个小时之内的任务,满足要求触发kill kill要求:数据源配置kill参数 */
+  @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}")
+  public void jdbcUnfinishedKillScan() {
+    long id =
+        
Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan"))
+            .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
+    long intervalMs = 7200 * 1000L;
+    long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
+    long endTime = System.currentTimeMillis();
+    long startTime = endTime - intervalMs;
+    AnomalyScanner scanner = new DefaultScanner();
+    List<DataFetcher> fetchers =
+        JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, 
id, "");
+    if (fetchers.isEmpty()) {
+      logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check 
input");
+      return;
+    }
+    StarrocksTimeKillRule starrocksTimeKillRule =
+        new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender());
+    scanner.addScanRule(starrocksTimeKillRule);
+    JobMonitorUtils.run(scanner, fetchers, true);
+  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java
index e8658050e0..a9a1a30887 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java
@@ -61,7 +61,7 @@ public class ResourceMonitor {
     // 获取emNode资源信息
     List<Map<String, Object>> emNodeVoList = new ArrayList<>();
     try {
-      Map<String, Object> resultmap = HttpsUntils.sendHttp(null, null);
+      Map<String, Object> resultmap = HttpsUntils.getEmsResourceList();
       // got interface data
       Map<String, List<Map<String, Object>>> data = MapUtils.getMap(resultmap, 
"data");
       emNodeVoList = data.getOrDefault("EMs", new ArrayList<>());
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java
new file mode 100644
index 0000000000..338d220489
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.scheduled;
+
+import org.apache.linkis.monitor.constants.Constants;
+import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper;
+import org.apache.linkis.monitor.department.entity.UserDepartmentInfo;
+import org.apache.linkis.monitor.factory.MapperFactory;
+import org.apache.linkis.monitor.utils.alert.AlertDesc;
+import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils;
+import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+@PropertySource(value = "classpath:linkis-et-monitor.properties", encoding = 
"UTF-8")
+public class UserDepartmentInfoSync {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ResourceMonitor.class);
+  private static final int pagesize = 5000;
+
+  private static final UserDepartmentInfoMapper userDepartmentInfoMapper =
+      MapperFactory.getUserDepartmentInfoMapper();
+
+  @Scheduled(cron = "${linkis.monitor.org.user.sync.cron:0 0 11 1/7 * ?}")
+  public static void DepartmentInfoSync() {
+    // 获取linkis_org_user_sync信息
+    // 收集异常用户
+    List<UserDepartmentInfo> alterList = new ArrayList<>();
+    int pageNum = 1; // 初始pageNum
+    while (true) {
+      List<UserDepartmentInfo> departSyncList = null;
+      PageHelper.startPage(pageNum, pagesize);
+      try {
+        departSyncList = userDepartmentInfoMapper.selectAllUsers();
+      } finally {
+        PageHelper.clearPage();
+      }
+      PageInfo<UserDepartmentInfo> pageInfo = new PageInfo<>(departSyncList);
+      // 处理 departSyncList 中的数据
+      processDepartSyncList(pageInfo.getList(), alterList);
+      if (!pageInfo.isHasNextPage()) {
+        break; // 没有更多记录,退出循环
+      }
+      pageNum++;
+    }
+    // 统计异常名称,然后发送告警
+    String usernames =
+        alterList.stream()
+            .filter(s -> StringUtils.isNotBlank(s.getUserName()))
+            .map(UserDepartmentInfo::getUserName)
+            .limit(5)
+            .collect(Collectors.joining(","));
+    if (StringUtils.isNotBlank(usernames)) {
+      HashMap<String, String> replaceParm = new HashMap<>();
+      replaceParm.put("$user", usernames);
+      replaceParm.put("$count", String.valueOf(alterList.size()));
+      Map<String, AlertDesc> ecmResourceAlerts =
+          MonitorAlertUtils.getAlerts(Constants.DEPARTMENT_USER_IM(), 
replaceParm);
+      PooledImsAlertUtils.addAlert(ecmResourceAlerts.get("12019"));
+    }
+  }
+
+  private static void processDepartSyncList(
+      List<UserDepartmentInfo> departSyncList, List<UserDepartmentInfo> 
alterList) {
+    if (CollectionUtils.isEmpty(departSyncList)) {
+      logger.info("No user department info to sync");
+      // 并且发送告警通知
+      return;
+    } else {
+      logger.info("Start to sync user department info");
+      // 收集异常用户
+      List<UserDepartmentInfo> errorUserList =
+          departSyncList.stream()
+              .filter(
+                  userDepartmentInfo ->
+                      StringUtils.isNotBlank(userDepartmentInfo.getUserName())
+                          && 
(StringUtils.isBlank(userDepartmentInfo.getOrgId())
+                              || 
StringUtils.isBlank(userDepartmentInfo.getOrgName())))
+              .collect(Collectors.toList());
+      // 收集需要同步用户
+      List<UserDepartmentInfo> syncList =
+          departSyncList.stream()
+              .filter(
+                  userDepartmentInfo ->
+                      StringUtils.isNotBlank(userDepartmentInfo.getUserName())
+                          && 
StringUtils.isNotBlank(userDepartmentInfo.getOrgId())
+                          && 
StringUtils.isNotBlank(userDepartmentInfo.getOrgName()))
+              .collect(Collectors.toList());
+      if (!CollectionUtils.isEmpty(errorUserList)) {
+        alterList.addAll(errorUserList);
+      }
+      if (!CollectionUtils.isEmpty(syncList)) {
+        // 同步用户
+        List<UserDepartmentInfo> insertList = new ArrayList<>();
+        syncList.forEach(
+            departSyncInfo -> {
+              UserDepartmentInfo userDepartmentInfo =
+                  
userDepartmentInfoMapper.selectUser(departSyncInfo.getUserName());
+              if (null == userDepartmentInfo) {
+                insertList.add(departSyncInfo);
+              } else {
+                if 
((!departSyncInfo.getOrgId().equals(userDepartmentInfo.getOrgId()))
+                    || 
(!departSyncInfo.getOrgName().equals(userDepartmentInfo.getOrgName()))) {
+                  userDepartmentInfoMapper.updateUser(departSyncInfo);
+                }
+              }
+            });
+        if (!CollectionUtils.isEmpty(insertList)) {
+          userDepartmentInfoMapper.batchInsertUsers(insertList);
+        }
+      }
+    }
+  }
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java
index ad6f861479..e55e01352d 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java
@@ -24,7 +24,7 @@ import 
org.apache.linkis.httpclient.dws.config.DWSClientConfig;
 import org.apache.linkis.manager.label.constant.LabelKeyConstant;
 import org.apache.linkis.monitor.config.MonitorConfig;
 import org.apache.linkis.monitor.constants.Constants;
-import org.apache.linkis.monitor.until.HttpsUntils;
+import org.apache.linkis.monitor.entity.ClientSingleton;
 import org.apache.linkis.monitor.utils.alert.AlertDesc;
 import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils;
 import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils;
@@ -57,7 +57,8 @@ public class UserModeMonitor {
 
   private static final Logger logger = 
LoggerFactory.getLogger(UserModeMonitor.class);
 
-  private static final DWSClientConfig clientConfig = 
HttpsUntils.dwsClientConfig;
+  private static final DWSClientConfig clientConfig =
+      ClientSingleton.createClientConfig(null, null);
 
   private static final UJESClient client = new UJESClientImpl(clientConfig);
 
@@ -138,7 +139,7 @@ public class UserModeMonitor {
   public void dbJob() {
     Map<String, Object> properties = new HashMap<>();
     properties.put("readTimeout", 
MonitorConfig.USER_MODE_INTERFACE_TIMEOUT.getValue());
-    DWSClientConfig clientConfig = HttpsUntils.createClientConfig(null, 
properties);
+    DWSClientConfig clientConfig = ClientSingleton.createClientConfig(null, 
properties);
     UJESClientImpl ujesClient = new UJESClientImpl(clientConfig);
     GetTableStatisticInfoAction builder =
         GetTableStatisticInfoAction.builder()
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
index a504a9d41d..0476765594 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
@@ -17,24 +17,26 @@
 
 package org.apache.linkis.monitor.until;
 
-import org.apache.linkis.bml.conf.BmlConfiguration;
-import org.apache.linkis.common.conf.Configuration;
 import org.apache.linkis.common.utils.Utils;
-import 
org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
-import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
-import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
+import 
org.apache.linkis.datasource.client.response.GetInfoPublishedByDataSourceNameResult;
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
 import org.apache.linkis.monitor.client.MonitorHTTPClient;
-import org.apache.linkis.monitor.client.MonitorHTTPClientClientImpl;
 import org.apache.linkis.monitor.config.MonitorConfig;
+import org.apache.linkis.monitor.constants.Constants;
+import org.apache.linkis.monitor.entity.ClientSingleton;
 import org.apache.linkis.monitor.entity.IndexEntity;
-import org.apache.linkis.monitor.request.EmsListAction;
-import org.apache.linkis.monitor.request.EntranceTaskAction;
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory;
+import org.apache.linkis.monitor.request.*;
+import org.apache.linkis.monitor.response.AnalyzeJobResultAction;
 import org.apache.linkis.monitor.response.EntranceTaskResult;
+import org.apache.linkis.monitor.response.KeyvalueResult;
+import org.apache.linkis.monitor.response.KillJobResultAction;
+import org.apache.linkis.protocol.utils.ZuulEntranceUtils;
 import org.apache.linkis.server.BDPJettyServerHelper;
 import org.apache.linkis.ujes.client.response.EmsListResult;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -47,10 +49,8 @@ import org.apache.http.util.EntityUtils;
 import org.springframework.util.Assert;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,89 +58,18 @@ import org.slf4j.LoggerFactory;
 public class HttpsUntils {
   private static final Logger logger = 
LoggerFactory.getLogger(HttpsUntils.class);
 
-  public static DWSClientConfig dwsClientConfig = createClientConfig(null, 
null);
-  //        IOUtils.closeQuietly(client);
-  public static MonitorHTTPClient client = new 
MonitorHTTPClientClientImpl(dwsClientConfig);
   public static final String localHost = Utils.getLocalHostname();
 
-  public static Map<String, Object> sendHttp(String url, Map<String, Object> 
properties)
-      throws IOException {
-    if (null == dwsClientConfig) {
-      dwsClientConfig = createClientConfig(url, properties);
-    }
-    if (null == client) {
-      client = new MonitorHTTPClientClientImpl(dwsClientConfig);
-    }
-    EmsListAction build = EmsListAction.newBuilder().setUser("hadoop").build();
+  public static Map<String, Object> getEmsResourceList() throws IOException {
+    MonitorHTTPClient client = ClientSingleton.getInstance();
+    EmsListAction build = 
EmsListAction.newBuilder().setUser(Constants.ADMIN_USER()).build();
     EmsListResult result = client.list(build);
     return result.getResultMap();
   }
 
-  public static DWSClientConfig createClientConfig(String url, Map<String, 
Object> properties) {
-    String realUrl = "";
-    if (StringUtils.isBlank(url)) {
-      realUrl = Configuration.getGateWayURL();
-    } else {
-      realUrl = url;
-    }
-    Map<String, Object> parms = new HashMap<>();
-    if (MapUtils.isNotEmpty(properties)) {
-      parms = properties;
-    }
-    int maxConnection =
-        (int)
-            parms.getOrDefault(
-                BmlConfiguration.CONNECTION_MAX_SIZE_SHORT_NAME(),
-                BmlConfiguration.CONNECTION_MAX_SIZE().getValue());
-    int connectTimeout =
-        (int)
-            parms.getOrDefault(
-                BmlConfiguration.CONNECTION_TIMEOUT_SHORT_NAME(),
-                BmlConfiguration.CONNECTION_TIMEOUT().getValue());
-    int readTimeout =
-        (int)
-            parms.getOrDefault(
-                BmlConfiguration.CONNECTION_READ_TIMEOUT_SHORT_NAME(),
-                BmlConfiguration.CONNECTION_READ_TIMEOUT().getValue());
-    String tokenKey =
-        (String)
-            parms.getOrDefault(
-                BmlConfiguration.AUTH_TOKEN_KEY_SHORT_NAME(),
-                BmlConfiguration.AUTH_TOKEN_KEY().getValue());
-    String tokenValue =
-        (String)
-            parms.getOrDefault(
-                BmlConfiguration.AUTH_TOKEN_VALUE_SHORT_NAME(),
-                BmlConfiguration.AUTH_TOKEN_VALUE().getValue());
-
-    DWSClientConfig clientConfig =
-        ((DWSClientConfigBuilder)
-                (DWSClientConfigBuilder.newBuilder()
-                    .addServerUrl(realUrl)
-                    .connectionTimeout(connectTimeout)
-                    .discoveryEnabled(false)
-                    .discoveryFrequency(1, TimeUnit.MINUTES)
-                    .loadbalancerEnabled(false)
-                    .maxConnectionSize(maxConnection)
-                    .retryEnabled(false)
-                    .readTimeout(readTimeout)
-                    .setAuthenticationStrategy(new 
TokenAuthenticationStrategy())
-                    .setAuthTokenKey(tokenKey)
-                    .setAuthTokenValue(tokenValue)))
-            .setDWSVersion("v1")
-            .build();
-
-    return clientConfig;
-  }
-
   public static Map<String, Object> getEntranceTask(String url, String user, 
String Instance)
       throws IOException {
-    if (null == dwsClientConfig) {
-      dwsClientConfig = createClientConfig(null, null);
-    }
-    if (null == client) {
-      client = new MonitorHTTPClientClientImpl(dwsClientConfig);
-    }
+    MonitorHTTPClient client = ClientSingleton.getInstance();
     EntranceTaskAction build =
         
EntranceTaskAction.newBuilder().setUser(user).setInstance(Instance).build();
     EntranceTaskResult result = client.entranList(build);
@@ -156,8 +85,10 @@ public class HttpsUntils {
     RequestConfig requestConfig = RequestConfig.DEFAULT;
     StringEntity entity =
         new StringEntity(
-            json, 
ContentType.create(ContentType.APPLICATION_JSON.getMimeType(), "UTF-8"));
-    entity.setContentEncoding("UTF-8");
+            json,
+            ContentType.create(
+                ContentType.APPLICATION_JSON.getMimeType(), 
StandardCharsets.UTF_8.toString()));
+    entity.setContentEncoding(StandardCharsets.UTF_8.toString());
 
     HttpPost httpPost = new HttpPost(MonitorConfig.ECM_TASK_IMURL.getValue());
     httpPost.setConfig(requestConfig);
@@ -165,9 +96,77 @@ public class HttpsUntils {
 
     CloseableHttpClient httpClient = HttpClients.createDefault();
     CloseableHttpResponse execute = httpClient.execute(httpPost);
-    String responseStr = EntityUtils.toString(execute.getEntity(), "UTF-8");
+    String responseStr =
+        EntityUtils.toString(execute.getEntity(), 
StandardCharsets.UTF_8.toString());
     Map<String, String> map = 
BDPJettyServerHelper.gson().fromJson(responseStr, Map.class);
     logger.info("send index response :{}", map);
     Assert.isTrue(!"0".equals(map.get("resultCode")), map.get("resultMsg"));
   }
+
+  public static String getJDBCConf(String user, String conf) {
+    MonitorHTTPClient client = ClientSingleton.getInstance();
+    KeyvalueAction build =
+        KeyvalueAction.newBuilder()
+            .setVersion("4")
+            .setEngineType(Constants.JDBC_ENGINE())
+            .setCreator("IDE")
+            .setConfigKey(conf)
+            .setUser(user)
+            .build();
+    KeyvalueResult result = client.getConfKeyValue(build);
+    Map data = MapUtils.getMap(result.getResultMap(), "data", new HashMap<>());
+    ArrayList arrayList = (ArrayList) data.get("configValues");
+    if (CollectionUtils.isNotEmpty(arrayList)) {
+      String json = BDPJettyServerHelper.gson().toJson(arrayList.get(0));
+      Map map = BDPJettyServerHelper.gson().fromJson(json, Map.class);
+      return MapUtils.getString(map, "configValue", "");
+    } else {
+      return "";
+    }
+  }
+
+  public static Map getDatasourceConf(String user, String datasourceName) {
+    MonitorHTTPClient client = ClientSingleton.getInstance();
+    DataSourceParamsAction dataSourceParamsAction =
+        DataSourceParamsAction.builder()
+            .setSystem(Constants.ALERT_SUB_SYSTEM_ID())
+            .setDataSourceName(datasourceName)
+            .setUser(user)
+            .build();
+    GetInfoPublishedByDataSourceNameResult result =
+        client.getInfoByDataSourceInfo(dataSourceParamsAction);
+    Map data = MapUtils.getMap(result.getResultMap(), "data", new HashMap<>());
+    Map datasourceInfoMap = MapUtils.getMap(data, "info", new HashMap<>());
+    return datasourceInfoMap;
+  }
+
+  public static void killJob(JobHistory jobHistory) {
+    MonitorHTTPClient client = ClientSingleton.getInstance();
+    String[] split = 
jobHistory.getInstances().split(Constants.SPLIT_DELIMITER());
+    String execID =
+        ZuulEntranceUtils.generateExecID(
+            jobHistory.getJobReqId(),
+            GovernanceCommonConf.ENTRANCE_SERVICE_NAME().getValue(),
+            split);
+    KillJobAction killJobAction =
+        KillJobAction.builder()
+            .setIdList(Collections.singletonList(execID))
+            .setTaskIDList(Collections.singletonList(jobHistory.getId()))
+            .setExecID(execID)
+            .setUser(jobHistory.getSubmitUser())
+            .build();
+    KillJobResultAction killJobResultAction = client.killJob(killJobAction);
+    Map data = MapUtils.getMap(killJobResultAction.getResultMap(), "data", new 
HashMap<>());
+  }
+
+  public static void analyzeJob(JobHistory jobHistory) {
+    MonitorHTTPClient client = ClientSingleton.getInstance();
+
+    AnalyzeJobAction analyzeJobAction =
+        AnalyzeJobAction.newBuilder()
+            .setTaskID(String.valueOf(jobHistory.getId()))
+            .setUser(Constants.ADMIN_USER())
+            .build();
+    AnalyzeJobResultAction analyzeJobResultAction = 
client.analyzeJob(analyzeJobAction);
+  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
index 3366014c89..66dd4a3b68 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
@@ -44,14 +44,14 @@ public class JobMonitorUtils {
   }
 
   public static List<DataFetcher> generateFetchers(
-      long startTime, long endTime, long maxIntervalMs, long id, String 
timeType) {
+      long startTime, long endTime, long maxIntervalMs, long id, String 
jobStatus) {
     List<DataFetcher> ret = new ArrayList<>();
     long pe = endTime;
     long ps;
     while (pe > startTime) {
       ps = Math.max(pe - maxIntervalMs, startTime);
       String[] fetcherArgs =
-          new String[] {String.valueOf(ps), String.valueOf(pe), 
String.valueOf(id), timeType};
+          new String[] {String.valueOf(ps), String.valueOf(pe), 
String.valueOf(id), jobStatus};
       ret.add(new JobHistoryDataFetcher(fetcherArgs, 
MapperFactory.getJobHistoryMapper()));
       logger.info(
           "Generated dataFetcher for startTime: " + new Date(ps) + ". EndTime: 
" + new Date(pe));
@@ -61,11 +61,11 @@ public class JobMonitorUtils {
   }
 
   public static List<DataFetcher> generateFetchersfortime(
-      long startTime, long endTime, long id, String timeType) {
+      long startTime, long endTime, long id, String jobStatus) {
     List<DataFetcher> fetchers = new ArrayList<>();
     String[] fetcherArgs =
         new String[] {
-          String.valueOf(startTime), String.valueOf(endTime), 
String.valueOf(id), timeType
+          String.valueOf(startTime), String.valueOf(endTime), 
String.valueOf(id), jobStatus
         };
     fetchers.add(new JobHistoryDataFetcher(fetcherArgs, 
MapperFactory.getJobHistoryMapper()));
     logger.info(
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
index 15a2626379..5e4133aa90 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
@@ -20,6 +20,7 @@ package org.apache.linkis.monitor.until;
 import org.apache.linkis.common.utils.Utils;
 import org.apache.linkis.monitor.config.MonitorConfig;
 import org.apache.linkis.monitor.constants.Constants;
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory;
 import org.apache.linkis.monitor.utils.alert.AlertDesc;
 import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils;
 import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils;
@@ -42,6 +43,9 @@ public class ThreadUtils extends ApplicationContextEvent {
   public static ExecutionContextExecutorService executors =
       Utils.newCachedExecutionContext(5, "alert-pool-thread-", false);
 
+  public static ExecutionContextExecutorService executors_analyze =
+      Utils.newCachedExecutionContext(50, "analyze-pool-thread-", false);
+
   public ThreadUtils(ApplicationContext source) {
     super(source);
   }
@@ -64,4 +68,9 @@ public class ThreadUtils extends ApplicationContextEvent {
     }
     return msg;
   }
+
+  public static void analyzeRun(JobHistory jobHistory) {
+    FutureTask future = new FutureTask(() -> 
HttpsUntils.analyzeJob(jobHistory), -1);
+    executors_analyze.submit(future);
+  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
 
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
index 7be8f93154..dcab938da9 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
@@ -47,7 +47,7 @@
 
     <sql id="jobhistory_query">
         
job.`id`,job.`job_req_id`,job.`submit_user`,job.`execute_user`,job.`labels`,job.`params`,job.`status`,job.`error_code`,job.`created_time`,
-        
job.`updated_time`,job.`instances`,job.`observe_info`,org.`org_id`,org.`org_name`
+        
job.`updated_time`,job.`instances`,job.`engine_type`,job.`observe_info`,org.`org_id`,org.`org_name`
     </sql>
 
     <select id="selectJobHistory" useCache="false" resultMap="jobHistoryMap"
@@ -127,7 +127,7 @@
         <include refid="jobhistory_query"/>
         FROM linkis_ps_job_history_group_history job JOIN linkis_org_user org 
ON job.submit_user = org.user_name
         <where>
-            <if test="id != null">job.id > #{id}</if>
+            <if test="id != null">job.id >= #{id}</if>
             <if test="umUser != null">and job.submit_user = #{umUser}</if>
             <if test="engineType != null">and job.engine_type = 
#{engineType}</if>
             <if test="startDate != null">and job.created_time >= #{startDate} 
AND job.created_time <![CDATA[<=]]>#{endDate}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/UserDepartmentInfoMapper.xml
 
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/UserDepartmentInfoMapper.xml
new file mode 100644
index 0000000000..295cee560f
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/UserDepartmentInfoMapper.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
+
+<mapper 
namespace="org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper">
+
+    <!-- 新增方法 -->
+    <insert id="insertUser">
+        INSERT INTO linkis_org_user(cluster_code, user_type, user_name, 
org_id, org_name, queue_name, db_name, interface_user, is_union_analyse, 
create_time, user_itsm_no)
+        VALUES (#{clusterCode}, #{userType}, #{userName}, #{orgId}, 
#{orgName}, #{queueName}, #{dbName}, #{interfaceUser}, #{isUnionAnalyse}, 
#{createTime}, #{userItsmNo})
+    </insert>
+
+    <!-- 批量新增方法 -->
+    <insert id="batchInsertUsers" parameterType="java.util.List">
+        INSERT INTO linkis_org_user (cluster_code, user_type, user_name, 
org_id, org_name, queue_name, db_name, interface_user, is_union_analyse, 
create_time, user_itsm_no)
+        VALUES
+        <foreach item="user" index="index" collection="userDepartmentInfos" 
separator=",">
+            (#{user.clusterCode}, #{user.userType}, #{user.userName}, 
#{user.orgId}, #{user.orgName}, #{user.queueName}, #{user.dbName}, 
#{user.interfaceUser}, #{user.isUnionAnalyse}, #{user.createTime}, 
#{user.userItsmNo})
+        </foreach>
+    </insert>
+
+    <!-- 修改方法 -->
+    <update id="updateUser">
+        UPDATE linkis_org_user
+        <set>
+            <if test="clusterCode != null">cluster_code = #{clusterCode},</if>
+            <if test="userType != null">user_type = #{userType},</if>
+            <if test="orgId != null">org_id = #{orgId},</if>
+            <if test="orgName != null">org_name = #{orgName},</if>
+            <if test="queueName != null">queue_name = #{queueName},</if>
+            <if test="dbName != null">db_name = #{dbName},</if>
+            <if test="interfaceUser != null">interface_user = 
#{interfaceUser},</if>
+            <if test="isUnionAnalyse != null">is_union_analyse = 
#{isUnionAnalyse},</if>
+            <if test="createTime != null">create_time = #{createTime},</if>
+            <if test="userItsmNo != null">user_itsm_no = #{userItsmNo},</if>
+        </set>
+        WHERE user_name = #{userName}
+    </update>
+
+    <!-- 查询方法 -->
+    <select id="selectUser" 
resultType="org.apache.linkis.monitor.department.entity.UserDepartmentInfo">
+        SELECT * FROM linkis_org_user WHERE user_name = #{userName}
+    </select>
+
+    <!-- 删除方法 -->
+    <delete id="deleteUser">
+        DELETE FROM linkis_org_user
+    </delete>
+
+    <!-- 查询所有数据 -->
+    <select id="selectAllUsers" 
resultType="org.apache.linkis.monitor.department.entity.UserDepartmentInfo">
+        SELECT * FROM linkis_org_user_sync
+    </select>
+
+
+
+
+</mapper>
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
index e3652306c1..c6b2b04d59 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.monitor
 
+import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper
 import org.apache.linkis.monitor.factory.MapperFactory
 import org.apache.linkis.monitor.instance.dao.InstanceInfoDao
 import org.apache.linkis.monitor.jobhistory.dao.JobHistoryMapper
@@ -39,10 +40,14 @@ class LinkisJobHistoryScanSpringConfiguration {
   @Autowired
   private var instanceInfoMapper: InstanceInfoDao = _
 
+  @Autowired
+  private var userDepartmentInfoMapper: UserDepartmentInfoMapper = _
+
   @PostConstruct
   def init(): Unit = {
     MapperFactory.setJobHistoryMapper(jobHistoryMapper)
     MapperFactory.setInstanceInfoMapper(instanceInfoMapper)
+    MapperFactory.setUserDepartmentInfoMapper(userDepartmentInfoMapper)
   }
 
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
index 4caccd73a3..ab22404d59 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
@@ -17,12 +17,30 @@
 
 package org.apache.linkis.monitor.client
 
+import org.apache.linkis.datasource.client.response.{
+  GetConnectParamsByDataSourceNameResult,
+  GetInfoByDataSourceNameResult,
+  GetInfoPublishedByDataSourceNameResult
+}
 import org.apache.linkis.httpclient.authentication.AuthenticationStrategy
 import 
org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
 import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, 
DWSClientConfigBuilder}
 import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.{EmsListAction, EntranceTaskAction, 
MonitorResourceAction}
-import org.apache.linkis.monitor.response.EntranceTaskResult
+import org.apache.linkis.monitor.request.{
+  AnalyzeJobAction,
+  DataSourceParamsAction,
+  EmsListAction,
+  EntranceTaskAction,
+  KeyvalueAction,
+  KillJobAction,
+  MonitorAction
+}
+import org.apache.linkis.monitor.response.{
+  AnalyzeJobResultAction,
+  EntranceTaskResult,
+  KeyvalueResult,
+  KillJobResultAction
+}
 import org.apache.linkis.ujes.client.response.EmsListResult
 
 import java.io.Closeable
@@ -30,7 +48,7 @@ import java.util.concurrent.TimeUnit
 
 abstract class MonitorHTTPClient extends Closeable {
 
-  protected[client] def executeJob(ujesJobAction: MonitorResourceAction): 
Result
+  protected[client] def executeJob(ujesJobAction: MonitorAction): Result
 
   def list(emsListAction: EmsListAction): EmsListResult = {
     executeJob(emsListAction).asInstanceOf[EmsListResult]
@@ -40,6 +58,24 @@ abstract class MonitorHTTPClient extends Closeable {
     executeJob(entranceTaskAction).asInstanceOf[EntranceTaskResult]
   }
 
+  def getConfKeyValue(keyvalueAction: KeyvalueAction): KeyvalueResult = {
+    executeJob(keyvalueAction).asInstanceOf[KeyvalueResult]
+  }
+
+  def getInfoByDataSourceInfo(
+      datasourceInfoAction: DataSourceParamsAction
+  ): GetInfoPublishedByDataSourceNameResult = {
+    
executeJob(datasourceInfoAction).asInstanceOf[GetInfoPublishedByDataSourceNameResult]
+  }
+
+  def killJob(killJobAction: KillJobAction): KillJobResultAction = {
+    executeJob(killJobAction).asInstanceOf[KillJobResultAction]
+  }
+
+  def analyzeJob(analyzeJobAction: AnalyzeJobAction): AnalyzeJobResultAction = 
{
+    executeJob(analyzeJobAction).asInstanceOf[AnalyzeJobResultAction]
+  }
+
 }
 
 object MonitorHTTPClient {
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
index 5554701571..8074aeeb62 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
@@ -21,14 +21,14 @@ import org.apache.linkis.httpclient.dws.DWSHttpClient
 import org.apache.linkis.httpclient.dws.config.DWSClientConfig
 import org.apache.linkis.httpclient.request.Action
 import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.MonitorResourceAction
+import org.apache.linkis.monitor.request.MonitorAction
 
 class MonitorHTTPClientClientImpl(clientConfig: DWSClientConfig) extends 
MonitorHTTPClient {
 
   private val dwsHttpClient =
     new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread")
 
-  override protected[client] def executeJob(ujesJobAction: 
MonitorResourceAction): Result =
+  override protected[client] def executeJob(ujesJobAction: MonitorAction): 
Result =
     ujesJobAction match {
 
       case action: Action => dwsHttpClient.execute(action)
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
index d0660e1116..77a3226ba3 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
@@ -21,7 +21,7 @@ import 
org.apache.linkis.httpclient.authentication.AuthenticationStrategy
 import 
org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
 import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, 
DWSClientConfigBuilder}
 import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.{EmsListAction, MonitorResourceAction}
+import org.apache.linkis.monitor.request.{EmsListAction, MonitorAction}
 import org.apache.linkis.ujes.client.response.EmsListResult
 
 import java.io.Closeable
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit
 
 abstract class MonitorResourceClient extends Closeable {
 
-  protected[client] def executeJob(ujesJobAction: MonitorResourceAction): 
Result
+  protected[client] def executeJob(monitorAction: MonitorAction): Result
 
   def list(jobListAction: EmsListAction): EmsListResult = {
     executeJob(jobListAction).asInstanceOf[EmsListResult]
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
index 06cff3b46a..3112b2b63f 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
@@ -21,15 +21,15 @@ import org.apache.linkis.httpclient.dws.DWSHttpClient
 import org.apache.linkis.httpclient.dws.config.DWSClientConfig
 import org.apache.linkis.httpclient.request.Action
 import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.MonitorResourceAction
+import org.apache.linkis.monitor.request.MonitorAction
 
 class MonitorResourceClientImpl(clientConfig: DWSClientConfig) extends 
MonitorResourceClient {
 
   private val dwsHttpClient =
     new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread")
 
-  override protected[client] def executeJob(ujesJobAction: 
MonitorResourceAction): Result =
-    ujesJobAction match {
+  override protected[client] def executeJob(monitorAction: MonitorAction): 
Result =
+    monitorAction match {
 
       case action: Action => dwsHttpClient.execute(action)
 
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
index 9da7e35ebd..affa0ccb83 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
@@ -79,6 +79,7 @@ object Constants {
   val BML_CLEAR_IM = "bml.clear.monitor.im."
   val THREAD_TIME_OUT_IM = "thread.monitor.timeout.im."
   val JOB_RESULT_IM = "jobhistory.result.monitor.im."
+  val DEPARTMENT_USER_IM = "department.user.sync.im."
 
   val BML_VERSION_MAX_NUM: CommonVars[Int] =
     CommonVars[Int]("linkis.monitor.bml.cleaner.version.max.num", 50)
@@ -98,4 +99,18 @@ object Constants {
   val LINKIS_CLUSTER_NAME =
     CommonVars.properties.getProperty("linkis.cluster.name", "")
 
+  val ADMIN_USER = "hadoop"
+
+  val SPLIT_DELIMITER = ";"
+
+  val JDBC_ALERT_TIME = "linkis.jdbc.task.timeout.alert.time"
+
+  val JDBC_ALERT_USER = "linkis.jdbc.task.timeout.alert.user"
+
+  val JDBC_ALERT_LEVEL = "linkis.jdbc.task.timeout.alert.level"
+
+  val JOB_DATASOURCE_CONF = "wds.linkis.engine.runtime.datasource"
+
+  val JDBC_ENGINE = "jdbc"
+
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
index eb503c52aa..3f81c66514 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.monitor.factory
 
+import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper
 import org.apache.linkis.monitor.instance.dao.{
   InsLabelRelationDao,
   InstanceInfoDao,
@@ -34,6 +35,8 @@ object MapperFactory {
 
   private var instanceLabelRelationMapper: InsLabelRelationDao = _
 
+  private var userDepartmentInfoMapper: UserDepartmentInfoMapper = _
+
   def getJobHistoryMapper(): JobHistoryMapper = jobHistoryMapper
 
   def setJobHistoryMapper(jobHistoryMapper: JobHistoryMapper): Unit = {
@@ -58,4 +61,12 @@ object MapperFactory {
     MapperFactory.instanceLabelRelationMapper = instanceLabelRelationMapper
   }
 
+  // Returns the configured UserDepartmentInfoMapper instance
+  def getUserDepartmentInfoMapper: UserDepartmentInfoMapper = 
userDepartmentInfoMapper
+
+  // Sets the UserDepartmentInfoMapper instance
+  def setUserDepartmentInfoMapper(mapper: UserDepartmentInfoMapper): Unit = {
+    userDepartmentInfoMapper = mapper
+  }
+
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
index 4f43d86d40..4f553847f6 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
@@ -54,60 +54,46 @@ class JobHistoryDataFetcher(args: Array[Any], mapper: 
JobHistoryMapper)
         "Wrong input for JobHistoryDataFetcher. DataType: " + 
args.getClass.getCanonicalName
       )
     }
-    if (args != null && args.length == 2) {
-      val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      mapper
-        .search(null, null, null, new Date(start), new Date(end), null)
-        .asInstanceOf[util.List[scala.Any]]
-    } else if (args != null && args.length == 4) {
-      val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      val id = Utils.tryCatch(args(2).asInstanceOf[String].toLong) { t =>
-        {
-          logger.error("Failed to get data from DB: Illegal arguments.", t)
-          throw t
-        }
-      }
-      if (
-          StringUtils.isNotBlank(args(3).asInstanceOf[String]) && args(3)
-            .asInstanceOf[String]
-            .equals("updated_time")
-      ) {
-        val list = new util.ArrayList[String]()
-        Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
-        mapper
-          .searchByCacheAndUpdateTime(id, null, list, new Date(start), new 
Date(end), null)
-          .asInstanceOf[util.List[scala.Any]]
-      } else {
-        var list = new util.ArrayList[String]()
-        Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
-        if (args(3).asInstanceOf[String].equals("department")) {
-          list = null;
-        }
-        mapper
-          .searchByCache(id, null, list, new Date(start), new Date(end), null)
-          .asInstanceOf[util.List[scala.Any]]
+    if (args != null) {
+      val start = args(0).asInstanceOf[String].toLong
+      val end = args(1).asInstanceOf[String].toLong
+      // Dispatch on the number of arguments
+      args.length match {
+        // Two arguments: query filtered only by the start and end time window
+        case 2 =>
+          mapper
+            .search(null, null, null, new Date(start), new Date(end), null)
+            .asInstanceOf[util.List[scala.Any]]
+        // Four arguments: the fourth argument selects the query variant
+        case 4 =>
+          val id = args(2).asInstanceOf[String].toLong
+          val parm = args(3).asInstanceOf[String]
+          parm match {
+            // Filter by id and time window, restricted to finished-status jobs
+            case "finished_job" =>
+              val list = new util.ArrayList[String]()
+              Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
+              mapper
+                .searchByCacheAndUpdateTime(id, null, list, new Date(start), 
new Date(end), null)
+                .asInstanceOf[util.List[scala.Any]]
+            // Filter by id and time window, restricted to unfinished-status jobs
+            case "unfinished_job" =>
+              var list = new util.ArrayList[String]()
+              Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
+              mapper
+                .searchByCache(id, null, list, new Date(start), new Date(end), 
null)
+                .asInstanceOf[util.List[scala.Any]]
+            // Filter by id and time window only (no status filter)
+            case _ =>
+              mapper
+                .searchByCache(id, null, null, new Date(start), new Date(end), 
null)
+                .asInstanceOf[util.List[scala.Any]]
+          }
+        case _ =>
+          throw new AnomalyScannerException(
+            21304,
+            "Wrong input for JobHistoryDataFetcher. Data: " + args
+          )
       }
     } else {
       throw new AnomalyScannerException(
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala
similarity index 55%
copy from 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala
index aa73471c49..2b17adb39b 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala
@@ -15,17 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.jobhistory.analyze
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.{Event, Observer}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException
+import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils, 
PooledImsAlertUtils}
 
-public class QueryUtils {
+import java.util
 
-  private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+import scala.collection.JavaConverters._
+
+class JobHistoryAnalyzeAlertSender() extends Observer with Logging {
+  override def update(e: Event, jobHistroyList: scala.Any): Unit = {}
 
-  public static String dateToString(Date date) {
-    return dateFormat.format(date);
-  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala
similarity index 82%
copy from 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala
index 7ea2001481..675aacfc98 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.analyze
 
-import org.apache.linkis.httpclient.dws.request.DWSHttpAction
+import org.apache.linkis.monitor.core.ob.SingleObserverEvent
 
-trait MonitorResourceAction extends DWSHttpAction with UserAction
+class JobHistoryAnalyzeHitEvent extends SingleObserverEvent
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala
new file mode 100644
index 0000000000..aec128ffab
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.analyze
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.Observer
+import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils, ThreadUtils}
+import org.apache.linkis.monitor.utils.job.JohistoryUtils
+
+import java.util
+
+class JobHistoryAnalyzeRule(hitObserver: Observer)
+    extends AbstractScanRule(event = new JobHistoryAnalyzeHitEvent, observer = 
hitObserver)
+    with Logging {
+  private val scanRuleList = CacheUtils.cacheBuilder
+
+  /**
+   * If the scanned data matches the rule's pattern, notify the bound
+   * observer and return true.
+   *
+   * @param data
+   * @return
+   */
+  override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
+    if (!getHitEvent.isRegistered) {
+      logger.error("ScanRule is not bind with an observer. Will not be 
triggered")
+      return false
+    }
+    for (scanedData <- JohistoryUtils.getJobhistorySanData(data)) {
+      scanedData match {
+        case jobHistory: JobHistory =>
+          val jobStatus = jobHistory.getStatus.toUpperCase()
+          if (Constants.FINISHED_JOB_STATUS.contains(jobStatus) && 
jobStatus.equals("FAILED")) {
+            // Run asynchronous analysis for the failed job
+            ThreadUtils.analyzeRun(jobHistory)
+          }
+        case _ =>
+      }
+    }
+    true
+  }
+
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
index f788173e43..0367bfc05e 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
@@ -18,6 +18,7 @@
 package org.apache.linkis.monitor.jobhistory.jobtime
 
 import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.config.MonitorConfig
 import org.apache.linkis.monitor.constants.Constants
 import org.apache.linkis.monitor.core.ob.Observer
 import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
@@ -26,7 +27,7 @@ import 
org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException
 import org.apache.linkis.monitor.until.CacheUtils
 
 import java.util
-import java.util.Locale
+import java.util.{Locale, Optional}
 
 import scala.collection.JavaConverters._
 
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
new file mode 100644
index 0000000000..4e6e707335
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.{Event, Observer}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException
+import org.apache.linkis.monitor.until.HttpsUntils
+import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils, 
PooledImsAlertUtils}
+
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+class StarrocksTimeExceedAlertSender extends Observer with Logging {
+
+  /**
+   * Observer Pattern
+   */
+  override def update(e: Event, jobHistroyList: scala.Any): Unit = {
+    if (!e.isInstanceOf[StarrocksTimeExceedHitEvent]) {
+      throw new AnomalyScannerException(
+        21304,
+        "Wrong event that triggers JobHistoryErrorCodeAlertSender. Input 
DataType: " + e.getClass.getCanonicalName
+      )
+    }
+    if (null == jobHistroyList || !jobHistroyList.isInstanceOf[util.List[_]]) {
+      throw new AnomalyScannerException(
+        21304,
+        "Wrong input for JobHistoryErrorCodeAlertSender. Input DataType: " + 
jobHistroyList.getClass.getCanonicalName
+      )
+    }
+    for (a <- jobHistroyList.asInstanceOf[util.List[_]].asScala) {
+      if (a == null) {
+        logger.warn("Ignore null input data")
+      } else if (!a.isInstanceOf[JobHistory]) {
+        logger.warn("Ignore wrong input data Type : " + 
a.getClass.getCanonicalName)
+      } else {
+        val jobHistory = a.asInstanceOf[JobHistory]
+        val timeValue =
+          HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, 
Constants.JDBC_ALERT_TIME)
+        val userValue =
+          HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, 
Constants.JDBC_ALERT_USER)
+        var levelValue =
+          HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, 
Constants.JDBC_ALERT_LEVEL)
+        if (StringUtils.isNotBlank(timeValue) && 
StringUtils.isNotBlank(userValue)) {
+          val replaceParm: util.HashMap[String, String] = new 
util.HashMap[String, String]
+          replaceParm.put("$id", String.valueOf(jobHistory.getId))
+          replaceParm.put("$timeoutTime", timeValue)
+          replaceParm.put("$alteruser", userValue)
+          replaceParm.put("$eccAlertUser", userValue)
+          replaceParm.put("$submitUser", jobHistory.getSubmitUser)
+          if (StringUtils.isBlank(levelValue)) {
+            levelValue = "3";
+          }
+          replaceParm.put("$alterLevel", levelValue)
+          val alters = 
MonitorAlertUtils.getAlerts(Constants.USER_LABEL_MONITOR, replaceParm)
+          PooledImsAlertUtils.addAlert(alters.get("12020"))
+        }
+      }
+    }
+  }
+
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala
similarity index 82%
copy from 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala
index 7ea2001481..af5432cd9f 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.jobtime
 
-import org.apache.linkis.httpclient.dws.request.DWSHttpAction
+import org.apache.linkis.monitor.core.ob.SingleObserverEvent
 
-trait MonitorResourceAction extends DWSHttpAction with UserAction
+class StarrocksTimeExceedHitEvent extends SingleObserverEvent
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
new file mode 100644
index 0000000000..b616c5c02c
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.Observer
+import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils}
+import org.apache.linkis.server.BDPJettyServerHelper
+
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+import java.util.{ArrayList, HashMap, List, Locale, Map}
+
+import scala.collection.JavaConverters._
+
+class StarrocksTimeExceedRule(hitObserver: Observer)
+    extends AbstractScanRule(event = new StarrocksTimeExceedHitEvent, observer 
= hitObserver)
+    with Logging {
+
+  private val scanRuleList = CacheUtils.cacheBuilder
+
+  /**
+   * If the scanned data matches the rule's pattern, notify the bound
+   * observer and return true.
+   *
+   * @param data
+   * @return
+   */
+  override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
+
+    if (!getHitEvent().isRegistered || data == null) {
+      logger.error("ScanRule is not bind with an observer. Will not be 
triggered")
+      return false
+    }
+    val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]()
+    for (scannedData <- data.asScala) {
+      if (scannedData != null && scannedData.getData() != null) {
+        var taskMinID = 0L;
+        for (jobHistory <- scannedData.getData().asScala) {
+          jobHistory match {
+            case job: JobHistory =>
+              val status = job.getStatus.toUpperCase(Locale.getDefault)
+              val engineType = job.getEngineType.toUpperCase(Locale.getDefault)
+              if (
+                  Constants.UNFINISHED_JOB_STATUS
+                    .contains(status) && engineType.equals(
+                    Constants.JDBC_ENGINE.toUpperCase(Locale.getDefault)
+                  )
+              ) {
+                // Resolve the datasource configuration used by this job
+                val datasourceConfMap = getDatasourceConf(job)
+                logger.info("starock  datasourceConfMap: {}", 
datasourceConfMap)
+                // Elapsed time (ms) since the job was created
+                val elapse = System.currentTimeMillis() - 
job.getCreatedTime.getTime
+                // Fetch the per-user timeout-alert configuration
+                val timeValue =
+                  HttpsUntils.getJDBCConf(job.getSubmitUser, 
Constants.JDBC_ALERT_TIME)
+                logger.info("starock  timeValue: {},elapse   {}", timeValue, 
elapse)
+                if (StringUtils.isNotBlank(timeValue)) {
+                  val timeoutInSeconds = timeValue.toDouble
+                  val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong
+                  if (elapse > timeoutInMillis) {
+                    // Timeout exceeded: collect this job for alerting
+                    alertData.add(job)
+                  }
+                }
+              }
+              if (taskMinID == 0L || taskMinID > job.getId) {
+                taskMinID = job.getId
+                scanRuleList.put("jdbcUnfinishedAlertScan", taskMinID)
+              }
+            case _ =>
+              logger.warn(
+                "Ignored wrong input data Type : " + jobHistory + ", " + 
jobHistory.getClass.getCanonicalName
+              )
+          }
+        }
+      } else {
+        logger.warn("Ignored null scanned data")
+      }
+
+    }
+    logger.info("hit " + alertData.size() + " data in one iteration")
+    if (alertData.size() > 0) {
+      getHitEvent.notifyObserver(getHitEvent, alertData)
+      true
+    } else {
+      false
+    }
+  }
+
+  private def getDatasourceConf(job: JobHistory): util.Map[_, _] = {
+    // Extract the datasource name from the job's runtime parameters
+    val parmMap =
+      BDPJettyServerHelper.gson.fromJson(job.getParams, 
classOf[java.util.Map[String, String]])
+    val configurationMap =
+      MapUtils.getMap(parmMap, "configuration", new util.HashMap[String, 
String]())
+    val runtimeMap =
+      MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, 
String]())
+    val datasourceName = MapUtils.getString(runtimeMap, 
Constants.JOB_DATASOURCE_CONF, "")
+    // Look up the datasource details when a datasource name is configured
+    if (StringUtils.isNotBlank(datasourceName)) {
+      HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName)
+    } else {
+      new util.HashMap[String, String]()
+    }
+  }
+
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
similarity index 70%
copy from 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
index 4733a1b45f..18553f228e 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
@@ -15,12 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.jobtime
 
-trait UserAction extends org.apache.linkis.httpclient.request.UserAction {
-  private var user: String = _
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.core.ob.{Event, Observer}
 
-  override def setUser(user: String): Unit = this.user = user
+class StarrocksTimeKillAlertSender extends Observer with Logging {
+
+  /**
+   * Observer Pattern
+   */
+  override def update(e: Event, jobHistroyList: scala.Any): Unit = {}
 
-  override def getUser: String = user
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
new file mode 100644
index 0000000000..e5df6f3ff7
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.Observer
+import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils}
+import org.apache.linkis.server.BDPJettyServerHelper
+
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+class StarrocksTimeKillRule(hitObserver: Observer)
+    extends AbstractScanRule(event = new StarrocksTimeKillHitEvent, observer = 
hitObserver)
+    with Logging {
+
+  private val scanRuleList = CacheUtils.cacheBuilder
+
+  /**
+   * If the scanned data matches the rule's pattern, notify the bound
+   * observer and return true.
+   *
+   * @param data
+   * @return
+   */
+  override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
+
+    if (!getHitEvent().isRegistered || data == null) {
+      logger.error("ScanRule is not bind with an observer. Will not be 
triggered")
+      return false
+    }
+    for (scannedData <- data.asScala) {
+      if (scannedData != null && scannedData.getData() != null) {
+        var taskMinID = 0L;
+        for (jobHistory <- scannedData.getData().asScala) {
+          jobHistory match {
+            case job: JobHistory =>
+              val status = job.getStatus.toUpperCase(Locale.getDefault)
+              val engineType = job.getEngineType.toUpperCase(Locale.getDefault)
+              if (
+                  Constants.UNFINISHED_JOB_STATUS
+                    .contains(status) && engineType.equals(
+                    Constants.JDBC_ENGINE.toUpperCase(Locale.getDefault)
+                  )
+              ) {
+                // Elapsed time (ms) since the job was created
+                val elapse = System.currentTimeMillis() - 
job.getCreatedTime.getTime
+                // Read the kill-timeout setting from the datasource connect params
+                if (StringUtils.isNotBlank(job.getParams)) {
+                  val connectParamsMap = MapUtils.getMap(
+                    getDatasourceConf(job),
+                    "connectParams",
+                    new util.HashMap[AnyRef, AnyRef]
+                  )
+                  val killTime = MapUtils.getString(connectParamsMap, 
"kill_task_time", "")
+                  logger.info("starock  killTime: {}", killTime)
+                  if (StringUtils.isNotBlank(killTime) && elapse > 
killTime.toLong * 60 * 1000) {
+                    if (StringUtils.isNotBlank(killTime)) {
+                      val timeoutInSeconds = killTime.toDouble
+                      val timeoutInMillis = (timeoutInSeconds * 60 * 
1000).toLong
+                      if (elapse > timeoutInMillis) {
+                        // Timeout exceeded: kill the job
+                        HttpsUntils.killJob(job)
+                      }
+                    }
+                  }
+                }
+              }
+              if (taskMinID == 0L || taskMinID > job.getId) {
+                taskMinID = job.getId
+                scanRuleList.put("jdbcUnfinishedKillScan", taskMinID)
+              }
+            case _ =>
+              logger.warn(
+                "Ignored wrong input data Type : " + jobHistory + ", " + 
jobHistory.getClass.getCanonicalName
+              )
+          }
+        }
+      } else {
+        logger.warn("Ignored null scanned data")
+      }
+    }
+    true
+  }
+
+  private def getDatasourceConf(job: JobHistory): util.Map[_, _] = {
+    // Extract the datasource name from the job's runtime parameters
+    val parmMap =
+      BDPJettyServerHelper.gson.fromJson(job.getParams, 
classOf[java.util.Map[String, String]])
+    val configurationMap =
+      MapUtils.getMap(parmMap, "configuration", new util.HashMap[String, 
String]())
+    val runtimeMap =
+      MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, 
String]())
+    val datasourceName = MapUtils.getString(runtimeMap, 
Constants.JOB_DATASOURCE_CONF, "")
+    // Look up the datasource details when a datasource name is configured
+    if (StringUtils.isNotBlank(datasourceName)) {
+      HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName)
+    } else {
+      new util.HashMap[String, String]()
+    }
+  }
+
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala
similarity index 50%
copy from 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala
index 6f3158e869..333c7ff1c2 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala
@@ -21,35 +21,23 @@ import org.apache.linkis.httpclient.request.GetAction
 
 import org.apache.commons.lang3.StringUtils
 
-import scala.collection.mutable.ArrayBuffer
+class AnalyzeJobAction extends GetAction with MonitorAction {
 
-class EmsListAction extends GetAction with MonitorResourceAction {
-
-  override def suffixURLs: Array[String] = Array("linkisManager", "listAllEMs")
+  override def suffixURLs: Array[String] = Array("jobhistory", 
"diagnosis-query")
 
 }
 
-object EmsListAction {
+object AnalyzeJobAction {
+
   def newBuilder(): Builder = new Builder
 
-  class Builder private[EmsListAction] () {
-    private var user: String = _
-    private var instance: String = _
-    private var nodeHealthy: String = _
-    private var owner: String = _
+  class Builder private[AnalyzeJobAction] () {
 
-    def setInstance(instance: String): Builder = {
-      this.instance = instance
-      this
-    }
-
-    def setNodeHealthy(nodeHealthy: String): Builder = {
-      this.nodeHealthy = nodeHealthy
-      this
-    }
+    private var taskID: String = _
+    private var user: String = _
 
-    def setOwner(owner: String): Builder = {
-      this.owner = owner
+    def setTaskID(taskID: String): Builder = {
+      this.taskID = taskID
       this
     }
 
@@ -58,15 +46,11 @@ object EmsListAction {
       this
     }
 
-    def build(): EmsListAction = {
-      val emsListAction = new EmsListAction
-      if (StringUtils.isNotBlank(instance)) 
emsListAction.setParameter("instance", instance)
-      if (StringUtils.isNotBlank(nodeHealthy)) {
-        emsListAction.setParameter("nodeHealthy", nodeHealthy)
-      }
-      if (StringUtils.isNotBlank(owner)) emsListAction.setParameter("owner", 
owner)
-      if (StringUtils.isNotBlank(user)) emsListAction.setUser(user)
-      emsListAction
+    def build(): AnalyzeJobAction = {
+      val analyzeJobAction = new AnalyzeJobAction
+      if (StringUtils.isNotBlank(taskID)) 
analyzeJobAction.setParameter("taskID", taskID)
+      if (StringUtils.isNotBlank(user)) analyzeJobAction.setUser(user)
+      analyzeJobAction
     }
 
   }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala
new file mode 100644
index 0000000000..11b0167aef
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.request
+
+import 
org.apache.linkis.datasource.client.config.DatasourceClientConfig.DATA_SOURCE_SERVICE_MODULE
+import 
org.apache.linkis.datasource.client.errorcode.DatasourceClientErrorCodeSummary
+import 
org.apache.linkis.datasource.client.exception.DataSourceClientBuilderException
+import org.apache.linkis.httpclient.request.GetAction
+import org.apache.linkis.monitor.request.KeyvalueAction.Builder
+
+import org.apache.commons.lang3.StringUtils
+
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets
+import java.text.MessageFormat
+
+class DataSourceParamsAction extends GetAction with MonitorAction {
+
+  private var dataSourceName: String = _
+
+  override def suffixURLs: Array[String] =
+    Array(DATA_SOURCE_SERVICE_MODULE.getValue, "publishedInfo", "name", 
dataSourceName)
+
+  private var user: String = _
+
+  override def setUser(user: String): Unit = this.user = user
+
+  override def getUser: String = this.user
+}
+
+object DataSourceParamsAction {
+  def builder(): Builder = new Builder
+
+  class Builder private[DataSourceParamsAction] () {
+    private var dataSourceName: String = _
+    private var system: String = _
+    private var user: String = _
+
+    def setUser(user: String): Builder = {
+      this.user = user
+      this
+    }
+
+    def setDataSourceName(dataSourceName: String): Builder = {
+      this.dataSourceName = dataSourceName
+      this
+    }
+
+    def setSystem(system: String): Builder = {
+      this.system = system
+      this
+    }
+
+    def build(): DataSourceParamsAction = {
+      if (dataSourceName == null) {
+        throw new DataSourceClientBuilderException(
+          DatasourceClientErrorCodeSummary.DATASOURCENAME_NEEDED.getErrorDesc
+        )
+      }
+      if (system == null)
+        throw new DataSourceClientBuilderException(
+          DatasourceClientErrorCodeSummary.SYSTEM_NEEDED.getErrorDesc
+        )
+      if (user == null)
+        throw new DataSourceClientBuilderException(
+          DatasourceClientErrorCodeSummary.USER_NEEDED.getErrorDesc
+        )
+      val dataSourceParamsAction = new DataSourceParamsAction
+      dataSourceParamsAction.dataSourceName = this.dataSourceName
+      dataSourceParamsAction.setParameter("system", system)
+      dataSourceParamsAction.setUser(user)
+      dataSourceParamsAction
+    }
+
+  }
+
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
index 6f3158e869..3ac1719de5 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
 
 import scala.collection.mutable.ArrayBuffer
 
-class EmsListAction extends GetAction with MonitorResourceAction {
+class EmsListAction extends GetAction with MonitorAction {
 
   override def suffixURLs: Array[String] = Array("linkisManager", "listAllEMs")
 
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
index f3175d802f..4e60b59a30 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
@@ -21,7 +21,7 @@ import org.apache.linkis.httpclient.request.GetAction
 
 import org.apache.commons.lang3.StringUtils
 
-class EntranceTaskAction extends GetAction with MonitorResourceAction {
+class EntranceTaskAction extends GetAction with MonitorAction {
   override def suffixURLs: Array[String] = Array("entrance/operation/metrics", 
"taskinfo")
 }
 
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala
new file mode 100644
index 0000000000..654de3ed28
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.request
+
+import org.apache.linkis.httpclient.request.GetAction
+import org.apache.linkis.ujes.client.request.UJESJobAction
+
+import org.apache.commons.lang3.StringUtils
+
+class KeyvalueAction extends GetAction with MonitorAction {
+
+  override def suffixURLs: Array[String] = Array("configuration", "keyvalue")
+
+}
+
+object KeyvalueAction {
+
+  def newBuilder(): Builder = new Builder
+
+  class Builder private[KeyvalueAction] () {
+
+    private var engineType: String = _
+    private var version: String = _
+    private var creator: String = _
+    private var configKey: String = _
+    private var user: String = _
+
+    def setEngineType(engineType: String): Builder = {
+      this.engineType = engineType
+      this
+    }
+
+    def setVersion(version: String): Builder = {
+      this.version = version
+      this
+    }
+
+    def setCreator(creator: String): Builder = {
+      this.creator = creator
+      this
+    }
+
+    def setConfigKey(configKey: String): Builder = {
+      this.configKey = configKey
+      this
+    }
+
+    def setUser(user: String): Builder = {
+      this.user = user
+      this
+    }
+
+    def build(): KeyvalueAction = {
+      val keyvalueAction = new KeyvalueAction
+      if (StringUtils.isNotBlank(engineType)) 
keyvalueAction.setParameter("engineType", engineType)
+      if (StringUtils.isNotBlank(version)) 
keyvalueAction.setParameter("version", version)
+      if (StringUtils.isNotBlank(creator)) 
keyvalueAction.setParameter("creator", creator)
+      if (StringUtils.isNotBlank(configKey)) 
keyvalueAction.setParameter("configKey", configKey)
+      if (StringUtils.isNotBlank(user)) keyvalueAction.setUser(user)
+      keyvalueAction
+    }
+
+  }
+
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala
new file mode 100644
index 0000000000..4b23626efb
--- /dev/null
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.request
+
+import org.apache.linkis.httpclient.dws.DWSHttpClient
+import org.apache.linkis.httpclient.request.POSTAction
+
+import java.util
+
+class KillJobAction extends POSTAction with MonitorAction {
+
+  private var execID: String = _
+
+  override def suffixURLs: Array[String] = Array("entrance", execID, 
"killJobs")
+
+  override def getRequestPayload: String =
+    DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
+
+}
+
+object KillJobAction {
+
+  def builder(): Builder = new Builder
+
+  class Builder private[KillJobAction] () {
+    private var user: String = _
+
+    private var idList: util.List[String] = _
+
+    private var taskIDList: util.List[Long] = _
+
+    private var execID: String = _
+
+    def setIdList(idList: util.List[String]): Builder = {
+      this.idList = idList
+      this
+    }
+
+    def setTaskIDList(taskIDList: util.List[Long]): Builder = {
+      this.taskIDList = taskIDList
+      this
+    }
+
+    def setExecID(execID: String): Builder = {
+      this.execID = execID
+      this
+    }
+
+    def setUser(user: String): Builder = {
+      this.user = user
+      this
+    }
+
+    def build(): KillJobAction = {
+      val killJobAction = new KillJobAction
+      killJobAction.setUser(user)
+      killJobAction.addRequestPayload("idList", idList)
+      killJobAction.addRequestPayload("taskIDList", taskIDList)
+      killJobAction.execID = execID
+      killJobAction
+    }
+
+  }
+
+}
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
index 7ea2001481..8c203303a0 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
@@ -18,5 +18,6 @@
 package org.apache.linkis.monitor.request
 
 import org.apache.linkis.httpclient.dws.request.DWSHttpAction
+import org.apache.linkis.ujes.client.request.UserAction
 
-trait MonitorResourceAction extends DWSHttpAction with UserAction
+trait MonitorAction extends DWSHttpAction with UserAction
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
index 4733a1b45f..7cdabf33a8 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
@@ -15,12 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.jobtime
 
-trait UserAction extends org.apache.linkis.httpclient.request.UserAction {
-  private var user: String = _
+import org.apache.linkis.monitor.core.ob.SingleObserverEvent
 
-  override def setUser(user: String): Unit = this.user = user
-
-  override def getUser: String = user
-}
+class StarrocksTimeKillHitEvent extends SingleObserverEvent
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala
similarity index 65%
copy from 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala
index aa73471c49..3c7b0c03c3 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.response
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
 
-public class QueryUtils {
+import java.util
 
-  private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+import scala.beans.BeanProperty
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/jobhistory/diagnosis-query")
+class AnalyzeJobResultAction extends DWSResult {
+
+  @BeanProperty
+  var messages: util.ArrayList[util.Map[String, AnyRef]] = _
 
-  public static String dateToString(Date date) {
-    return dateFormat.format(date);
-  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala
similarity index 63%
copy from 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala
index aa73471c49..34289b188e 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala
@@ -15,17 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.response
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
 
-public class QueryUtils {
+import java.util
 
-  private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+import scala.beans.BeanProperty
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/configuration/keyvalue")
+class KeyvalueResult extends DWSResult {
+
+  @BeanProperty
+  var configValues: util.ArrayList[util.Map[String, AnyRef]] = _
+
+  @BeanProperty
+  var totalPage: Int = _
 
-  public static String dateToString(Date date) {
-    return dateFormat.format(date);
-  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala
similarity index 66%
copy from 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala
index aa73471c49..8d8392d94f 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala
@@ -15,17 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.response
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
 
-public class QueryUtils {
+import java.util
 
-  private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+import scala.beans.BeanProperty
+
+@DWSHttpMessageResult("/api/rest_j/v\\d+/entrance/(\\S+)/killJobs")
+class KillJobResultAction extends DWSResult {
+
+  @BeanProperty
+  var messages: util.ArrayList[util.Map[String, AnyRef]] = _
 
-  public static String dateToString(Date date) {
-    return dateFormat.format(date);
-  }
 }
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
index 798ed62c95..a0e94abc04 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
@@ -120,7 +120,10 @@ object MonitorAlertUtils extends Logging {
 
         val eccReceivers = {
           val set: util.Set[String] = new util.HashSet[String]
-          if (StringUtils.isNotBlank(data.eccReceivers)) {
+          if (
+              StringUtils
+                .isNotBlank(data.eccReceivers) && 
(!data.eccReceivers.equals("$eccAlertUser"))
+          ) {
             data.eccReceivers.split(",").map(r => set.add(r))
           }
           if (!repaceParams.containsKey("$eccAlertUser")) {
@@ -130,10 +133,7 @@ object MonitorAlertUtils extends Logging {
               }
             })
           } else {
-            set.add(repaceParams.get("$eccAlertUser"))
-          }
-          if (StringUtils.isNotBlank(repaceParams.get("eccReceiver"))) {
-            repaceParams.get("eccReceiver").split(",").map(r => set.add(r))
+            repaceParams.get("$eccAlertUser").split(",").map(r => set.add(r))
           }
           set
         }
@@ -141,14 +141,25 @@ object MonitorAlertUtils extends Logging {
         val subSystemId = repaceParams.getOrDefault("subSystemId", 
Constants.ALERT_SUB_SYSTEM_ID)
         val alertTitle = "集群[" + Constants.LINKIS_CLUSTER_NAME + "]" + 
repaceParams
           .getOrDefault("title", data.alertTitle)
-        val alertLevel =
-          if (StringUtils.isNotBlank(data.alertLevel) && 
StringUtils.isNumeric(data.alertLevel)) {
-            ImsAlertLevel.withName(repaceParams.getOrDefault("monitorLevel", 
data.alertLevel))
-          } else {
+        val alertLevel = {
+          if (
+              repaceParams.containsKey("$alterLevel") && StringUtils.isNumeric(
+                repaceParams.get("$alterLevel")
+              )
+          ) {
             ImsAlertLevel.withName(
-              repaceParams.getOrDefault("monitorLevel", 
ImsAlertLevel.WARN.toString)
+              repaceParams.getOrDefault("monitorLevel", 
repaceParams.get("$alterLevel"))
             )
+          } else {
+            if (StringUtils.isNotBlank(data.alertLevel) && 
StringUtils.isNumeric(data.alertLevel)) {
+              ImsAlertLevel.withName(repaceParams.getOrDefault("monitorLevel", 
data.alertLevel))
+            } else {
+              ImsAlertLevel.withName(
+                repaceParams.getOrDefault("monitorLevel", 
ImsAlertLevel.WARN.toString)
+              )
+            }
           }
+        }
 
         val alertDesc = Utils.tryAndWarn(
           ImsAlertDesc(
diff --git 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala
similarity index 58%
copy from 
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to 
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala
index aa73471c49..deedf5847b 100644
--- 
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++ 
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala
@@ -15,17 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.utils.job
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.monitor.core.pac.ScannedData
 
-public class QueryUtils {
+import java.util
 
-  private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+import scala.collection.JavaConverters._
 
-  public static String dateToString(Date date) {
-    return dateFormat.format(date);
+object JohistoryUtils {
+
+  def getJobhistorySanData(data: util.List[ScannedData]): List[Any] = {
+    if (data == null) {
+      return List.empty[ScannedData]
+    }
+    val scalaData = data.asScala
+    val result = scalaData.flatMap { dataList =>
+      if (dataList != null && dataList.getData() != null) {
+        dataList.getData().asScala
+      } else {
+        List.empty[ScannedData]
+      }
+    }.toList
+    result
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to