EMsnap commented on code in PR #9908:
URL: https://github.com/apache/inlong/pull/9908#discussion_r1545930338


##########
inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java:
##########
@@ -216,6 +218,147 @@ private boolean updateModules(List<ModuleConfig> managerModuleList) {
         managerModuleList.forEach((moduleConfig) -> {
             modulesFromManager.put(moduleConfig.getId(), moduleConfig);
         });
+        traverseManagerModulesToLocal(modulesFromManager);
+        traverseLocalModulesToManager(modulesFromManager);
+        return true;
+    }
+
+    private void traverseManagerModulesToLocal(Map<Integer, ModuleConfig> 
modulesFromManager) {
+        modulesFromManager.values().forEach((managerModule) -> {
+            ModuleConfig localModule = 
currentModules.get(managerModule.getId());
+            if (localModule == null) {
+                LOGGER.info("traverseManagerModulesToLocal module {} {} {} not 
found in local, add it",
+                        managerModule.getId(), managerModule.getName(), 
managerModule.getVersion());
+                addModule(managerModule);
+            } else {
+                if (managerModule.getMd5().equals(localModule.getMd5())) {
+                    LOGGER.info("traverseManagerModulesToLocal module {} {} {} 
md5 no change, do nothing",
+                            localModule.getId(), localModule.getName(), 
localModule.getVersion());
+                } else {
+                    LOGGER.info("traverseManagerModulesToLocal module {} {} {} 
md5 changed, update it",
+                            localModule.getId(), localModule.getName(), 
localModule.getVersion());
+                    updateModule(localModule, managerModule);
+                }
+            }
+        });
+    }
+
+    private void traverseLocalModulesToManager(Map<Integer, ModuleConfig> 
modulesFromManager) {
+        currentModules.values().forEach((localModule) -> {
+            ModuleConfig managerModule = 
modulesFromManager.get(localModule.getId());
+            if (managerModule == null) {
+                LOGGER.info("traverseLocalModulesToManager module {} {} {} not 
found in local, delete it",
+                        localModule.getId(), localModule.getName(), 
localModule.getVersion());
+                deleteModule(localModule);
+            }
+        });
+    }
+
+    private void addModule(ModuleConfig module) {
+        LOGGER.info("add module {} start", module.getName());
+        addAndSaveModuleConfig(module);
+        if (!downloadModule(module)) {
+            LOGGER.error("add module {} but download failed", 
module.getName());
+            return;
+        }
+        saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
+        installModule(module);
+        saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
+        startModule(module);
+        LOGGER.info("add module {} end", module.getId());
+    }
+
+    private void deleteModule(ModuleConfig module) {
+        LOGGER.info("delete module {} start", module.getId());
+        stopModule(module);
+        uninstallModule(module);
+        deleteAndSaveModuleConfig(module);
+        LOGGER.info("delete module {} end", module.getId());
+    }
+
+    private void updateModule(ModuleConfig localModule, ModuleConfig 
managerModule) {
+        LOGGER.info("update module {} start", localModule.getId());
+        deleteModule(localModule);
+        addModule(managerModule);
+        LOGGER.info("update module {} end", localModule.getId());
+    }
+
+    private void addAndSaveModuleConfig(ModuleConfig module) {
+        module.setState(ModuleStateEnum.NEW);
+        if (currentModules.containsKey(module.getId())) {
+            LOGGER.error("should not happen! module {} found! will force to 
replace it!", module.getId());
+        }
+        currentModules.put(module.getId(), module);
+        saveToLocalFile(confPath);
+    }
+
+    private void deleteAndSaveModuleConfig(ModuleConfig module) {
+        if (!currentModules.containsKey(module.getId())) {
+            LOGGER.error("should not happen! module {} not found!", 
module.getId());
+            return;
+        }
+        currentModules.remove(module.getId());
+        saveToLocalFile(confPath);
+    }
+
+    private boolean saveModuleState(Integer moduleId, ModuleStateEnum state) {
+        ModuleConfig module = currentModules.get(moduleId);
+        if (module == null) {
+            LOGGER.error("should not happen! module {} not found!", moduleId);
+            return false;
+        }
+        LOGGER.info("save module state to {} {}", moduleId, state);
+        module.setState(state);
+        saveToLocalFile(confPath);
+        return true;
+    }
+
+    private void installModule(ModuleConfig module) {
+        LOGGER.info("install module {} with cmd {}", module.getId(), 
module.getInstallCommand());
+        String ret = ExcuteLinux.exeCmd(module.getInstallCommand());
+        LOGGER.info("install module {} return {} ", module.getId(), ret);
+    }
+
+    private boolean startModule(ModuleConfig module) {
+        LOGGER.info("start module {} with cmd {}", module.getId(), 
module.getStartCommand());
+        for (int i = 0; i < module.getProcessesNum(); i++) {
+            String ret = ExcuteLinux.exeCmd(module.getStartCommand());
+            LOGGER.info("start [{}] module {} return {} ", i, module.getId(), 
ret);
+        }
+        if (isProcessAllStarted(module)) {
+            LOGGER.info("start module {} success", module.getId());
+            return true;
+        } else {
+            LOGGER.info("start module {} failed", module.getId());
+            return false;
+        }
+    }
+
+    private void stopModule(ModuleConfig module) {
+        LOGGER.info("stop module {} with cmd {}", module.getId(), 
module.getStopCommand());
+        String ret = ExcuteLinux.exeCmd(module.getStopCommand());
+        LOGGER.info("stop module {} return {} ", module.getId(), ret);
+    }
+
+    private void uninstallModule(ModuleConfig module) {
+        LOGGER.info("uninstall module {} with cmd {}", module.getId(), 
module.getUninstallCommand());
+        String ret = ExcuteLinux.exeCmd(module.getUninstallCommand());
+        LOGGER.info("uninstall module {} return {} ", module.getId(), ret);
+    }
+
+    private boolean isProcessAllStarted(ModuleConfig module) {
+        String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
+        String[] processArray = ret.split("\n");
+        int cnt = 0;
+        for (int i = 0; i < processArray.length; i++) {
+            if (processArray[i].length() > 0) {
+                cnt++;
+            }
+        }
+        LOGGER.info("get module process num {} {}", module.getName(), cnt);
+        if (cnt < module.getProcessesNum()) {

Review Comment:
   This check can be simplified to a single statement:
   `return cnt >= module.getProcessesNum();`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to