This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2f01025  [Fix-3299][dao&server] Fix 3299, when Json string parsing problem caused by non-standard json format. (#3552)
2f01025 is described below

commit 2f0102580268ff045e6042c3a691f1dee4c49962
Author: felix.wang <59079269+felix-thinkingd...@users.noreply.github.com>
AuthorDate: Wed Aug 19 16:27:39 2020 +0800

    [Fix-3299][dao&server] Fix 3299, when Json string parsing problem caused by non-standard json format. (#3552)
    
    * #3299  Json string parsing problem caused by non-standard json format.
    
    * #3299  Json string parsing problem caused by non-standard json format.
    
    * #3299  Json string parsing problem caused by non-standard json format. fix code style
    
    * #3299  Json string parsing problem caused by non-standard json format. fix code style
    
    Co-authored-by: wangjianda <fe...@thinkingdata.com>
---
 .../org/apache/dolphinscheduler/dao/AlertDao.java  |  81 ++++++++++-----
 .../server/utils/AlertManager.java                 | 109 +++++++++------------
 2 files changed, 104 insertions(+), 86 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
index 49b8c01..685d72c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.dao;
 
+package org.apache.dolphinscheduler.dao;
 
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 import org.apache.dolphinscheduler.common.enums.AlertStatus;
 import org.apache.dolphinscheduler.common.enums.AlertType;
 import org.apache.dolphinscheduler.common.enums.ShowType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
 import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -30,13 +29,17 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.LinkedHashMap;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
 @Component
 public class AlertDao extends AbstractBaseDao {
 
@@ -56,21 +59,23 @@ public class AlertDao extends AbstractBaseDao {
 
     /**
      * insert alert
+     *
      * @param alert alert
      * @return add alert result
      */
-    public int addAlert(Alert alert){
+    public int addAlert(Alert alert) {
         return alertMapper.insert(alert);
     }
 
     /**
      * update alert
+     *
      * @param alertStatus alertStatus
      * @param log log
      * @param id id
      * @return update alert result
      */
-    public int updateAlert(AlertStatus alertStatus,String log,int id){
+    public int updateAlert(AlertStatus alertStatus, String log, int id) {
         Alert alert = alertMapper.selectById(id);
         alert.setAlertStatus(alertStatus);
         alert.setUpdateTime(new Date());
@@ -80,46 +85,61 @@ public class AlertDao extends AbstractBaseDao {
 
     /**
      * query user list by alert group id
+     *
      * @param alerGroupId alerGroupId
      * @return user list
      */
-    public List<User> queryUserByAlertGroupId(int alerGroupId){
+    public List<User> queryUserByAlertGroupId(int alerGroupId) {
 
         return userAlertGroupMapper.listUserByAlertgroupId(alerGroupId);
     }
 
     /**
      * MasterServer or WorkerServer stoped
+     *
      * @param alertgroupId alertgroupId
      * @param host host
      * @param serverType serverType
      */
-    public void sendServerStopedAlert(int alertgroupId,String host,String 
serverType){
+    public void sendServerStopedAlert(int alertgroupId, String host, String 
serverType) {
         Alert alert = new Alert();
-        String content = 
String.format("[{'type':'%s','host':'%s','event':'server down','warning 
level':'serious'}]",
-                serverType, host);
+        List<LinkedHashMap> serverStopList = new ArrayList<>(1);
+        LinkedHashMap<String, String> serverStopedMap = new LinkedHashMap();
+        serverStopedMap.put("type", serverType);
+        serverStopedMap.put("host", host);
+        serverStopedMap.put("event", "server down");
+        serverStopedMap.put("warning level", "serious");
+        serverStopList.add(serverStopedMap);
+        String content = JSONUtils.toJsonString(serverStopList);
         alert.setTitle("Fault tolerance warning");
         saveTaskTimeoutAlert(alert, content, alertgroupId, null, null);
     }
 
     /**
      * process time out alert
+     *
      * @param processInstance processInstance
      * @param processDefinition processDefinition
      */
-    public void sendProcessTimeoutAlert(ProcessInstance processInstance, 
ProcessDefinition processDefinition){
+    public void sendProcessTimeoutAlert(ProcessInstance processInstance, 
ProcessDefinition processDefinition) {
         int alertgroupId = processInstance.getWarningGroupId();
         String receivers = processDefinition.getReceivers();
         String receiversCc = processDefinition.getReceiversCc();
         Alert alert = new Alert();
-        String content = 
String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]",
-                processInstance.getId(), processInstance.getName());
+        List<LinkedHashMap> processTimeoutList = new ArrayList<>(1);
+        LinkedHashMap<String, String> processTimeoutMap = new LinkedHashMap();
+        processTimeoutMap.put("id", String.valueOf(processInstance.getId()));
+        processTimeoutMap.put("name", processInstance.getName());
+        processTimeoutMap.put("event", "timeout");
+        processTimeoutMap.put("warnLevel", "middle");
+        processTimeoutList.add(processTimeoutMap);
+        String content = JSONUtils.toJsonString(processTimeoutList);
         alert.setTitle("Process Timeout Warn");
         saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, 
receiversCc);
     }
 
-    private void  saveTaskTimeoutAlert(Alert alert, String content, int 
alertgroupId,
-                                    String receivers,  String receiversCc){
+    private void saveTaskTimeoutAlert(Alert alert, String content, int 
alertgroupId,
+                                      String receivers, String receiversCc) {
         alert.setShowType(ShowType.TABLE);
         alert.setContent(content);
         alert.setAlertType(AlertType.EMAIL);
@@ -135,9 +155,9 @@ public class AlertDao extends AbstractBaseDao {
         alertMapper.insert(alert);
     }
 
-
     /**
      * task timeout warn
+     *
      * @param alertgroupId alertgroupId
      * @param receivers receivers
      * @param receiversCc receiversCc
@@ -146,34 +166,45 @@ public class AlertDao extends AbstractBaseDao {
      * @param taskId taskId
      * @param taskName taskName
      */
-    public void sendTaskTimeoutAlert(int alertgroupId,String receivers,String 
receiversCc, int processInstanceId,
-                                     String processInstanceName, int 
taskId,String taskName){
+    public void sendTaskTimeoutAlert(int alertgroupId, String receivers, 
String receiversCc, int processInstanceId,
+                                     String processInstanceName, int taskId, 
String taskName) {
         Alert alert = new Alert();
-        String content = String.format("[{'process instance id':'%d','task 
name':'%s','task id':'%d','task name':'%s'," +
-                        "'event':'timeout','warnLevel':'middle'}]", 
processInstanceId, processInstanceName, taskId, taskName);
+        List<LinkedHashMap> taskTimeoutList = new ArrayList<>(1);
+        LinkedHashMap<String, String> taskTimeoutMap = new LinkedHashMap();
+        taskTimeoutMap.put("process instance id", 
String.valueOf(processInstanceId));
+        taskTimeoutMap.put("process name", processInstanceName);
+        taskTimeoutMap.put("task id", String.valueOf(taskId));
+        taskTimeoutMap.put("task name", taskName);
+        taskTimeoutMap.put("event", "timeout");
+        taskTimeoutMap.put("warnLevel", "middle");
+        taskTimeoutList.add(taskTimeoutMap);
+        String content = JSONUtils.toJsonString(taskTimeoutList);
         alert.setTitle("Task Timeout Warn");
         saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, 
receiversCc);
     }
 
     /**
      * list the alert information of waiting to be executed
+     *
      * @return alert list
      */
-    public List<Alert> listWaitExecutionAlert(){
+    public List<Alert> listWaitExecutionAlert() {
         return alertMapper.listAlertByStatus(AlertStatus.WAIT_EXECUTION);
     }
 
     /**
      * list user information by alert group id
+     *
      * @param alertgroupId alertgroupId
      * @return user list
      */
-    public List<User> listUserByAlertgroupId(int alertgroupId){
+    public List<User> listUserByAlertgroupId(int alertgroupId) {
         return userAlertGroupMapper.listUserByAlertgroupId(alertgroupId);
     }
 
     /**
      * for test
+     *
      * @return AlertMapper
      */
     public AlertMapper getAlertMapper() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
index 49ec9d3..08c6022 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java
@@ -14,29 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.utils;
 
+package org.apache.dolphinscheduler.server.utils;
 
 import org.apache.dolphinscheduler.common.enums.AlertType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.ShowType;
 import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.DaoFactory;
 import org.apache.dolphinscheduler.dao.entity.Alert;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * alert manager
  */
@@ -50,8 +51,7 @@ public class AlertManager {
     /**
      * alert dao
      */
-    private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
-
+    private final AlertDao alertDao = 
DaoFactory.getDaoInstance(AlertDao.class);
 
     /**
      * command type convert chinese
@@ -87,49 +87,36 @@ public class AlertManager {
     }
 
     /**
-     * process instance format
-     */
-    private static final String PROCESS_INSTANCE_FORMAT =
-            "\"id:%d\"," +
-            "\"name:%s\"," +
-            "\"job type: %s\"," +
-            "\"state: %s\"," +
-            "\"recovery:%s\"," +
-            "\"run time: %d\"," +
-            "\"start time: %s\"," +
-            "\"end time: %s\"," +
-            "\"host: %s\"" ;
-
-    /**
      * get process instance content
-     * @param processInstance   process instance
-     * @param taskInstances     task instance list
+     *
+     * @param processInstance process instance
+     * @param taskInstances task instance list
      * @return process instance format content
      */
     public String getContentProcessInstance(ProcessInstance processInstance,
-                                            List<TaskInstance> taskInstances){
+                                            List<TaskInstance> taskInstances) {
 
         String res = "";
-        if(processInstance.getState().typeIsSuccess()){
-            res = String.format(PROCESS_INSTANCE_FORMAT,
-                    processInstance.getId(),
-                    processInstance.getName(),
-                    getCommandCnName(processInstance.getCommandType()),
-                    processInstance.getState().toString(),
-                    processInstance.getRecovery().toString(),
-                    processInstance.getRunTimes(),
-                    DateUtils.dateToString(processInstance.getStartTime()),
-                    DateUtils.dateToString(processInstance.getEndTime()),
-                    processInstance.getHost()
-
-            );
-            res = "[" + res + "]";
-        }else if(processInstance.getState().typeIsFailure()){
+        if (processInstance.getState().typeIsSuccess()) {
+            List<LinkedHashMap> successTaskList = new ArrayList<>(1);
+            LinkedHashMap<String, String> successTaskMap = new LinkedHashMap();
+            successTaskMap.put("id", String.valueOf(processInstance.getId()));
+            successTaskMap.put("name", processInstance.getName());
+            successTaskMap.put("job type", 
getCommandCnName(processInstance.getCommandType()));
+            successTaskMap.put("state", processInstance.getState().toString());
+            successTaskMap.put("recovery", 
processInstance.getRecovery().toString());
+            successTaskMap.put("run time", 
String.valueOf(processInstance.getRunTimes()));
+            successTaskMap.put("start time", 
DateUtils.dateToString(processInstance.getStartTime()));
+            successTaskMap.put("end time", 
DateUtils.dateToString(processInstance.getEndTime()));
+            successTaskMap.put("host", processInstance.getHost());
+            successTaskList.add(successTaskMap);
+            res = JSONUtils.toJsonString(successTaskList);
+        } else if (processInstance.getState().typeIsFailure()) {
 
             List<LinkedHashMap> failedTaskList = new ArrayList<>();
 
-            for(TaskInstance task : taskInstances){
-                if(task.getState().typeIsSuccess()){
+            for (TaskInstance task : taskInstances) {
+                if (task.getState().typeIsSuccess()) {
                     continue;
                 }
                 LinkedHashMap<String, String> failedTaskMap = new 
LinkedHashMap();
@@ -154,15 +141,15 @@ public class AlertManager {
     /**
      * getting worker fault tolerant content
      *
-     * @param processInstance   process instance
+     * @param processInstance process instance
      * @param toleranceTaskList tolerance task list
      * @return worker tolerance content
      */
-    private String getWorkerToleranceContent(ProcessInstance processInstance, 
List<TaskInstance> toleranceTaskList){
+    private String getWorkerToleranceContent(ProcessInstance processInstance, 
List<TaskInstance> toleranceTaskList) {
 
-        List<LinkedHashMap<String, String>> toleranceTaskInstanceList =  new 
ArrayList<>();
+        List<LinkedHashMap<String, String>> toleranceTaskInstanceList = new 
ArrayList<>();
 
-        for(TaskInstance taskInstance: toleranceTaskList){
+        for (TaskInstance taskInstance : toleranceTaskList) {
             LinkedHashMap<String, String> toleranceWorkerContentMap = new 
LinkedHashMap();
             toleranceWorkerContentMap.put("process name", 
processInstance.getName());
             toleranceWorkerContentMap.put("task name", taskInstance.getName());
@@ -176,11 +163,11 @@ public class AlertManager {
     /**
      * send worker alert fault tolerance
      *
-     * @param processInstance   process instance
+     * @param processInstance process instance
      * @param toleranceTaskList tolerance task list
      */
-    public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, 
List<TaskInstance> toleranceTaskList){
-        try{
+    public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, 
List<TaskInstance> toleranceTaskList) {
+        try {
             Alert alert = new Alert();
             alert.setTitle("worker fault tolerance");
             alert.setShowType(ShowType.TABLE);
@@ -188,13 +175,13 @@ public class AlertManager {
             alert.setContent(content);
             alert.setAlertType(AlertType.EMAIL);
             alert.setCreateTime(new Date());
-            alert.setAlertGroupId(processInstance.getWarningGroupId() == null 
? 1:processInstance.getWarningGroupId());
+            alert.setAlertGroupId(processInstance.getWarningGroupId() == null 
? 1 : processInstance.getWarningGroupId());
             
alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
             
alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());
             alertDao.addAlert(alert);
             logger.info("add alert to db , alert : {}", alert.toString());
 
-        }catch (Exception e){
+        } catch (Exception e) {
             logger.error("send alert failed:{} ", e.getMessage());
         }
 
@@ -202,40 +189,40 @@ public class AlertManager {
 
     /**
      * send process instance alert
-     * @param processInstance   process instance
-     * @param taskInstances     task instance list
+     *
+     * @param processInstance process instance
+     * @param taskInstances task instance list
      */
     public void sendAlertProcessInstance(ProcessInstance processInstance,
-                                         List<TaskInstance> taskInstances){
+                                         List<TaskInstance> taskInstances) {
 
         boolean sendWarnning = false;
         WarningType warningType = processInstance.getWarningType();
-        switch (warningType){
+        switch (warningType) {
             case ALL:
-                if(processInstance.getState().typeIsFinished()){
+                if (processInstance.getState().typeIsFinished()) {
                     sendWarnning = true;
                 }
                 break;
             case SUCCESS:
-                if(processInstance.getState().typeIsSuccess()){
+                if (processInstance.getState().typeIsSuccess()) {
                     sendWarnning = true;
                 }
                 break;
             case FAILURE:
-                if(processInstance.getState().typeIsFailure()){
+                if (processInstance.getState().typeIsFailure()) {
                     sendWarnning = true;
                 }
                 break;
-                default:
+            default:
         }
-        if(!sendWarnning){
+        if (!sendWarnning) {
             return;
         }
         Alert alert = new Alert();
 
-
         String cmdName = getCommandCnName(processInstance.getCommandType());
-        String success = processInstance.getState().typeIsSuccess() ? 
"success" :"failed";
+        String success = processInstance.getState().typeIsSuccess() ? 
"success" : "failed";
         alert.setTitle(cmdName + " " + success);
         ShowType showType = processInstance.getState().typeIsSuccess() ? 
ShowType.TEXT : ShowType.TABLE;
         alert.setShowType(showType);
@@ -254,7 +241,7 @@ public class AlertManager {
     /**
      * send process timeout alert
      *
-     * @param processInstance   process instance
+     * @param processInstance process instance
      * @param processDefinition process definition
      */
     public void sendProcessTimeoutAlert(ProcessInstance processInstance, 
ProcessDefinition processDefinition) {

Reply via email to