This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new ef268fb33c Optimize monitor module (#5216)
ef268fb33c is described below
commit ef268fb33c0f1d1c4f8c15a139628e443fb02998
Author: Liboyistu <[email protected]>
AuthorDate: Mon Sep 22 17:14:51 2025 +0800
Optimize monitor module (#5216)
Co-authored-by: peacewong <[email protected]>
---
.../linkis/monitor/config/ListenerConfig.java | 4 +-
.../department/dao/UserDepartmentInfoMapper.java} | 34 ++--
.../department/entity/UserDepartmentInfo.java | 150 +++++++++++++++++
.../ClientSingleton.java} | 92 ++---------
.../linkis/monitor/jobhistory/QueryUtils.java | 2 +-
.../monitor/scheduled/EntranceTaskMonitor.java | 7 +-
.../monitor/scheduled/JobHistoryMonitor.java | 67 +++++++-
.../linkis/monitor/scheduled/ResourceMonitor.java | 2 +-
.../monitor/scheduled/UserDepartmentInfoSync.java | 146 +++++++++++++++++
.../linkis/monitor/scheduled/UserModeMonitor.java | 7 +-
.../apache/linkis/monitor/until/HttpsUntils.java | 181 ++++++++++-----------
.../linkis/monitor/until/JobMonitorUtils.java | 8 +-
.../apache/linkis/monitor/until/ThreadUtils.java | 9 +
.../resources/mapper/common/JobHistoryMapper.xml | 4 +-
.../mapper/common/UserDepartmentInfoMapper.xml | 57 +++++++
.../LinkisJobHistoryScanSpringConfiguration.scala | 5 +
.../linkis/monitor/client/MonitorHTTPClient.scala | 42 ++++-
.../client/MonitorHTTPClientClientImpl.scala | 4 +-
.../monitor/client/MonitorResourceClient.scala | 4 +-
.../monitor/client/MonitorResourceClientImpl.scala | 6 +-
.../linkis/monitor/constants/Constants.scala | 15 ++
.../linkis/monitor/factory/MapperFactory.scala | 11 ++
.../monitor/jobhistory/JobHistoryDataFetcher.scala | 94 +++++------
.../analyze/JobHistoryAnalyzeAlertSender.scala} | 21 ++-
.../analyze/JobHistoryAnalyzeHitEvent.scala} | 6 +-
.../jobhistory/analyze/JobHistoryAnalyzeRule.scala | 60 +++++++
.../jobhistory/jobtime/JobTimeExceedRule.scala | 3 +-
.../jobtime/StarrocksTimeExceedAlertSender.scala | 84 ++++++++++
.../jobtime/StarrocksTimeExceedHitEvent.scala} | 6 +-
.../jobtime/StarrocksTimeExceedRule.scala | 128 +++++++++++++++
.../jobtime/StarrocksTimeKillAlertSender.scala} | 14 +-
.../jobhistory/jobtime/StarrocksTimeKillRule.scala | 125 ++++++++++++++
...{EmsListAction.scala => AnalyzeJobAction.scala} | 44 ++---
.../monitor/request/DataSourceParamsAction.scala | 92 +++++++++++
.../linkis/monitor/request/EmsListAction.scala | 2 +-
.../monitor/request/EntranceTaskAction.scala | 2 +-
.../linkis/monitor/request/KeyvalueAction.scala | 80 +++++++++
.../linkis/monitor/request/KillJobAction.scala | 80 +++++++++
.../monitor/request/MonitorResourceAction.scala | 3 +-
.../apache/linkis/monitor/request/UserAction.scala | 10 +-
.../monitor/response/AnalyzeJobResultAction.scala} | 20 ++-
.../linkis/monitor/response/KeyvalueResult.scala} | 23 ++-
.../monitor/response/KillJobResultAction.scala} | 20 ++-
.../utils/alert/ims/MonitorAlertUtils.scala | 31 ++--
.../linkis/monitor/utils/job/JohistoryUtils.scala} | 28 +++-
45 files changed, 1458 insertions(+), 375 deletions(-)
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java
index eb5c11af87..98aec85f00 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java
@@ -17,7 +17,7 @@
package org.apache.linkis.monitor.config;
-import org.apache.linkis.monitor.until.HttpsUntils;
+import org.apache.linkis.monitor.entity.ClientSingleton;
import org.apache.linkis.monitor.until.ThreadUtils;
import org.apache.linkis.monitor.utils.log.LogUtils;
@@ -38,7 +38,7 @@ public class ListenerConfig {
private void shutdownEntrance(ContextClosedEvent event) {
try {
ThreadUtils.executors.shutdown();
- HttpsUntils.client.close();
+ ClientSingleton.getInstance().close();
} catch (IOException e) {
logger.error("ListenerConfig error msg {}", e.getMessage());
}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java
similarity index 51%
copy from linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
copy to linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java
index 5554701571..1b37d30cb5 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java
@@ -15,25 +15,31 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.client
+package org.apache.linkis.monitor.department.dao;
-import org.apache.linkis.httpclient.dws.DWSHttpClient
-import org.apache.linkis.httpclient.dws.config.DWSClientConfig
-import org.apache.linkis.httpclient.request.Action
-import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.MonitorResourceAction
+import org.apache.linkis.monitor.department.entity.UserDepartmentInfo;
-class MonitorHTTPClientClientImpl(clientConfig: DWSClientConfig) extends
MonitorHTTPClient {
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
- private val dwsHttpClient =
- new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread")
+import org.springframework.transaction.annotation.Transactional;
- override protected[client] def executeJob(ujesJobAction:
MonitorResourceAction): Result =
- ujesJobAction match {
+import java.util.List;
- case action: Action => dwsHttpClient.execute(action)
+@Mapper
+public interface UserDepartmentInfoMapper {
- }
+ void insertUser(UserDepartmentInfo user);
- override def close(): Unit = dwsHttpClient.close()
+ @Transactional(rollbackFor = Exception.class)
+ int batchInsertUsers(@Param("userDepartmentInfos") List<UserDepartmentInfo>
userDepartmentInfos);
+
+ void updateUser(UserDepartmentInfo user);
+
+ UserDepartmentInfo selectUser(@Param("userName") String userName);
+
+ @Transactional(rollbackFor = Exception.class)
+ void deleteUser();
+
+ List<UserDepartmentInfo> selectAllUsers();
}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java
new file mode 100644
index 0000000000..c5ea27e7ce
--- /dev/null
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.department.entity;
+
+import java.util.Date;
+
+public class UserDepartmentInfo {
+
+ private String clusterCode;
+
+ private String userType;
+ private String userName;
+ private String orgId;
+ private String orgName;
+ private String queueName;
+ private String dbName;
+ private String interfaceUser;
+ private String isUnionAnalyse;
+ private Date createTime;
+ private String userItsmNo;
+
+ // 构造函数、getter和setter方法
+ public UserDepartmentInfo(
+ String clusterCode,
+ String userType,
+ String userName,
+ String orgId,
+ String orgName,
+ String queueName,
+ String dbName,
+ String interfaceUser,
+ String isUnionAnalyse,
+ Date createTime,
+ String userItsmNo) {
+ this.clusterCode = clusterCode;
+ this.userType = userType;
+ this.userName = userName;
+ this.orgId = orgId;
+ this.orgName = orgName;
+ this.queueName = queueName;
+ this.dbName = dbName;
+ this.interfaceUser = interfaceUser;
+ this.isUnionAnalyse = isUnionAnalyse;
+ this.createTime = createTime;
+ this.userItsmNo = userItsmNo;
+ }
+
+ public String getClusterCode() {
+ return clusterCode;
+ }
+
+ public void setClusterCode(String clusterCode) {
+ this.clusterCode = clusterCode;
+ }
+
+ public String getUserType() {
+ return userType;
+ }
+
+ public void setUserType(String userType) {
+ this.userType = userType;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getOrgId() {
+ return orgId;
+ }
+
+ public void setOrgId(String orgId) {
+ this.orgId = orgId;
+ }
+
+ public String getOrgName() {
+ return orgName;
+ }
+
+ public void setOrgName(String orgName) {
+ this.orgName = orgName;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public void setDbName(String dbName) {
+ this.dbName = dbName;
+ }
+
+ public String getInterfaceUser() {
+ return interfaceUser;
+ }
+
+ public void setInterfaceUser(String interfaceUser) {
+ this.interfaceUser = interfaceUser;
+ }
+
+ public String getIsUnionAnalyse() {
+ return isUnionAnalyse;
+ }
+
+ public void setIsUnionAnalyse(String isUnionAnalyse) {
+ this.isUnionAnalyse = isUnionAnalyse;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getUserItsmNo() {
+ return userItsmNo;
+ }
+
+ public void setUserItsmNo(String userItsmNo) {
+ this.userItsmNo = userItsmNo;
+ }
+}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java
similarity index 50%
copy from linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
copy to linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java
index a504a9d41d..3c660426b3 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java
@@ -15,65 +15,37 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.until;
+package org.apache.linkis.monitor.entity;
import org.apache.linkis.bml.conf.BmlConfiguration;
import org.apache.linkis.common.conf.Configuration;
-import org.apache.linkis.common.utils.Utils;
import
org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
import org.apache.linkis.monitor.client.MonitorHTTPClient;
import org.apache.linkis.monitor.client.MonitorHTTPClientClientImpl;
-import org.apache.linkis.monitor.config.MonitorConfig;
-import org.apache.linkis.monitor.entity.IndexEntity;
-import org.apache.linkis.monitor.request.EmsListAction;
-import org.apache.linkis.monitor.request.EntranceTaskAction;
-import org.apache.linkis.monitor.response.EntranceTaskResult;
-import org.apache.linkis.server.BDPJettyServerHelper;
-import org.apache.linkis.ujes.client.response.EmsListResult;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.springframework.util.Assert;
-
-import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class HttpsUntils {
- private static final Logger logger =
LoggerFactory.getLogger(HttpsUntils.class);
+public class ClientSingleton {
+ private static MonitorHTTPClient instance;
+ private static DWSClientConfig dwsClientConfig;
- public static DWSClientConfig dwsClientConfig = createClientConfig(null,
null);
- // IOUtils.closeQuietly(client);
- public static MonitorHTTPClient client = new
MonitorHTTPClientClientImpl(dwsClientConfig);
- public static final String localHost = Utils.getLocalHostname();
+ private ClientSingleton() {}
- public static Map<String, Object> sendHttp(String url, Map<String, Object>
properties)
- throws IOException {
- if (null == dwsClientConfig) {
- dwsClientConfig = createClientConfig(url, properties);
+ public static synchronized MonitorHTTPClient getInstance() {
+ if (instance == null) {
+ if (dwsClientConfig == null) {
+ dwsClientConfig = createClientConfig(null, null); // NOSONAR
+ }
+ instance = new MonitorHTTPClientClientImpl(dwsClientConfig);
}
- if (null == client) {
- client = new MonitorHTTPClientClientImpl(dwsClientConfig);
- }
- EmsListAction build = EmsListAction.newBuilder().setUser("hadoop").build();
- EmsListResult result = client.list(build);
- return result.getResultMap();
+ return instance;
}
public static DWSClientConfig createClientConfig(String url, Map<String,
Object> properties) {
@@ -89,7 +61,7 @@ public class HttpsUntils {
}
int maxConnection =
(int)
- parms.getOrDefault(
+ parms.getOrDefault( // NOSONAR
BmlConfiguration.CONNECTION_MAX_SIZE_SHORT_NAME(),
BmlConfiguration.CONNECTION_MAX_SIZE().getValue());
int connectTimeout =
@@ -132,42 +104,4 @@ public class HttpsUntils {
return clientConfig;
}
-
- public static Map<String, Object> getEntranceTask(String url, String user,
String Instance)
- throws IOException {
- if (null == dwsClientConfig) {
- dwsClientConfig = createClientConfig(null, null);
- }
- if (null == client) {
- client = new MonitorHTTPClientClientImpl(dwsClientConfig);
- }
- EntranceTaskAction build =
-
EntranceTaskAction.newBuilder().setUser(user).setInstance(Instance).build();
- EntranceTaskResult result = client.entranList(build);
- return result.getResultMap();
- }
-
- public static void sendIndex(List<IndexEntity> list) throws IOException {
- Map<String, Object> parm = new HashMap<>();
- parm.put("userAuthKey", MonitorConfig.ECM_TASK_USER_AUTHKEY.getValue());
- parm.put("metricDataList", list);
- String json = BDPJettyServerHelper.gson().toJson(parm);
-
- RequestConfig requestConfig = RequestConfig.DEFAULT;
- StringEntity entity =
- new StringEntity(
- json,
ContentType.create(ContentType.APPLICATION_JSON.getMimeType(), "UTF-8"));
- entity.setContentEncoding("UTF-8");
-
- HttpPost httpPost = new HttpPost(MonitorConfig.ECM_TASK_IMURL.getValue());
- httpPost.setConfig(requestConfig);
- httpPost.setEntity(entity);
-
- CloseableHttpClient httpClient = HttpClients.createDefault();
- CloseableHttpResponse execute = httpClient.execute(httpPost);
- String responseStr = EntityUtils.toString(execute.getEntity(), "UTF-8");
- Map<String, String> map =
BDPJettyServerHelper.gson().fromJson(responseStr, Map.class);
- logger.info("send index response :{}", map);
- Assert.isTrue(!"0".equals(map.get("resultCode")), map.get("resultMsg"));
- }
}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
index aa73471c49..688fda5afb 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
@@ -23,7 +23,7 @@ import java.util.Date;
public class QueryUtils {
- private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+ private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS"); // NOSONAR
public static String dateToString(Date date) {
return dateFormat.format(date);
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java
index edce194481..02ea5322e9 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java
@@ -121,7 +121,8 @@ public class EntranceTaskMonitor {
});
Map<String, Object> likisData = null;
try {
- likisData = MapUtils.getMap(HttpsUntils.getEntranceTask(null, "hadoop",
null), "data");
+ likisData =
+ MapUtils.getMap(HttpsUntils.getEntranceTask(null,
Constants.ADMIN_USER(), null), "data");
logger.info("TaskMonitor hadoop response {}:", likisData);
} catch (IOException e) {
logger.warn("failed to get EntranceTask data");
@@ -163,7 +164,9 @@ public class EntranceTaskMonitor {
try {
// 通过serviceInstance 获取entrance中任务数量信息
Map<String, Object> entranceData =
- MapUtils.getMap(HttpsUntils.getEntranceTask(null, "hadoop",
entranceService), "data");
+ MapUtils.getMap(
+ HttpsUntils.getEntranceTask(null, Constants.ADMIN_USER(),
entranceService),
+ "data");
int runningTaskNumber = 0;
int queuedTaskNumber = 0;
int totalTaskNumber = 0;
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
index be7758f7a0..c1a7a16d06 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java
@@ -23,12 +23,13 @@ import org.apache.linkis.monitor.core.pac.DataFetcher;
import org.apache.linkis.monitor.core.scanner.AnomalyScanner;
import org.apache.linkis.monitor.core.scanner.DefaultScanner;
import org.apache.linkis.monitor.factory.MapperFactory;
+import
org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeAlertSender;
+import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeRule;
import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrCodeRule;
import
org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender;
import org.apache.linkis.monitor.jobhistory.index.JobIndexRule;
import org.apache.linkis.monitor.jobhistory.index.JobIndexSender;
-import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedAlertSender;
-import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedRule;
+import org.apache.linkis.monitor.jobhistory.jobtime.*;
import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsAlertSender;
import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsRule;
import org.apache.linkis.monitor.jobhistory.runtime.CommonJobRunTimeRule;
@@ -75,7 +76,7 @@ public class JobHistoryMonitor {
@Scheduled(cron = "${linkis.monitor.jobHistory.finished.cron}")
public void jobHistoryFinishedScan() {
logger.info("Start scan jobHistoryFinishedScan");
- long intervalMs = 20 * 60 * 1000;
+ long intervalMs = 20 * 60 * 1000L;
long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
long endTime = System.currentTimeMillis();
long startTime = endTime - intervalMs;
@@ -98,7 +99,7 @@ public class JobHistoryMonitor {
logger.info("Get JobHistoryId from cache ID:" + id);
}
List<DataFetcher> fetchers =
- JobMonitorUtils.generateFetchersfortime(startTime, endTime, id,
"updated_time");
+ JobMonitorUtils.generateFetchersfortime(startTime, endTime, id,
"finished_job");
if (fetchers.isEmpty()) {
logger.warn("generated 0 dataFetchers, plz check input");
return;
@@ -169,6 +170,14 @@ public class JobHistoryMonitor {
} catch (Exception e) {
logger.warn("CommonJobRunTimeRule Scan Error msg: " + e.getMessage());
}
+ // 新增失败任务分析扫描
+ try {
+ JobHistoryAnalyzeRule jobHistoryAnalyzeRule =
+ new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender());
+ scanner.addScanRule(jobHistoryAnalyzeRule);
+ } catch (Exception e) {
+ logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage());
+ }
// 执行任务扫描
JobMonitorUtils.run(scanner, fetchers, true);
@@ -176,7 +185,7 @@ public class JobHistoryMonitor {
JobIndexRule jobIndexRule = new JobIndexRule(new JobIndexSender());
scannerIndex.addScanRule(jobIndexRule);
List<DataFetcher> createFetcher =
- JobMonitorUtils.generateFetchersfortime(startTime, endTime, id,
"department");
+ JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "");
JobMonitorUtils.run(scannerIndex, createFetcher, true);
}
@@ -193,7 +202,7 @@ public class JobHistoryMonitor {
AnomalyScanner scanner = new DefaultScanner();
boolean shouldStart = false;
List<DataFetcher> fetchers =
- JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs,
id, "created_time");
+ JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs,
id, "unfinished_job");
if (fetchers.isEmpty()) {
logger.warn("generated 0 dataFetchers, plz check input");
return;
@@ -215,4 +224,50 @@ public class JobHistoryMonitor {
}
JobMonitorUtils.run(scanner, fetchers, shouldStart);
}
+
+ /** * 每10分钟扫描一次,扫描两个小时之内的任务,告警要求:管理台配置告警相关参数 */
+ @Scheduled(cron = "${linkis.monitor.jdbc.timeout.alert.cron:0 0/10 0 * * ?}")
+ public void jdbcUnfinishedAlertScan() {
+ long id =
+
Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedAlertScan"))
+ .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
+ long intervalMs = 7200 * 1000L;
+ long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
+ long endTime = System.currentTimeMillis();
+ long startTime = endTime - intervalMs;
+ AnomalyScanner scanner = new DefaultScanner();
+ List<DataFetcher> fetchers =
+ JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs,
id, "");
+ if (fetchers.isEmpty()) {
+ logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check
input");
+ return;
+ }
+ StarrocksTimeExceedRule starrocksTimeExceedRule =
+ new StarrocksTimeExceedRule(new StarrocksTimeExceedAlertSender());
+ scanner.addScanRule(starrocksTimeExceedRule);
+ JobMonitorUtils.run(scanner, fetchers, true);
+ }
+
+ /** * 每10分钟扫描一次,扫描两个小时之内的任务,满足要求触发kill kill要求:数据源配置kill参数 */
+ @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}")
+ public void jdbcUnfinishedKillScan() {
+ long id =
+
Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan"))
+ .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue());
+ long intervalMs = 7200 * 1000L;
+ long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000;
+ long endTime = System.currentTimeMillis();
+ long startTime = endTime - intervalMs;
+ AnomalyScanner scanner = new DefaultScanner();
+ List<DataFetcher> fetchers =
+ JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs,
id, "");
+ if (fetchers.isEmpty()) {
+ logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check
input");
+ return;
+ }
+ StarrocksTimeKillRule starrocksTimeKillRule =
+ new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender());
+ scanner.addScanRule(starrocksTimeKillRule);
+ JobMonitorUtils.run(scanner, fetchers, true);
+ }
}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java
index e8658050e0..a9a1a30887 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java
@@ -61,7 +61,7 @@ public class ResourceMonitor {
// 获取emNode资源信息
List<Map<String, Object>> emNodeVoList = new ArrayList<>();
try {
- Map<String, Object> resultmap = HttpsUntils.sendHttp(null, null);
+ Map<String, Object> resultmap = HttpsUntils.getEmsResourceList();
// got interface data
Map<String, List<Map<String, Object>>> data = MapUtils.getMap(resultmap,
"data");
emNodeVoList = data.getOrDefault("EMs", new ArrayList<>());
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java
new file mode 100644
index 0000000000..338d220489
--- /dev/null
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.scheduled;
+
+import org.apache.linkis.monitor.constants.Constants;
+import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper;
+import org.apache.linkis.monitor.department.entity.UserDepartmentInfo;
+import org.apache.linkis.monitor.factory.MapperFactory;
+import org.apache.linkis.monitor.utils.alert.AlertDesc;
+import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils;
+import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component
+@PropertySource(value = "classpath:linkis-et-monitor.properties", encoding =
"UTF-8")
+public class UserDepartmentInfoSync {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ResourceMonitor.class);
+ private static final int pagesize = 5000;
+
+ private static final UserDepartmentInfoMapper userDepartmentInfoMapper =
+ MapperFactory.getUserDepartmentInfoMapper();
+
+ @Scheduled(cron = "${linkis.monitor.org.user.sync.cron:0 0 11 1/7 * ?}")
+ public static void DepartmentInfoSync() {
+ // 获取linkis_org_user_sync信息
+ // 收集异常用户
+ List<UserDepartmentInfo> alterList = new ArrayList<>();
+ int pageNum = 1; // 初始pageNum
+ while (true) {
+ List<UserDepartmentInfo> departSyncList = null;
+ PageHelper.startPage(pageNum, pagesize);
+ try {
+ departSyncList = userDepartmentInfoMapper.selectAllUsers();
+ } finally {
+ PageHelper.clearPage();
+ }
+ PageInfo<UserDepartmentInfo> pageInfo = new PageInfo<>(departSyncList);
+ // 处理 departSyncList 中的数据
+ processDepartSyncList(pageInfo.getList(), alterList);
+ if (!pageInfo.isHasNextPage()) {
+ break; // 没有更多记录,退出循环
+ }
+ pageNum++;
+ }
+ // 统计异常名称,然后发送告警
+ String usernames =
+ alterList.stream()
+ .filter(s -> StringUtils.isNotBlank(s.getUserName()))
+ .map(UserDepartmentInfo::getUserName)
+ .limit(5)
+ .collect(Collectors.joining(","));
+ if (StringUtils.isNotBlank(usernames)) {
+ HashMap<String, String> replaceParm = new HashMap<>();
+ replaceParm.put("$user", usernames);
+ replaceParm.put("$count", String.valueOf(alterList.size()));
+ Map<String, AlertDesc> ecmResourceAlerts =
+ MonitorAlertUtils.getAlerts(Constants.DEPARTMENT_USER_IM(),
replaceParm);
+ PooledImsAlertUtils.addAlert(ecmResourceAlerts.get("12019"));
+ }
+ }
+
+ private static void processDepartSyncList(
+ List<UserDepartmentInfo> departSyncList, List<UserDepartmentInfo>
alterList) {
+ if (CollectionUtils.isEmpty(departSyncList)) {
+ logger.info("No user department info to sync");
+ // 并且发送告警通知
+ return;
+ } else {
+ logger.info("Start to sync user department info");
+ // 收集异常用户
+ List<UserDepartmentInfo> errorUserList =
+ departSyncList.stream()
+ .filter(
+ userDepartmentInfo ->
+ StringUtils.isNotBlank(userDepartmentInfo.getUserName())
+ &&
(StringUtils.isBlank(userDepartmentInfo.getOrgId())
+ ||
StringUtils.isBlank(userDepartmentInfo.getOrgName())))
+ .collect(Collectors.toList());
+ // 收集需要同步用户
+ List<UserDepartmentInfo> syncList =
+ departSyncList.stream()
+ .filter(
+ userDepartmentInfo ->
+ StringUtils.isNotBlank(userDepartmentInfo.getUserName())
+ &&
StringUtils.isNotBlank(userDepartmentInfo.getOrgId())
+ &&
StringUtils.isNotBlank(userDepartmentInfo.getOrgName()))
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errorUserList)) {
+ alterList.addAll(errorUserList);
+ }
+ if (!CollectionUtils.isEmpty(syncList)) {
+ // 同步用户
+ List<UserDepartmentInfo> insertList = new ArrayList<>();
+ syncList.forEach(
+ departSyncInfo -> {
+ UserDepartmentInfo userDepartmentInfo =
+
userDepartmentInfoMapper.selectUser(departSyncInfo.getUserName());
+ if (null == userDepartmentInfo) {
+ insertList.add(departSyncInfo);
+ } else {
+ if
((!departSyncInfo.getOrgId().equals(userDepartmentInfo.getOrgId()))
+ ||
(!departSyncInfo.getOrgName().equals(userDepartmentInfo.getOrgName()))) {
+ userDepartmentInfoMapper.updateUser(departSyncInfo);
+ }
+ }
+ });
+ if (!CollectionUtils.isEmpty(insertList)) {
+ userDepartmentInfoMapper.batchInsertUsers(insertList);
+ }
+ }
+ }
+ }
+}
diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java
index ad6f861479..e55e01352d 100644
--- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java
+++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java
@@ -24,7 +24,7 @@ import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
import org.apache.linkis.monitor.config.MonitorConfig;
import org.apache.linkis.monitor.constants.Constants;
-import org.apache.linkis.monitor.until.HttpsUntils;
+import org.apache.linkis.monitor.entity.ClientSingleton;
import org.apache.linkis.monitor.utils.alert.AlertDesc;
import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils;
import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils;
@@ -57,7 +57,8 @@ public class UserModeMonitor {
private static final Logger logger =
LoggerFactory.getLogger(UserModeMonitor.class);
- private static final DWSClientConfig clientConfig =
HttpsUntils.dwsClientConfig;
+ private static final DWSClientConfig clientConfig =
+ ClientSingleton.createClientConfig(null, null);
private static final UJESClient client = new UJESClientImpl(clientConfig);
@@ -138,7 +139,7 @@ public class UserModeMonitor {
public void dbJob() {
Map<String, Object> properties = new HashMap<>();
properties.put("readTimeout",
MonitorConfig.USER_MODE_INTERFACE_TIMEOUT.getValue());
- DWSClientConfig clientConfig = HttpsUntils.createClientConfig(null,
properties);
+ DWSClientConfig clientConfig = ClientSingleton.createClientConfig(null,
properties);
UJESClientImpl ujesClient = new UJESClientImpl(clientConfig);
GetTableStatisticInfoAction builder =
GetTableStatisticInfoAction.builder()
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
index a504a9d41d..0476765594 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java
@@ -17,24 +17,26 @@
package org.apache.linkis.monitor.until;
-import org.apache.linkis.bml.conf.BmlConfiguration;
-import org.apache.linkis.common.conf.Configuration;
import org.apache.linkis.common.utils.Utils;
-import
org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy;
-import org.apache.linkis.httpclient.dws.config.DWSClientConfig;
-import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder;
+import
org.apache.linkis.datasource.client.response.GetInfoPublishedByDataSourceNameResult;
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf;
import org.apache.linkis.monitor.client.MonitorHTTPClient;
-import org.apache.linkis.monitor.client.MonitorHTTPClientClientImpl;
import org.apache.linkis.monitor.config.MonitorConfig;
+import org.apache.linkis.monitor.constants.Constants;
+import org.apache.linkis.monitor.entity.ClientSingleton;
import org.apache.linkis.monitor.entity.IndexEntity;
-import org.apache.linkis.monitor.request.EmsListAction;
-import org.apache.linkis.monitor.request.EntranceTaskAction;
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory;
+import org.apache.linkis.monitor.request.*;
+import org.apache.linkis.monitor.response.AnalyzeJobResultAction;
import org.apache.linkis.monitor.response.EntranceTaskResult;
+import org.apache.linkis.monitor.response.KeyvalueResult;
+import org.apache.linkis.monitor.response.KillJobResultAction;
+import org.apache.linkis.protocol.utils.ZuulEntranceUtils;
import org.apache.linkis.server.BDPJettyServerHelper;
import org.apache.linkis.ujes.client.response.EmsListResult;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
@@ -47,10 +49,8 @@ import org.apache.http.util.EntityUtils;
import org.springframework.util.Assert;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,89 +58,18 @@ import org.slf4j.LoggerFactory;
public class HttpsUntils {
private static final Logger logger =
LoggerFactory.getLogger(HttpsUntils.class);
- public static DWSClientConfig dwsClientConfig = createClientConfig(null,
null);
- // IOUtils.closeQuietly(client);
- public static MonitorHTTPClient client = new
MonitorHTTPClientClientImpl(dwsClientConfig);
public static final String localHost = Utils.getLocalHostname();
- public static Map<String, Object> sendHttp(String url, Map<String, Object>
properties)
- throws IOException {
- if (null == dwsClientConfig) {
- dwsClientConfig = createClientConfig(url, properties);
- }
- if (null == client) {
- client = new MonitorHTTPClientClientImpl(dwsClientConfig);
- }
- EmsListAction build = EmsListAction.newBuilder().setUser("hadoop").build();
+ public static Map<String, Object> getEmsResourceList() throws IOException {
+ MonitorHTTPClient client = ClientSingleton.getInstance();
+ EmsListAction build =
EmsListAction.newBuilder().setUser(Constants.ADMIN_USER()).build();
EmsListResult result = client.list(build);
return result.getResultMap();
}
- public static DWSClientConfig createClientConfig(String url, Map<String,
Object> properties) {
- String realUrl = "";
- if (StringUtils.isBlank(url)) {
- realUrl = Configuration.getGateWayURL();
- } else {
- realUrl = url;
- }
- Map<String, Object> parms = new HashMap<>();
- if (MapUtils.isNotEmpty(properties)) {
- parms = properties;
- }
- int maxConnection =
- (int)
- parms.getOrDefault(
- BmlConfiguration.CONNECTION_MAX_SIZE_SHORT_NAME(),
- BmlConfiguration.CONNECTION_MAX_SIZE().getValue());
- int connectTimeout =
- (int)
- parms.getOrDefault(
- BmlConfiguration.CONNECTION_TIMEOUT_SHORT_NAME(),
- BmlConfiguration.CONNECTION_TIMEOUT().getValue());
- int readTimeout =
- (int)
- parms.getOrDefault(
- BmlConfiguration.CONNECTION_READ_TIMEOUT_SHORT_NAME(),
- BmlConfiguration.CONNECTION_READ_TIMEOUT().getValue());
- String tokenKey =
- (String)
- parms.getOrDefault(
- BmlConfiguration.AUTH_TOKEN_KEY_SHORT_NAME(),
- BmlConfiguration.AUTH_TOKEN_KEY().getValue());
- String tokenValue =
- (String)
- parms.getOrDefault(
- BmlConfiguration.AUTH_TOKEN_VALUE_SHORT_NAME(),
- BmlConfiguration.AUTH_TOKEN_VALUE().getValue());
-
- DWSClientConfig clientConfig =
- ((DWSClientConfigBuilder)
- (DWSClientConfigBuilder.newBuilder()
- .addServerUrl(realUrl)
- .connectionTimeout(connectTimeout)
- .discoveryEnabled(false)
- .discoveryFrequency(1, TimeUnit.MINUTES)
- .loadbalancerEnabled(false)
- .maxConnectionSize(maxConnection)
- .retryEnabled(false)
- .readTimeout(readTimeout)
- .setAuthenticationStrategy(new
TokenAuthenticationStrategy())
- .setAuthTokenKey(tokenKey)
- .setAuthTokenValue(tokenValue)))
- .setDWSVersion("v1")
- .build();
-
- return clientConfig;
- }
-
public static Map<String, Object> getEntranceTask(String url, String user,
String Instance)
throws IOException {
- if (null == dwsClientConfig) {
- dwsClientConfig = createClientConfig(null, null);
- }
- if (null == client) {
- client = new MonitorHTTPClientClientImpl(dwsClientConfig);
- }
+ MonitorHTTPClient client = ClientSingleton.getInstance();
EntranceTaskAction build =
EntranceTaskAction.newBuilder().setUser(user).setInstance(Instance).build();
EntranceTaskResult result = client.entranList(build);
@@ -156,8 +85,10 @@ public class HttpsUntils {
RequestConfig requestConfig = RequestConfig.DEFAULT;
StringEntity entity =
new StringEntity(
- json,
ContentType.create(ContentType.APPLICATION_JSON.getMimeType(), "UTF-8"));
- entity.setContentEncoding("UTF-8");
+ json,
+ ContentType.create(
+ ContentType.APPLICATION_JSON.getMimeType(),
StandardCharsets.UTF_8.toString()));
+ entity.setContentEncoding(StandardCharsets.UTF_8.toString());
HttpPost httpPost = new HttpPost(MonitorConfig.ECM_TASK_IMURL.getValue());
httpPost.setConfig(requestConfig);
@@ -165,9 +96,77 @@ public class HttpsUntils {
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse execute = httpClient.execute(httpPost);
- String responseStr = EntityUtils.toString(execute.getEntity(), "UTF-8");
+ String responseStr =
+ EntityUtils.toString(execute.getEntity(),
StandardCharsets.UTF_8.toString());
Map<String, String> map =
BDPJettyServerHelper.gson().fromJson(responseStr, Map.class);
logger.info("send index response :{}", map);
Assert.isTrue(!"0".equals(map.get("resultCode")), map.get("resultMsg"));
}
+
+ public static String getJDBCConf(String user, String conf) {
+ MonitorHTTPClient client = ClientSingleton.getInstance();
+ KeyvalueAction build =
+ KeyvalueAction.newBuilder()
+ .setVersion("4")
+ .setEngineType(Constants.JDBC_ENGINE())
+ .setCreator("IDE")
+ .setConfigKey(conf)
+ .setUser(user)
+ .build();
+ KeyvalueResult result = client.getConfKeyValue(build);
+ Map data = MapUtils.getMap(result.getResultMap(), "data", new HashMap<>());
+ ArrayList arrayList = (ArrayList) data.get("configValues");
+ if (CollectionUtils.isNotEmpty(arrayList)) {
+ String json = BDPJettyServerHelper.gson().toJson(arrayList.get(0));
+ Map map = BDPJettyServerHelper.gson().fromJson(json, Map.class);
+ return MapUtils.getString(map, "configValue", "");
+ } else {
+ return "";
+ }
+ }
+
+ public static Map getDatasourceConf(String user, String datasourceName) {
+ MonitorHTTPClient client = ClientSingleton.getInstance();
+ DataSourceParamsAction dataSourceParamsAction =
+ DataSourceParamsAction.builder()
+ .setSystem(Constants.ALERT_SUB_SYSTEM_ID())
+ .setDataSourceName(datasourceName)
+ .setUser(user)
+ .build();
+ GetInfoPublishedByDataSourceNameResult result =
+ client.getInfoByDataSourceInfo(dataSourceParamsAction);
+ Map data = MapUtils.getMap(result.getResultMap(), "data", new HashMap<>());
+ Map datasourceInfoMap = MapUtils.getMap(data, "info", new HashMap<>());
+ return datasourceInfoMap;
+ }
+
+ public static void killJob(JobHistory jobHistory) {
+ MonitorHTTPClient client = ClientSingleton.getInstance();
+ String[] split =
jobHistory.getInstances().split(Constants.SPLIT_DELIMITER());
+ String execID =
+ ZuulEntranceUtils.generateExecID(
+ jobHistory.getJobReqId(),
+ GovernanceCommonConf.ENTRANCE_SERVICE_NAME().getValue(),
+ split);
+ KillJobAction killJobAction =
+ KillJobAction.builder()
+ .setIdList(Collections.singletonList(execID))
+ .setTaskIDList(Collections.singletonList(jobHistory.getId()))
+ .setExecID(execID)
+ .setUser(jobHistory.getSubmitUser())
+ .build();
+ KillJobResultAction killJobResultAction = client.killJob(killJobAction);
+ Map data = MapUtils.getMap(killJobResultAction.getResultMap(), "data", new
HashMap<>());
+ }
+
+ public static void analyzeJob(JobHistory jobHistory) {
+ MonitorHTTPClient client = ClientSingleton.getInstance();
+
+ AnalyzeJobAction analyzeJobAction =
+ AnalyzeJobAction.newBuilder()
+ .setTaskID(String.valueOf(jobHistory.getId()))
+ .setUser(Constants.ADMIN_USER())
+ .build();
+ AnalyzeJobResultAction analyzeJobResultAction =
client.analyzeJob(analyzeJobAction);
+ }
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
index 3366014c89..66dd4a3b68 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java
@@ -44,14 +44,14 @@ public class JobMonitorUtils {
}
public static List<DataFetcher> generateFetchers(
- long startTime, long endTime, long maxIntervalMs, long id, String
timeType) {
+ long startTime, long endTime, long maxIntervalMs, long id, String
jobStatus) {
List<DataFetcher> ret = new ArrayList<>();
long pe = endTime;
long ps;
while (pe > startTime) {
ps = Math.max(pe - maxIntervalMs, startTime);
String[] fetcherArgs =
- new String[] {String.valueOf(ps), String.valueOf(pe),
String.valueOf(id), timeType};
+ new String[] {String.valueOf(ps), String.valueOf(pe),
String.valueOf(id), jobStatus};
ret.add(new JobHistoryDataFetcher(fetcherArgs,
MapperFactory.getJobHistoryMapper()));
logger.info(
"Generated dataFetcher for startTime: " + new Date(ps) + ". EndTime:
" + new Date(pe));
@@ -61,11 +61,11 @@ public class JobMonitorUtils {
}
public static List<DataFetcher> generateFetchersfortime(
- long startTime, long endTime, long id, String timeType) {
+ long startTime, long endTime, long id, String jobStatus) {
List<DataFetcher> fetchers = new ArrayList<>();
String[] fetcherArgs =
new String[] {
- String.valueOf(startTime), String.valueOf(endTime),
String.valueOf(id), timeType
+ String.valueOf(startTime), String.valueOf(endTime),
String.valueOf(id), jobStatus
};
fetchers.add(new JobHistoryDataFetcher(fetcherArgs,
MapperFactory.getJobHistoryMapper()));
logger.info(
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
index 15a2626379..5e4133aa90 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java
@@ -20,6 +20,7 @@ package org.apache.linkis.monitor.until;
import org.apache.linkis.common.utils.Utils;
import org.apache.linkis.monitor.config.MonitorConfig;
import org.apache.linkis.monitor.constants.Constants;
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory;
import org.apache.linkis.monitor.utils.alert.AlertDesc;
import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils;
import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils;
@@ -42,6 +43,9 @@ public class ThreadUtils extends ApplicationContextEvent {
public static ExecutionContextExecutorService executors =
Utils.newCachedExecutionContext(5, "alert-pool-thread-", false);
+ public static ExecutionContextExecutorService executors_analyze =
+ Utils.newCachedExecutionContext(50, "analyze-pool-thread-", false);
+
public ThreadUtils(ApplicationContext source) {
super(source);
}
@@ -64,4 +68,9 @@ public class ThreadUtils extends ApplicationContextEvent {
}
return msg;
}
+
+ public static void analyzeRun(JobHistory jobHistory) {
+ FutureTask future = new FutureTask(() ->
HttpsUntils.analyzeJob(jobHistory), -1);
+ executors_analyze.submit(future);
+ }
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
index 7be8f93154..dcab938da9 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
+++
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml
@@ -47,7 +47,7 @@
<sql id="jobhistory_query">
job.`id`,job.`job_req_id`,job.`submit_user`,job.`execute_user`,job.`labels`,job.`params`,job.`status`,job.`error_code`,job.`created_time`,
-
job.`updated_time`,job.`instances`,job.`observe_info`,org.`org_id`,org.`org_name`
+
job.`updated_time`,job.`instances`,job.`engine_type`,job.`observe_info`,org.`org_id`,org.`org_name`
</sql>
<select id="selectJobHistory" useCache="false" resultMap="jobHistoryMap"
@@ -127,7 +127,7 @@
<include refid="jobhistory_query"/>
FROM linkis_ps_job_history_group_history job JOIN linkis_org_user org
ON job.submit_user = org.user_name
<where>
- <if test="id != null">job.id > #{id}</if>
+ <if test="id != null">job.id >= #{id}</if>
<if test="umUser != null">and job.submit_user = #{umUser}</if>
<if test="engineType != null">and job.engine_type =
#{engineType}</if>
<if test="startDate != null">and job.created_time >= #{startDate}
AND job.created_time <![CDATA[<=]]>#{endDate}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/UserDepartmentInfoMapper.xml
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/UserDepartmentInfoMapper.xml
new file mode 100644
index 0000000000..295cee560f
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/UserDepartmentInfoMapper.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper
namespace="org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper">
+
+ <!-- 新增方法 -->
+ <insert id="insertUser">
+ INSERT INTO linkis_org_user(cluster_code, user_type, user_name,
org_id, org_name, queue_name, db_name, interface_user, is_union_analyse,
create_time, user_itsm_no)
+ VALUES (#{clusterCode}, #{userType}, #{userName}, #{orgId},
#{orgName}, #{queueName}, #{dbName}, #{interfaceUser}, #{isUnionAnalyse},
#{createTime}, #{userItsmNo})
+ </insert>
+
+ <!-- 批量新增方法 -->
+ <insert id="batchInsertUsers" parameterType="java.util.List">
+ INSERT INTO linkis_org_user (cluster_code, user_type, user_name,
org_id, org_name, queue_name, db_name, interface_user, is_union_analyse,
create_time, user_itsm_no)
+ VALUES
+ <foreach item="user" index="index" collection="userDepartmentInfos"
separator=",">
+ (#{user.clusterCode}, #{user.userType}, #{user.userName},
#{user.orgId}, #{user.orgName}, #{user.queueName}, #{user.dbName},
#{user.interfaceUser}, #{user.isUnionAnalyse}, #{user.createTime},
#{user.userItsmNo})
+ </foreach>
+ </insert>
+
+ <!-- 修改方法 -->
+ <update id="updateUser">
+ UPDATE linkis_org_user
+ <set>
+ <if test="clusterCode != null">cluster_code = #{clusterCode},</if>
+ <if test="userType != null">user_type = #{userType},</if>
+ <if test="orgId != null">org_id = #{orgId},</if>
+ <if test="orgName != null">org_name = #{orgName},</if>
+ <if test="queueName != null">queue_name = #{queueName},</if>
+ <if test="dbName != null">db_name = #{dbName},</if>
+ <if test="interfaceUser != null">interface_user =
#{interfaceUser},</if>
+ <if test="isUnionAnalyse != null">is_union_analyse =
#{isUnionAnalyse},</if>
+ <if test="createTime != null">create_time = #{createTime},</if>
+ <if test="userItsmNo != null">user_itsm_no = #{userItsmNo},</if>
+ </set>
+ WHERE user_name = #{userName}
+ </update>
+
+ <!-- 查询方法 -->
+ <select id="selectUser"
resultType="org.apache.linkis.monitor.department.entity.UserDepartmentInfo">
+ SELECT * FROM linkis_org_user WHERE user_name = #{userName}
+ </select>
+
+ <!-- 删除方法 -->
+ <delete id="deleteUser">
+ DELETE FROM linkis_org_user
+ </delete>
+
+ <!-- 查询所有数据 -->
+ <select id="selectAllUsers"
resultType="org.apache.linkis.monitor.department.entity.UserDepartmentInfo">
+ SELECT * FROM linkis_org_user_sync
+ </select>
+
+
+
+
+</mapper>
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
index e3652306c1..c6b2b04d59 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.monitor
+import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper
import org.apache.linkis.monitor.factory.MapperFactory
import org.apache.linkis.monitor.instance.dao.InstanceInfoDao
import org.apache.linkis.monitor.jobhistory.dao.JobHistoryMapper
@@ -39,10 +40,14 @@ class LinkisJobHistoryScanSpringConfiguration {
@Autowired
private var instanceInfoMapper: InstanceInfoDao = _
+ @Autowired
+ private var userDepartmentInfoMapper: UserDepartmentInfoMapper = _
+
@PostConstruct
def init(): Unit = {
MapperFactory.setJobHistoryMapper(jobHistoryMapper)
MapperFactory.setInstanceInfoMapper(instanceInfoMapper)
+ MapperFactory.setUserDepartmentInfoMapper(userDepartmentInfoMapper)
}
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
index 4caccd73a3..ab22404d59 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala
@@ -17,12 +17,30 @@
package org.apache.linkis.monitor.client
+import org.apache.linkis.datasource.client.response.{
+ GetConnectParamsByDataSourceNameResult,
+ GetInfoByDataSourceNameResult,
+ GetInfoPublishedByDataSourceNameResult
+}
import org.apache.linkis.httpclient.authentication.AuthenticationStrategy
import
org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig,
DWSClientConfigBuilder}
import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.{EmsListAction, EntranceTaskAction,
MonitorResourceAction}
-import org.apache.linkis.monitor.response.EntranceTaskResult
+import org.apache.linkis.monitor.request.{
+ AnalyzeJobAction,
+ DataSourceParamsAction,
+ EmsListAction,
+ EntranceTaskAction,
+ KeyvalueAction,
+ KillJobAction,
+ MonitorAction
+}
+import org.apache.linkis.monitor.response.{
+ AnalyzeJobResultAction,
+ EntranceTaskResult,
+ KeyvalueResult,
+ KillJobResultAction
+}
import org.apache.linkis.ujes.client.response.EmsListResult
import java.io.Closeable
@@ -30,7 +48,7 @@ import java.util.concurrent.TimeUnit
abstract class MonitorHTTPClient extends Closeable {
- protected[client] def executeJob(ujesJobAction: MonitorResourceAction):
Result
+ protected[client] def executeJob(ujesJobAction: MonitorAction): Result
def list(emsListAction: EmsListAction): EmsListResult = {
executeJob(emsListAction).asInstanceOf[EmsListResult]
@@ -40,6 +58,24 @@ abstract class MonitorHTTPClient extends Closeable {
executeJob(entranceTaskAction).asInstanceOf[EntranceTaskResult]
}
+ def getConfKeyValue(keyvalueAction: KeyvalueAction): KeyvalueResult = {
+ executeJob(keyvalueAction).asInstanceOf[KeyvalueResult]
+ }
+
+ def getInfoByDataSourceInfo(
+ datasourceInfoAction: DataSourceParamsAction
+ ): GetInfoPublishedByDataSourceNameResult = {
+
executeJob(datasourceInfoAction).asInstanceOf[GetInfoPublishedByDataSourceNameResult]
+ }
+
+ def killJob(killJobAction: KillJobAction): KillJobResultAction = {
+ executeJob(killJobAction).asInstanceOf[KillJobResultAction]
+ }
+
+ def analyzeJob(analyzeJobAction: AnalyzeJobAction): AnalyzeJobResultAction =
{
+ executeJob(analyzeJobAction).asInstanceOf[AnalyzeJobResultAction]
+ }
+
}
object MonitorHTTPClient {
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
index 5554701571..8074aeeb62 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala
@@ -21,14 +21,14 @@ import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.httpclient.dws.config.DWSClientConfig
import org.apache.linkis.httpclient.request.Action
import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.MonitorResourceAction
+import org.apache.linkis.monitor.request.MonitorAction
class MonitorHTTPClientClientImpl(clientConfig: DWSClientConfig) extends
MonitorHTTPClient {
private val dwsHttpClient =
new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread")
- override protected[client] def executeJob(ujesJobAction:
MonitorResourceAction): Result =
+ override protected[client] def executeJob(ujesJobAction: MonitorAction):
Result =
ujesJobAction match {
case action: Action => dwsHttpClient.execute(action)
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
index d0660e1116..77a3226ba3 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala
@@ -21,7 +21,7 @@ import
org.apache.linkis.httpclient.authentication.AuthenticationStrategy
import
org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig,
DWSClientConfigBuilder}
import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.{EmsListAction, MonitorResourceAction}
+import org.apache.linkis.monitor.request.{EmsListAction, MonitorAction}
import org.apache.linkis.ujes.client.response.EmsListResult
import java.io.Closeable
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit
abstract class MonitorResourceClient extends Closeable {
- protected[client] def executeJob(ujesJobAction: MonitorResourceAction):
Result
+ protected[client] def executeJob(monitorAction: MonitorAction): Result
def list(jobListAction: EmsListAction): EmsListResult = {
executeJob(jobListAction).asInstanceOf[EmsListResult]
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
index 06cff3b46a..3112b2b63f 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala
@@ -21,15 +21,15 @@ import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.httpclient.dws.config.DWSClientConfig
import org.apache.linkis.httpclient.request.Action
import org.apache.linkis.httpclient.response.Result
-import org.apache.linkis.monitor.request.MonitorResourceAction
+import org.apache.linkis.monitor.request.MonitorAction
class MonitorResourceClientImpl(clientConfig: DWSClientConfig) extends
MonitorResourceClient {
private val dwsHttpClient =
new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread")
- override protected[client] def executeJob(ujesJobAction:
MonitorResourceAction): Result =
- ujesJobAction match {
+ override protected[client] def executeJob(monitorAction: MonitorAction):
Result =
+ monitorAction match {
case action: Action => dwsHttpClient.execute(action)
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
index 9da7e35ebd..affa0ccb83 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala
@@ -79,6 +79,7 @@ object Constants {
val BML_CLEAR_IM = "bml.clear.monitor.im."
val THREAD_TIME_OUT_IM = "thread.monitor.timeout.im."
val JOB_RESULT_IM = "jobhistory.result.monitor.im."
+ val DEPARTMENT_USER_IM = "department.user.sync.im."
val BML_VERSION_MAX_NUM: CommonVars[Int] =
CommonVars[Int]("linkis.monitor.bml.cleaner.version.max.num", 50)
@@ -98,4 +99,18 @@ object Constants {
val LINKIS_CLUSTER_NAME =
CommonVars.properties.getProperty("linkis.cluster.name", "")
+ val ADMIN_USER = "hadoop"
+
+ val SPLIT_DELIMITER = ";"
+
+ val JDBC_ALERT_TIME = "linkis.jdbc.task.timeout.alert.time"
+
+ val JDBC_ALERT_USER = "linkis.jdbc.task.timeout.alert.user"
+
+ val JDBC_ALERT_LEVEL = "linkis.jdbc.task.timeout.alert.level"
+
+ val JOB_DATASOURCE_CONF = "wds.linkis.engine.runtime.datasource"
+
+ val JDBC_ENGINE = "jdbc"
+
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
index eb503c52aa..3f81c66514 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala
@@ -17,6 +17,7 @@
package org.apache.linkis.monitor.factory
+import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper
import org.apache.linkis.monitor.instance.dao.{
InsLabelRelationDao,
InstanceInfoDao,
@@ -34,6 +35,8 @@ object MapperFactory {
private var instanceLabelRelationMapper: InsLabelRelationDao = _
+ private var userDepartmentInfoMapper: UserDepartmentInfoMapper = _
+
def getJobHistoryMapper(): JobHistoryMapper = jobHistoryMapper
def setJobHistoryMapper(jobHistoryMapper: JobHistoryMapper): Unit = {
@@ -58,4 +61,12 @@ object MapperFactory {
MapperFactory.instanceLabelRelationMapper = instanceLabelRelationMapper
}
+ // 获取userDepartmentInfoMapper的值
+ def getUserDepartmentInfoMapper: UserDepartmentInfoMapper =
userDepartmentInfoMapper
+
+ // 设置userDepartmentInfoMapper的值
+ def setUserDepartmentInfoMapper(mapper: UserDepartmentInfoMapper): Unit = {
+ userDepartmentInfoMapper = mapper
+ }
+
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
index 4f43d86d40..4f553847f6 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala
@@ -54,60 +54,46 @@ class JobHistoryDataFetcher(args: Array[Any], mapper:
JobHistoryMapper)
"Wrong input for JobHistoryDataFetcher. DataType: " +
args.getClass.getCanonicalName
)
}
- if (args != null && args.length == 2) {
- val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
- {
- logger.error("Failed to get data from DB: Illegal arguments.", t)
- throw t
- }
- }
- val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
- {
- logger.error("Failed to get data from DB: Illegal arguments.", t)
- throw t
- }
- }
- mapper
- .search(null, null, null, new Date(start), new Date(end), null)
- .asInstanceOf[util.List[scala.Any]]
- } else if (args != null && args.length == 4) {
- val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t =>
- {
- logger.error("Failed to get data from DB: Illegal arguments.", t)
- throw t
- }
- }
- val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t =>
- {
- logger.error("Failed to get data from DB: Illegal arguments.", t)
- throw t
- }
- }
- val id = Utils.tryCatch(args(2).asInstanceOf[String].toLong) { t =>
- {
- logger.error("Failed to get data from DB: Illegal arguments.", t)
- throw t
- }
- }
- if (
- StringUtils.isNotBlank(args(3).asInstanceOf[String]) && args(3)
- .asInstanceOf[String]
- .equals("updated_time")
- ) {
- val list = new util.ArrayList[String]()
- Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
- mapper
- .searchByCacheAndUpdateTime(id, null, list, new Date(start), new
Date(end), null)
- .asInstanceOf[util.List[scala.Any]]
- } else {
- var list = new util.ArrayList[String]()
- Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
- if (args(3).asInstanceOf[String].equals("department")) {
- list = null;
- }
- mapper
- .searchByCache(id, null, list, new Date(start), new Date(end), null)
- .asInstanceOf[util.List[scala.Any]]
+ if (args != null) {
+ val start = args(0).asInstanceOf[String].toLong
+ val end = args(1).asInstanceOf[String].toLong
+ // 根据参数数量进行不同的处理
+ args.length match {
+ // 参数数量为2,则数据库查询仅筛选开始和结束时间
+ case 2 =>
+ mapper
+ .search(null, null, null, new Date(start), new Date(end), null)
+ .asInstanceOf[util.List[scala.Any]]
+ // 参数数量为4,根据第四个参数进行不同的查询
+ case 4 =>
+ val id = args(2).asInstanceOf[String].toLong
+ val parm = args(3).asInstanceOf[String]
+ parm match {
+ // 筛选任务包含id,时间,已完成状态任务
+ case "finished_job" =>
+ val list = new util.ArrayList[String]()
+ Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add)
+ mapper
+ .searchByCacheAndUpdateTime(id, null, list, new Date(start),
new Date(end), null)
+ .asInstanceOf[util.List[scala.Any]]
+ // 筛选任务包含id,时间,未完成状态任务
+ case "unfinished_job" =>
+ var list = new util.ArrayList[String]()
+ Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add)
+ mapper
+ .searchByCache(id, null, list, new Date(start), new Date(end),
null)
+ .asInstanceOf[util.List[scala.Any]]
+ // 筛选任务包含id,时间
+ case _ =>
+ mapper
+ .searchByCache(id, null, null, new Date(start), new Date(end),
null)
+ .asInstanceOf[util.List[scala.Any]]
+ }
+ case _ =>
+ throw new AnomalyScannerException(
+ 21304,
+ "Wrong input for JobHistoryDataFetcher. Data: " + args
+ )
}
} else {
throw new AnomalyScannerException(
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala
similarity index 55%
copy from
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala
index aa73471c49..2b17adb39b 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala
@@ -15,17 +15,20 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.jobhistory.analyze
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.{Event, Observer}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException
+import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils,
PooledImsAlertUtils}
-public class QueryUtils {
+import java.util
- private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+import scala.collection.JavaConverters._
+
+class JobHistoryAnalyzeAlertSender() extends Observer with Logging {
+ override def update(e: Event, jobHistroyList: scala.Any): Unit = {}
- public static String dateToString(Date date) {
- return dateFormat.format(date);
- }
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala
similarity index 82%
copy from
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala
index 7ea2001481..675aacfc98 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.analyze
-import org.apache.linkis.httpclient.dws.request.DWSHttpAction
+import org.apache.linkis.monitor.core.ob.SingleObserverEvent
-trait MonitorResourceAction extends DWSHttpAction with UserAction
+/**
+ * Hit event of the job-history analyze scan. `JobHistoryAnalyzeRule` passes a new instance of this
+ * class to `AbstractScanRule` as its `event`.
+ */
+class JobHistoryAnalyzeHitEvent extends SingleObserverEvent
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala
new file mode 100644
index 0000000000..aec128ffab
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.analyze
+
+import org.apache.linkis.common.utils.{Logging, Utils}
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.Observer
+import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils, ThreadUtils}
+import org.apache.linkis.monitor.utils.job.JohistoryUtils
+
+import java.util
+
+/**
+ * Scan rule that hands every FAILED job-history record found in the scanned data to
+ * `ThreadUtils.analyzeRun`.
+ *
+ * @param hitObserver
+ *   observer passed to `AbstractScanRule` together with a new `JobHistoryAnalyzeHitEvent`
+ */
+class JobHistoryAnalyzeRule(hitObserver: Observer)
+    extends AbstractScanRule(event = new JobHistoryAnalyzeHitEvent, observer = hitObserver)
+    with Logging {
+  // Not read anywhere inside this class; kept so that construction behaves as before.
+  private val scanRuleList = CacheUtils.cacheBuilder
+
+  /**
+   * Submits each FAILED job contained in `data` for analysis.
+   *
+   * @param data
+   *   scanned data, unpacked through `JohistoryUtils.getJobhistorySanData`
+   * @return
+   *   false when the hit event has no registered observer; true otherwise, including the case
+   *   where no job matched
+   */
+  override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
+    if (!getHitEvent.isRegistered) {
+      logger.error("ScanRule is not bind with an observer. Will not be triggered")
+      false
+    } else {
+      for (scanedData <- JohistoryUtils.getJobhistorySanData(data)) {
+        scanedData match {
+          // A record without a status cannot be FAILED; skip it instead of aborting the scan.
+          case jobHistory: JobHistory if jobHistory.getStatus != null =>
+            // Locale.ROOT keeps the comparison with the ASCII constant independent of the JVM
+            // default locale.
+            val jobStatus = jobHistory.getStatus.toUpperCase(util.Locale.ROOT)
+            if (Constants.FINISHED_JOB_STATUS.contains(jobStatus) && jobStatus.equals("FAILED")) {
+              // Run the job analysis.
+              ThreadUtils.analyzeRun(jobHistory)
+            }
+          case _ =>
+        }
+      }
+      true
+    }
+  }
+
+}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
index f788173e43..0367bfc05e 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala
@@ -18,6 +18,7 @@
package org.apache.linkis.monitor.jobhistory.jobtime
import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.config.MonitorConfig
import org.apache.linkis.monitor.constants.Constants
import org.apache.linkis.monitor.core.ob.Observer
import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
@@ -26,7 +27,7 @@ import
org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException
import org.apache.linkis.monitor.until.CacheUtils
import java.util
-import java.util.Locale
+import java.util.{Locale, Optional}
import scala.collection.JavaConverters._
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
new file mode 100644
index 0000000000..4e6e707335
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.{Event, Observer}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException
+import org.apache.linkis.monitor.until.HttpsUntils
+import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils,
PooledImsAlertUtils}
+
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+/**
+ * Observer that adds an alert to `PooledImsAlertUtils` for every job reported together with a
+ * `StarrocksTimeExceedHitEvent`.
+ */
+class StarrocksTimeExceedAlertSender extends Observer with Logging {
+
+  /**
+   * Observer Pattern callback.
+   *
+   * @param e
+   *   the triggering event; must be a `StarrocksTimeExceedHitEvent`
+   * @param jobHistroyList
+   *   must be a `java.util.List`; null elements and elements that are not `JobHistory` are
+   *   skipped with a warning
+   * @throws AnomalyScannerException
+   *   (code 21304) when the event or the payload has the wrong type, or is null
+   */
+  override def update(e: Event, jobHistroyList: scala.Any): Unit = {
+    if (!e.isInstanceOf[StarrocksTimeExceedHitEvent]) {
+      throw new AnomalyScannerException(
+        21304,
+        "Wrong event that triggers StarrocksTimeExceedAlertSender. Input DataType: " +
+          typeNameOf(e)
+      )
+    }
+    jobHistroyList match {
+      case list: util.List[_] =>
+        for (a <- list.asScala) {
+          if (a == null) {
+            logger.warn("Ignore null input data")
+          } else {
+            a match {
+              case jobHistory: JobHistory => sendAlert(jobHistory)
+              case other =>
+                logger.warn("Ignore wrong input data Type : " + other.getClass.getCanonicalName)
+            }
+          }
+        }
+      case other =>
+        // Also reached for null: build the message without dereferencing the payload.
+        throw new AnomalyScannerException(
+          21304,
+          "Wrong input for StarrocksTimeExceedAlertSender. Input DataType: " + typeNameOf(other)
+        )
+    }
+  }
+
+  /** Null-safe canonical class name, used when building error messages. */
+  private def typeNameOf(value: scala.Any): String =
+    if (value == null) "null" else value.getClass.getCanonicalName
+
+  /**
+   * Reads the submit user's JDBC alert settings and, when both the alert time and the alert
+   * receiver are configured, adds alert "12020" with the placeholders filled in.
+   */
+  private def sendAlert(jobHistory: JobHistory): Unit = {
+    val submitUser = jobHistory.getSubmitUser
+    val timeValue = HttpsUntils.getJDBCConf(submitUser, Constants.JDBC_ALERT_TIME)
+    val userValue = HttpsUntils.getJDBCConf(submitUser, Constants.JDBC_ALERT_USER)
+    val configuredLevel = HttpsUntils.getJDBCConf(submitUser, Constants.JDBC_ALERT_LEVEL)
+    if (StringUtils.isNotBlank(timeValue) && StringUtils.isNotBlank(userValue)) {
+      // Fall back to level "3" when no level is configured.
+      val levelValue = if (StringUtils.isBlank(configuredLevel)) "3" else configuredLevel
+      val replaceParm: util.HashMap[String, String] = new util.HashMap[String, String]
+      replaceParm.put("$id", String.valueOf(jobHistory.getId))
+      replaceParm.put("$timeoutTime", timeValue)
+      replaceParm.put("$alteruser", userValue)
+      replaceParm.put("$eccAlertUser", userValue)
+      replaceParm.put("$submitUser", submitUser)
+      replaceParm.put("$alterLevel", levelValue)
+      val alters = MonitorAlertUtils.getAlerts(Constants.USER_LABEL_MONITOR, replaceParm)
+      PooledImsAlertUtils.addAlert(alters.get("12020"))
+    }
+  }
+
+}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala
similarity index 82%
copy from
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala
index 7ea2001481..af5432cd9f 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.jobtime
-import org.apache.linkis.httpclient.dws.request.DWSHttpAction
+import org.apache.linkis.monitor.core.ob.SingleObserverEvent
-trait MonitorResourceAction extends DWSHttpAction with UserAction
+/**
+ * Hit event of the Starrocks time-exceed scan. `StarrocksTimeExceedRule` passes a new instance of
+ * this class to `AbstractScanRule`, and `StarrocksTimeExceedAlertSender.update` rejects any other
+ * event type.
+ */
+class StarrocksTimeExceedHitEvent extends SingleObserverEvent
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
new file mode 100644
index 0000000000..b616c5c02c
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.Observer
+import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils}
+import org.apache.linkis.server.BDPJettyServerHelper
+
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+import java.util.{ArrayList, HashMap, List, Locale, Map}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Scan rule that collects unfinished JDBC-engine jobs which have been running longer than the
+ * submit user's configured alert time and notifies the observer with them.
+ *
+ * @param hitObserver
+ *   observer passed to `AbstractScanRule` together with a new `StarrocksTimeExceedHitEvent`
+ */
+class StarrocksTimeExceedRule(hitObserver: Observer)
+    extends AbstractScanRule(event = new StarrocksTimeExceedHitEvent, observer = hitObserver)
+    with Logging {
+
+  private val scanRuleList = CacheUtils.cacheBuilder
+
+  /**
+   * Checks every scanned job and notifies the observer with the jobs that exceeded their alert
+   * time.
+   *
+   * @param data
+   *   scanned data; null entries are skipped with a warning
+   * @return
+   *   true when at least one job was reported to the observer; false otherwise, including the
+   *   cases where no observer is registered or `data` is null
+   */
+  override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
+    if (!getHitEvent().isRegistered) {
+      logger.error("ScanRule is not bind with an observer. Will not be triggered")
+      false
+    } else if (data == null) {
+      logger.warn("Scanned data is null. Will not be triggered")
+      false
+    } else {
+      val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]()
+      for (scannedData <- data.asScala) {
+        if (scannedData != null && scannedData.getData() != null) {
+          var taskMinID = 0L
+          for (jobHistory <- scannedData.getData().asScala) {
+            jobHistory match {
+              case job: JobHistory =>
+                if (isUnfinishedJdbcJob(job) && exceedsAlertTime(job)) {
+                  alertData.add(job)
+                }
+                // Record the smallest job id seen so far in this batch.
+                if (taskMinID == 0L || taskMinID > job.getId) {
+                  taskMinID = job.getId
+                  scanRuleList.put("jdbcUnfinishedAlertScan", taskMinID)
+                }
+              case other =>
+                // Null-safe: a null element must not abort the whole scan.
+                val typeName = if (other == null) "null" else other.getClass.getCanonicalName
+                logger.warn("Ignored wrong input data Type : " + other + ", " + typeName)
+            }
+          }
+        } else {
+          logger.warn("Ignored null scanned data")
+        }
+      }
+      logger.info("hit " + alertData.size() + " data in one iteration")
+      if (alertData.size() > 0) {
+        getHitEvent.notifyObserver(getHitEvent, alertData)
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  /** True when the job has an unfinished status and runs on the JDBC engine. */
+  private def isUnfinishedJdbcJob(job: JobHistory): Boolean = {
+    val status = job.getStatus
+    val engineType = job.getEngineType
+    // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+    status != null && engineType != null &&
+    Constants.UNFINISHED_JOB_STATUS.contains(status.toUpperCase(Locale.ROOT)) &&
+    engineType.toUpperCase(Locale.ROOT).equals(Constants.JDBC_ENGINE.toUpperCase(Locale.ROOT))
+  }
+
+  /**
+   * True when the time since the job was created exceeds the submit user's alert time. The
+   * configured value is multiplied by 60 * 1000 to obtain milliseconds, i.e. it is in minutes. A
+   * blank or non-numeric value never triggers an alert.
+   */
+  private def exceedsAlertTime(job: JobHistory): Boolean = {
+    // Data source configuration used by the job; it is only logged here.
+    val datasourceConfMap = getDatasourceConf(job)
+    logger.info("starock datasourceConfMap: {}", datasourceConfMap)
+    // Time elapsed since the job was created.
+    val elapse = System.currentTimeMillis() - job.getCreatedTime.getTime
+    // Alert configuration of the submit user.
+    val timeValue = HttpsUntils.getJDBCConf(job.getSubmitUser, Constants.JDBC_ALERT_TIME)
+    logger.info("starock timeValue: {},elapse {}", timeValue, elapse)
+    if (StringUtils.isBlank(timeValue)) {
+      false
+    } else {
+      scala.util.Try(timeValue.toDouble).toOption match {
+        case Some(timeoutInMinutes) =>
+          elapse > (timeoutInMinutes * 60 * 1000).toLong
+        case None =>
+          // A malformed setting for one user must not abort the scan of the remaining jobs.
+          logger.warn("Ignored invalid JDBC alert time value: " + timeValue)
+          false
+      }
+    }
+  }
+
+  /**
+   * Reads the data source name from `configuration.runtime` of the job params and looks up its
+   * configuration through `HttpsUntils.getDatasourceConf`; returns an empty map when no data
+   * source name is present.
+   */
+  private def getDatasourceConf(job: JobHistory): util.Map[_, _] = {
+    // Data source name taken from the job parameters.
+    val parmMap =
+      BDPJettyServerHelper.gson.fromJson(job.getParams, classOf[java.util.Map[String, String]])
+    val configurationMap =
+      MapUtils.getMap(parmMap, "configuration", new util.HashMap[String, String]())
+    val runtimeMap =
+      MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, String]())
+    val datasourceName = MapUtils.getString(runtimeMap, Constants.JOB_DATASOURCE_CONF, "")
+    // Data source information.
+    if (StringUtils.isNotBlank(datasourceName)) {
+      HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName)
+    } else {
+      new util.HashMap[String, String]()
+    }
+  }
+
+}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
similarity index 70%
copy from
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
index 4733a1b45f..18553f228e 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala
@@ -15,12 +15,16 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.jobtime
-trait UserAction extends org.apache.linkis.httpclient.request.UserAction {
- private var user: String = _
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.core.ob.{Event, Observer}
- override def setUser(user: String): Unit = this.user = user
+class StarrocksTimeKillAlertSender extends Observer with Logging {
+
+ /**
+ * Observer Pattern
+ */
+ override def update(e: Event, jobHistroyList: scala.Any): Unit = {}
- override def getUser: String = user
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
new file mode 100644
index 0000000000..e5df6f3ff7
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.jobhistory.jobtime
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.monitor.constants.Constants
+import org.apache.linkis.monitor.core.ob.Observer
+import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData}
+import org.apache.linkis.monitor.jobhistory.entity.JobHistory
+import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils}
+import org.apache.linkis.server.BDPJettyServerHelper
+
+import org.apache.commons.collections.MapUtils
+import org.apache.commons.lang3.StringUtils
+
+import java.util
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+/**
+ * Scan rule that kills unfinished JDBC-engine jobs which have been running longer than the
+ * `kill_task_time` found in the `connectParams` of their data source configuration.
+ *
+ * @param hitObserver
+ *   observer passed to `AbstractScanRule` together with a new `StarrocksTimeKillHitEvent`
+ */
+class StarrocksTimeKillRule(hitObserver: Observer)
+    extends AbstractScanRule(event = new StarrocksTimeKillHitEvent, observer = hitObserver)
+    with Logging {
+
+  private val scanRuleList = CacheUtils.cacheBuilder
+
+  /**
+   * Checks every scanned job and calls `HttpsUntils.killJob` for those that exceeded their kill
+   * time.
+   *
+   * @param data
+   *   scanned data; null entries are skipped with a warning
+   * @return
+   *   false when no observer is registered or `data` is null; true otherwise, including the case
+   *   where no job was killed
+   */
+  override def triggerIfMatched(data: util.List[ScannedData]): Boolean = {
+    if (!getHitEvent().isRegistered) {
+      logger.error("ScanRule is not bind with an observer. Will not be triggered")
+      false
+    } else if (data == null) {
+      logger.warn("Scanned data is null. Will not be triggered")
+      false
+    } else {
+      for (scannedData <- data.asScala) {
+        if (scannedData != null && scannedData.getData() != null) {
+          var taskMinID = 0L
+          for (jobHistory <- scannedData.getData().asScala) {
+            jobHistory match {
+              case job: JobHistory =>
+                if (isUnfinishedJdbcJob(job)) {
+                  killIfTimedOut(job)
+                }
+                // Record the smallest job id seen so far in this batch.
+                if (taskMinID == 0L || taskMinID > job.getId) {
+                  taskMinID = job.getId
+                  scanRuleList.put("jdbcUnfinishedKillScan", taskMinID)
+                }
+              case other =>
+                // Null-safe: a null element must not abort the whole scan.
+                val typeName = if (other == null) "null" else other.getClass.getCanonicalName
+                logger.warn("Ignored wrong input data Type : " + other + ", " + typeName)
+            }
+          }
+        } else {
+          logger.warn("Ignored null scanned data")
+        }
+      }
+      true
+    }
+  }
+
+  /** True when the job has an unfinished status and runs on the JDBC engine. */
+  private def isUnfinishedJdbcJob(job: JobHistory): Boolean = {
+    val status = job.getStatus
+    val engineType = job.getEngineType
+    // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+    status != null && engineType != null &&
+    Constants.UNFINISHED_JOB_STATUS.contains(status.toUpperCase(Locale.ROOT)) &&
+    engineType.toUpperCase(Locale.ROOT).equals(Constants.JDBC_ENGINE.toUpperCase(Locale.ROOT))
+  }
+
+  /**
+   * Kills the job when the time since its creation exceeds `kill_task_time`. The value is
+   * multiplied by 60 * 1000 to obtain milliseconds, i.e. it is in minutes. It is parsed once as a
+   * decimal number, so fractional values are accepted; blank or non-numeric values are ignored.
+   */
+  private def killIfTimedOut(job: JobHistory): Unit = {
+    // Time elapsed since the job was created.
+    val elapse = System.currentTimeMillis() - job.getCreatedTime.getTime
+    // Timeout-kill configuration.
+    if (StringUtils.isNotBlank(job.getParams)) {
+      val connectParamsMap = MapUtils.getMap(
+        getDatasourceConf(job),
+        "connectParams",
+        new util.HashMap[AnyRef, AnyRef]
+      )
+      val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "")
+      logger.info("starock killTime: {}", killTime)
+      if (StringUtils.isNotBlank(killTime)) {
+        scala.util.Try(killTime.toDouble).toOption match {
+          case Some(timeoutInMinutes) =>
+            if (elapse > (timeoutInMinutes * 60 * 1000).toLong) {
+              // Trigger the kill.
+              HttpsUntils.killJob(job)
+            }
+          case None =>
+            // A malformed setting on one data source must not abort the scan of other jobs.
+            logger.warn("Ignored invalid kill_task_time value: " + killTime)
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads the data source name from `configuration.runtime` of the job params and looks up its
+   * configuration through `HttpsUntils.getDatasourceConf`; returns an empty map when no data
+   * source name is present.
+   */
+  private def getDatasourceConf(job: JobHistory): util.Map[_, _] = {
+    // Data source name taken from the job parameters.
+    val parmMap =
+      BDPJettyServerHelper.gson.fromJson(job.getParams, classOf[java.util.Map[String, String]])
+    val configurationMap =
+      MapUtils.getMap(parmMap, "configuration", new util.HashMap[String, String]())
+    val runtimeMap =
+      MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, String]())
+    val datasourceName = MapUtils.getString(runtimeMap, Constants.JOB_DATASOURCE_CONF, "")
+    // Data source information.
+    if (StringUtils.isNotBlank(datasourceName)) {
+      HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName)
+    } else {
+      new util.HashMap[String, String]()
+    }
+  }
+
+}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala
similarity index 50%
copy from
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala
index 6f3158e869..333c7ff1c2 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala
@@ -21,35 +21,23 @@ import org.apache.linkis.httpclient.request.GetAction
import org.apache.commons.lang3.StringUtils
-import scala.collection.mutable.ArrayBuffer
+class AnalyzeJobAction extends GetAction with MonitorAction {
-class EmsListAction extends GetAction with MonitorResourceAction {
-
- override def suffixURLs: Array[String] = Array("linkisManager", "listAllEMs")
+ override def suffixURLs: Array[String] = Array("jobhistory",
"diagnosis-query")
}
-object EmsListAction {
+object AnalyzeJobAction {
+
def newBuilder(): Builder = new Builder
- class Builder private[EmsListAction] () {
- private var user: String = _
- private var instance: String = _
- private var nodeHealthy: String = _
- private var owner: String = _
+ class Builder private[AnalyzeJobAction] () {
- def setInstance(instance: String): Builder = {
- this.instance = instance
- this
- }
-
- def setNodeHealthy(nodeHealthy: String): Builder = {
- this.nodeHealthy = nodeHealthy
- this
- }
+ private var taskID: String = _
+ private var user: String = _
- def setOwner(owner: String): Builder = {
- this.owner = owner
+ def setTaskID(taskID: String): Builder = {
+ this.taskID = taskID
this
}
@@ -58,15 +46,11 @@ object EmsListAction {
this
}
- def build(): EmsListAction = {
- val emsListAction = new EmsListAction
- if (StringUtils.isNotBlank(instance))
emsListAction.setParameter("instance", instance)
- if (StringUtils.isNotBlank(nodeHealthy)) {
- emsListAction.setParameter("nodeHealthy", nodeHealthy)
- }
- if (StringUtils.isNotBlank(owner)) emsListAction.setParameter("owner",
owner)
- if (StringUtils.isNotBlank(user)) emsListAction.setUser(user)
- emsListAction
+ def build(): AnalyzeJobAction = {
+ val analyzeJobAction = new AnalyzeJobAction
+ if (StringUtils.isNotBlank(taskID))
analyzeJobAction.setParameter("taskID", taskID)
+ if (StringUtils.isNotBlank(user)) analyzeJobAction.setUser(user)
+ analyzeJobAction
}
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala
new file mode 100644
index 0000000000..11b0167aef
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.request
+
+import
org.apache.linkis.datasource.client.config.DatasourceClientConfig.DATA_SOURCE_SERVICE_MODULE
+import
org.apache.linkis.datasource.client.errorcode.DatasourceClientErrorCodeSummary
+import
org.apache.linkis.datasource.client.exception.DataSourceClientBuilderException
+import org.apache.linkis.httpclient.request.GetAction
+import org.apache.linkis.monitor.request.KeyvalueAction.Builder
+
+import org.apache.commons.lang3.StringUtils
+
+import java.net.URLEncoder
+import java.nio.charset.StandardCharsets
+import java.text.MessageFormat
+
+/**
+ * GET action whose suffix URL segments are the data source service module, "publishedInfo",
+ * "name" and the data source name.
+ */
+class DataSourceParamsAction extends GetAction with MonitorAction {
+
+  // Assigned by the Builder in the companion object.
+  private var dataSourceName: String = _
+
+  private var user: String = _
+
+  override def suffixURLs: Array[String] = {
+    val serviceModule = DATA_SOURCE_SERVICE_MODULE.getValue
+    Array(serviceModule, "publishedInfo", "name", dataSourceName)
+  }
+
+  override def getUser: String = this.user
+
+  override def setUser(user: String): Unit = {
+    this.user = user
+  }
+
+}
+
+/** Factory for [[DataSourceParamsAction]] instances. */
+object DataSourceParamsAction {
+  def builder(): Builder = new Builder
+
+  /**
+   * Collects the data source name, system and user of the request; all three must be set before
+   * `build()` is called.
+   */
+  class Builder private[DataSourceParamsAction] () {
+    private var dataSourceName: String = _
+    private var system: String = _
+    private var user: String = _
+
+    def setDataSourceName(dataSourceName: String): Builder = {
+      this.dataSourceName = dataSourceName
+      this
+    }
+
+    def setSystem(system: String): Builder = {
+      this.system = system
+      this
+    }
+
+    def setUser(user: String): Builder = {
+      this.user = user
+      this
+    }
+
+    /**
+     * Creates the action.
+     *
+     * @throws DataSourceClientBuilderException
+     *   when the data source name, the system or the user is null (checked in that order)
+     */
+    def build(): DataSourceParamsAction = {
+      failIfNull(dataSourceName, DatasourceClientErrorCodeSummary.DATASOURCENAME_NEEDED.getErrorDesc)
+      failIfNull(system, DatasourceClientErrorCodeSummary.SYSTEM_NEEDED.getErrorDesc)
+      failIfNull(user, DatasourceClientErrorCodeSummary.USER_NEEDED.getErrorDesc)
+      val action = new DataSourceParamsAction
+      action.dataSourceName = this.dataSourceName
+      action.setParameter("system", system)
+      action.setUser(user)
+      action
+    }
+
+    // The description is passed by name so it is only evaluated when the check fails.
+    private def failIfNull(value: String, errorDesc: => String): Unit = {
+      if (value == null) {
+        throw new DataSourceClientBuilderException(errorDesc)
+      }
+    }
+
+  }
+
+}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
index 6f3158e869..3ac1719de5 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
import scala.collection.mutable.ArrayBuffer
-class EmsListAction extends GetAction with MonitorResourceAction {
+class EmsListAction extends GetAction with MonitorAction {
override def suffixURLs: Array[String] = Array("linkisManager", "listAllEMs")
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
index f3175d802f..4e60b59a30 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala
@@ -21,7 +21,7 @@ import org.apache.linkis.httpclient.request.GetAction
import org.apache.commons.lang3.StringUtils
-class EntranceTaskAction extends GetAction with MonitorResourceAction {
+class EntranceTaskAction extends GetAction with MonitorAction {
override def suffixURLs: Array[String] = Array("entrance/operation/metrics",
"taskinfo")
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala
new file mode 100644
index 0000000000..654de3ed28
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.request
+
+import org.apache.linkis.httpclient.request.GetAction
+import org.apache.linkis.ujes.client.request.UJESJobAction
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * GET request action (`GetAction` with `MonitorAction`) whose URL suffix segments are
+ * `configuration` and `keyvalue`. Instances are assembled by the `Builder` in the companion object.
+ */
+class KeyvalueAction extends GetAction with MonitorAction {
+
+ // URL suffix segments reported for this action.
+ override def suffixURLs: Array[String] = Array("configuration", "keyvalue")
+
+}
+
+/** Companion of [[KeyvalueAction]]; the only way to obtain a [[KeyvalueAction.Builder]]. */
+object KeyvalueAction {
+
+  /** Creates a builder with every field still unset. */
+  def newBuilder(): Builder = new Builder
+
+  /**
+   * Fluent builder for [[KeyvalueAction]]. Each setter stores its argument and returns this
+   * builder; `build()` copies only the non-blank values onto a new action.
+   */
+  class Builder private[KeyvalueAction] () {
+
+    private var engineType: String = _
+    private var version: String = _
+    private var creator: String = _
+    private var configKey: String = _
+    private var user: String = _
+
+    def setEngineType(engineType: String): Builder = {
+      this.engineType = engineType
+      this
+    }
+
+    def setVersion(version: String): Builder = {
+      this.version = version
+      this
+    }
+
+    def setCreator(creator: String): Builder = {
+      this.creator = creator
+      this
+    }
+
+    def setConfigKey(configKey: String): Builder = {
+      this.configKey = configKey
+      this
+    }
+
+    def setUser(user: String): Builder = {
+      this.user = user
+      this
+    }
+
+    /**
+     * Creates the action. Query parameters are added in the fixed order engineType, version,
+     * creator, configKey; a value that is null, empty or whitespace-only is skipped. The user
+     * is set on the action only when it is non-blank.
+     */
+    def build(): KeyvalueAction = {
+      val action = new KeyvalueAction
+      val queryParams = Seq(
+        "engineType" -> engineType,
+        "version" -> version,
+        "creator" -> creator,
+        "configKey" -> configKey
+      )
+      queryParams.foreach { case (name, value) =>
+        if (StringUtils.isNotBlank(value)) action.setParameter(name, value)
+      }
+      if (StringUtils.isNotBlank(user)) action.setUser(user)
+      action
+    }
+
+  }
+
+}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala
new file mode 100644
index 0000000000..4b23626efb
--- /dev/null
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.monitor.request
+
+import org.apache.linkis.httpclient.dws.DWSHttpClient
+import org.apache.linkis.httpclient.request.POSTAction
+
+import java.util
+
+/**
+ * POST request action (`POSTAction` with `MonitorAction`) whose URL suffix segments are
+ * `entrance`, the configured `execID`, and `killJobs`. Instances are assembled by the `Builder`
+ * in the companion object, which also assigns `execID`.
+ */
+class KillJobAction extends POSTAction with MonitorAction {
+
+ // Middle URL segment; starts out null and is assigned by the companion's Builder.build().
+ private var execID: String = _
+
+ // Evaluated on each call, so it reflects whatever execID holds at that moment.
+ override def suffixURLs: Array[String] = Array("entrance", execID,
"killJobs")
+
+ // Serializes the accumulated request payloads to a JSON string with the shared Jackson mapper.
+ override def getRequestPayload: String =
+ DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads)
+
+}
+
+/** Companion of [[KillJobAction]]; the only way to obtain a [[KillJobAction.Builder]]. */
+object KillJobAction {
+
+  /** Creates a builder with every field still unset. */
+  def builder(): Builder = new Builder
+
+  /**
+   * Fluent builder for [[KillJobAction]]. Each setter stores its argument and returns this
+   * builder.
+   *
+   * NOTE(review): `build()` performs no validation, so any field that was never set is passed
+   * through as null (user, both payload lists and the execID URL segment) - confirm that
+   * callers always set them.
+   */
+  class Builder private[KillJobAction] () {
+
+    private var user: String = _
+    private var idList: util.List[String] = _
+    private var taskIDList: util.List[Long] = _
+    private var execID: String = _
+
+    def setIdList(idList: util.List[String]): Builder = {
+      this.idList = idList
+      this
+    }
+
+    def setTaskIDList(taskIDList: util.List[Long]): Builder = {
+      this.taskIDList = taskIDList
+      this
+    }
+
+    def setExecID(execID: String): Builder = {
+      this.execID = execID
+      this
+    }
+
+    def setUser(user: String): Builder = {
+      this.user = user
+      this
+    }
+
+    /**
+     * Creates the action: assigns the execID, sets the user, and registers the two payload
+     * entries under the keys `idList` and `taskIDList`.
+     */
+    def build(): KillJobAction = {
+      val action = new KillJobAction
+      action.execID = execID
+      action.setUser(user)
+      action.addRequestPayload("idList", idList)
+      action.addRequestPayload("taskIDList", taskIDList)
+      action
+    }
+
+  }
+
+}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
index 7ea2001481..8c203303a0 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala
@@ -18,5 +18,6 @@
package org.apache.linkis.monitor.request
import org.apache.linkis.httpclient.dws.request.DWSHttpAction
+import org.apache.linkis.ujes.client.request.UserAction
-trait MonitorResourceAction extends DWSHttpAction with UserAction
+// Shared base for this package's request actions (e.g. KeyvalueAction, KillJobAction):
+// a DWSHttpAction that also mixes in UserAction.
+trait MonitorAction extends DWSHttpAction with UserAction
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
index 4733a1b45f..7cdabf33a8 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala
@@ -15,12 +15,8 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.request
+package org.apache.linkis.monitor.jobhistory.jobtime
-trait UserAction extends org.apache.linkis.httpclient.request.UserAction {
- private var user: String = _
+import org.apache.linkis.monitor.core.ob.SingleObserverEvent
- override def setUser(user: String): Unit = this.user = user
-
- override def getUser: String = user
-}
+// Event type that extends SingleObserverEvent and declares no members of its own.
+class StarrocksTimeKillHitEvent extends SingleObserverEvent
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala
similarity index 65%
copy from
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala
index aa73471c49..3c7b0c03c3 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.response
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
-public class QueryUtils {
+import java.util
- private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+import scala.beans.BeanProperty
+
+// DWSResult subclass annotated with the path pattern /api/rest_j/v\d+/jobhistory/diagnosis-query.
+// Exposes one bean property, `messages` (a list of string-keyed maps).
+// NOTE(review): presumably the annotation binds responses from that path to this class - confirm.
+@DWSHttpMessageResult("/api/rest_j/v\\d+/jobhistory/diagnosis-query")
+class AnalyzeJobResultAction extends DWSResult {
+
+ @BeanProperty
+ var messages: util.ArrayList[util.Map[String, AnyRef]] = _
- public static String dateToString(Date date) {
- return dateFormat.format(date);
- }
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala
similarity index 63%
copy from
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala
index aa73471c49..34289b188e 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala
@@ -15,17 +15,22 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.response
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
-public class QueryUtils {
+import java.util
- private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+import scala.beans.BeanProperty
+
+// DWSResult subclass annotated with the path pattern /api/rest_j/v\d+/configuration/keyvalue.
+// Exposes two bean properties: `configValues` (a list of string-keyed maps) and `totalPage`.
+// NOTE(review): presumably the annotation binds responses from that path to this class - confirm.
+@DWSHttpMessageResult("/api/rest_j/v\\d+/configuration/keyvalue")
+class KeyvalueResult extends DWSResult {
+
+ @BeanProperty
+ var configValues: util.ArrayList[util.Map[String, AnyRef]] = _
+
+ @BeanProperty
+ var totalPage: Int = _
- public static String dateToString(Date date) {
- return dateFormat.format(date);
- }
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala
similarity index 66%
copy from
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala
index aa73471c49..8d8392d94f 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.response
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult
+import org.apache.linkis.httpclient.dws.response.DWSResult
-public class QueryUtils {
+import java.util
- private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+import scala.beans.BeanProperty
+
+// DWSResult subclass annotated with the path pattern /api/rest_j/v\d+/entrance/(\S+)/killJobs.
+// Exposes one bean property, `messages` (a list of string-keyed maps).
+// NOTE(review): presumably the annotation binds responses from that path to this class - confirm.
+@DWSHttpMessageResult("/api/rest_j/v\\d+/entrance/(\\S+)/killJobs")
+class KillJobResultAction extends DWSResult {
+
+ @BeanProperty
+ var messages: util.ArrayList[util.Map[String, AnyRef]] = _
- public static String dateToString(Date date) {
- return dateFormat.format(date);
- }
}
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
index 798ed62c95..a0e94abc04 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala
@@ -120,7 +120,10 @@ object MonitorAlertUtils extends Logging {
val eccReceivers = {
val set: util.Set[String] = new util.HashSet[String]
- if (StringUtils.isNotBlank(data.eccReceivers)) {
+ if (
+ StringUtils
+ .isNotBlank(data.eccReceivers) &&
(!data.eccReceivers.equals("$eccAlertUser"))
+ ) {
data.eccReceivers.split(",").map(r => set.add(r))
}
if (!repaceParams.containsKey("$eccAlertUser")) {
@@ -130,10 +133,7 @@ object MonitorAlertUtils extends Logging {
}
})
} else {
- set.add(repaceParams.get("$eccAlertUser"))
- }
- if (StringUtils.isNotBlank(repaceParams.get("eccReceiver"))) {
- repaceParams.get("eccReceiver").split(",").map(r => set.add(r))
+ repaceParams.get("$eccAlertUser").split(",").map(r => set.add(r))
}
set
}
@@ -141,14 +141,25 @@ object MonitorAlertUtils extends Logging {
val subSystemId = repaceParams.getOrDefault("subSystemId",
Constants.ALERT_SUB_SYSTEM_ID)
val alertTitle = "集群[" + Constants.LINKIS_CLUSTER_NAME + "]" +
repaceParams
.getOrDefault("title", data.alertTitle)
- val alertLevel =
- if (StringUtils.isNotBlank(data.alertLevel) &&
StringUtils.isNumeric(data.alertLevel)) {
- ImsAlertLevel.withName(repaceParams.getOrDefault("monitorLevel",
data.alertLevel))
- } else {
+ val alertLevel = {
+ if (
+ repaceParams.containsKey("$alterLevel") && StringUtils.isNumeric(
+ repaceParams.get("$alterLevel")
+ )
+ ) {
ImsAlertLevel.withName(
- repaceParams.getOrDefault("monitorLevel",
ImsAlertLevel.WARN.toString)
+ repaceParams.getOrDefault("monitorLevel",
repaceParams.get("$alterLevel"))
)
+ } else {
+ if (StringUtils.isNotBlank(data.alertLevel) &&
StringUtils.isNumeric(data.alertLevel)) {
+ ImsAlertLevel.withName(repaceParams.getOrDefault("monitorLevel",
data.alertLevel))
+ } else {
+ ImsAlertLevel.withName(
+ repaceParams.getOrDefault("monitorLevel",
ImsAlertLevel.WARN.toString)
+ )
+ }
}
+ }
val alertDesc = Utils.tryAndWarn(
ImsAlertDesc(
diff --git
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala
similarity index 58%
copy from
linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
copy to
linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala
index aa73471c49..deedf5847b 100644
---
a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java
+++
b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala
@@ -15,17 +15,29 @@
* limitations under the License.
*/
-package org.apache.linkis.monitor.jobhistory;
+package org.apache.linkis.monitor.utils.job
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.linkis.monitor.core.pac.ScannedData
-public class QueryUtils {
+import java.util
- private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss.SSS");
+import scala.collection.JavaConverters._
- public static String dateToString(Date date) {
- return dateFormat.format(date);
+/** Utility object with a single helper for flattening lists of `ScannedData`. */
+object JohistoryUtils {
+
+  /**
+   * Flattens the payload of every scanned entry into one Scala list.
+   *
+   * @param data scanned entries; may be null
+   * @return the elements of each entry's `getData()`, in encounter order. Null entries and
+   *         entries whose `getData()` is null contribute nothing; a null `data` yields an
+   *         empty list.
+   */
+  def getJobhistorySanData(data: util.List[ScannedData]): List[Any] = {
+    Option(data) match {
+      case None => List.empty[Any]
+      case Some(entries) =>
+        entries.asScala.toList
+          .filter(entry => entry != null && entry.getData() != null)
+          .flatMap(entry => entry.getData().asScala)
+    }
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]