This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 093333de40 [INLONG-9906][Agent] Add configuration comparison logic and
processing of comparison results (#9908)
093333de40 is described below
commit 093333de4084f2345e56d8c8646e72508e9a8503
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Apr 1 15:36:02 2024 +0800
[INLONG-9906][Agent] Add configuration comparison logic and processing of
comparison results (#9908)
* [INLONG-9906][Agent] Add configuration comparison logic and processing of
comparison results
* [INLONG-9906][Agent] Modify based on comments
* Update
inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
Co-authored-by: AloysZhang <[email protected]>
---------
Co-authored-by: AloysZhang <[email protected]>
---
.../inlong/agent/installer/ModuleManager.java | 151 +++++++++++++++++++++
1 file changed, 151 insertions(+)
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index bc7bf89e8c..00dfc9668d 100755
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -22,10 +22,12 @@ import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.installer.conf.InstallerConfiguration;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ExcuteLinux;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
import org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
+import org.apache.inlong.common.pojo.agent.installer.ModuleStateEnum;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -216,9 +218,158 @@ public class ModuleManager extends AbstractDaemon {
managerModuleList.forEach((moduleConfig) -> {
modulesFromManager.put(moduleConfig.getId(), moduleConfig);
});
+ traverseManagerModulesToLocal(modulesFromManager);
+ traverseLocalModulesToManager(modulesFromManager);
return true;
}
+ private void traverseManagerModulesToLocal(Map<Integer, ModuleConfig>
modulesFromManager) {
+ modulesFromManager.values().forEach((managerModule) -> {
+ ModuleConfig localModule =
currentModules.get(managerModule.getId());
+ if (localModule == null) {
+ LOGGER.info("traverseManagerModulesToLocal module {} {} {} not
found in local, add it",
+ managerModule.getId(), managerModule.getName(),
managerModule.getVersion());
+ addModule(managerModule);
+ } else {
+ if (managerModule.getMd5().equals(localModule.getMd5())) {
+ LOGGER.info("traverseManagerModulesToLocal module {} {} {}
md5 no change, do nothing",
+ localModule.getId(), localModule.getName(),
localModule.getVersion());
+ } else {
+ LOGGER.info("traverseManagerModulesToLocal module {} {} {}
md5 changed, update it",
+ localModule.getId(), localModule.getName(),
localModule.getVersion());
+ updateModule(localModule, managerModule);
+ }
+ }
+ });
+ }
+
+ private void traverseLocalModulesToManager(Map<Integer, ModuleConfig>
modulesFromManager) {
+ currentModules.values().forEach((localModule) -> {
+ ModuleConfig managerModule =
modulesFromManager.get(localModule.getId());
+ if (managerModule == null) {
+ LOGGER.info("traverseLocalModulesToManager module {} {} {} not
found in local, delete it",
+ localModule.getId(), localModule.getName(),
localModule.getVersion());
+ deleteModule(localModule);
+ }
+ });
+ }
+
+ private void addModule(ModuleConfig module) {
+ LOGGER.info("add module {} start", module.getName());
+ addAndSaveModuleConfig(module);
+ if (!downloadModule(module)) {
+ LOGGER.error("add module {} but download failed",
module.getName());
+ return;
+ }
+ saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
+ installModule(module);
+ saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
+ startModule(module);
+ LOGGER.info("add module {} end", module.getId());
+ }
+
+ private void deleteModule(ModuleConfig module) {
+ LOGGER.info("delete module {} start", module.getId());
+ stopModule(module);
+ uninstallModule(module);
+ deleteAndSaveModuleConfig(module);
+ LOGGER.info("delete module {} end", module.getId());
+ }
+
+ private void updateModule(ModuleConfig localModule, ModuleConfig
managerModule) {
+ LOGGER.info("update module {} start", localModule.getId());
+ if
(localModule.getPackageConfig().getMd5().equals(managerModule.getPackageConfig().getMd5()))
{
+ LOGGER.info("package md5 changed, will reinstall",
localModule.getId());
+ deleteModule(localModule);
+ addModule(managerModule);
+ } else {
+ LOGGER.info("package md5 no chang, will restart",
localModule.getId());
+ restartModule(localModule, managerModule);
+ }
+ LOGGER.info("update module {} end", localModule.getId());
+ }
+
+ private void addAndSaveModuleConfig(ModuleConfig module) {
+ module.setState(ModuleStateEnum.NEW);
+ if (currentModules.containsKey(module.getId())) {
+ LOGGER.error("should not happen! module {} found! will force to
replace it!", module.getId());
+ }
+ currentModules.put(module.getId(), module);
+ saveToLocalFile(confPath);
+ }
+
+ private void deleteAndSaveModuleConfig(ModuleConfig module) {
+ if (!currentModules.containsKey(module.getId())) {
+ LOGGER.error("should not happen! module {} not found!",
module.getId());
+ return;
+ }
+ currentModules.remove(module.getId());
+ saveToLocalFile(confPath);
+ }
+
+ private boolean saveModuleState(Integer moduleId, ModuleStateEnum state) {
+ ModuleConfig module = currentModules.get(moduleId);
+ if (module == null) {
+ LOGGER.error("should not happen! module {} not found!", moduleId);
+ return false;
+ }
+ module.setState(state);
+ saveToLocalFile(confPath);
+ LOGGER.info("save module state to {} {}", moduleId, state);
+ return true;
+ }
+
+ private void restartModule(ModuleConfig localModule, ModuleConfig
managerModule) {
+ stopModule(localModule);
+ startModule(managerModule);
+ }
+
+ private void installModule(ModuleConfig module) {
+ LOGGER.info("install module {} with cmd {}", module.getId(),
module.getInstallCommand());
+ String ret = ExcuteLinux.exeCmd(module.getInstallCommand());
+ LOGGER.info("install module {} return {} ", module.getId(), ret);
+ }
+
+ private boolean startModule(ModuleConfig module) {
+ LOGGER.info("start module {} with cmd {}", module.getId(),
module.getStartCommand());
+ for (int i = 0; i < module.getProcessesNum(); i++) {
+ String ret = ExcuteLinux.exeCmd(module.getStartCommand());
+ LOGGER.info("start [{}] module {} return {} ", i, module.getId(),
ret);
+ }
+ if (isProcessAllStarted(module)) {
+ LOGGER.info("start module {} success", module.getId());
+ return true;
+ } else {
+ LOGGER.info("start module {} failed", module.getId());
+ return false;
+ }
+ }
+
+ private void stopModule(ModuleConfig module) {
+ LOGGER.info("stop module {} with cmd {}", module.getId(),
module.getStopCommand());
+ String ret = ExcuteLinux.exeCmd(module.getStopCommand());
+ LOGGER.info("stop module {} return {} ", module.getId(), ret);
+ }
+
+ private void uninstallModule(ModuleConfig module) {
+ LOGGER.info("uninstall module {} with cmd {}", module.getId(),
module.getUninstallCommand());
+ String ret = ExcuteLinux.exeCmd(module.getUninstallCommand());
+ LOGGER.info("uninstall module {} return {} ", module.getId(), ret);
+ }
+
+ private boolean isProcessAllStarted(ModuleConfig module) {
+ String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
+ String[] processArray = ret.split("\n");
+ int cnt = 0;
+ for (int i = 0; i < processArray.length; i++) {
+ if (processArray[i].length() > 0) {
+ cnt++;
+ }
+ }
+ LOGGER.info("get module process num {} {}", module.getName(), cnt);
+ return cnt >= module.getProcessesNum();
+ }
+
private boolean downloadModule(ModuleConfig module) {
LOGGER.info("download module {} begin with url {}", module.getId(),
module.getPackageConfig().getDownloadUrl());
try {