This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bd95eebd85 [INLONG-11387][Manager] Support multi-threaded processing
agent installation (#11388)
bd95eebd85 is described below
commit bd95eebd8548d5f2885b9b8b8fba818e0077cf7c
Author: fuweng11 <[email protected]>
AuthorDate: Tue Oct 22 21:01:24 2024 +0800
[INLONG-11387][Manager] Support multi-threaded processing agent
installation (#11388)
---
.../mappers/InlongClusterNodeEntityMapper.xml | 3 +
.../service/cluster/InlongClusterServiceImpl.java | 66 ++++++++++++++--------
.../service/heartbeat/HeartbeatManager.java | 9 +++
3 files changed, 53 insertions(+), 25 deletions(-)
diff --git
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index 7604362106..2b4be69d35 100644
---
a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++
b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -130,6 +130,9 @@
or port like CONCAT('%', #{keyword}, '%')
)
</if>
+ <if test="status != null and status != ''">
+ and status = #{status, jdbcType=INTEGER}
+ </if>
</where>
order by modify_time desc
</select>
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 2b18ba582f..e2be48a4e0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -117,14 +117,19 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.ALIVE_TIME_MS;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.CORE_POOL_SIZE;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.MAX_POOL_SIZE;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.QUEUE_SIZE;
import static
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams;
import static
org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.unpackExtParams;
@@ -137,11 +142,11 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
private static final Logger LOGGER =
LoggerFactory.getLogger(InlongClusterServiceImpl.class);
private static final Gson GSON = new Gson();
private final ExecutorService executorService = new ThreadPoolExecutor(
- 5,
- 10,
- 10L,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(100),
+ CORE_POOL_SIZE,
+ MAX_POOL_SIZE,
+ ALIVE_TIME_MS,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(QUEUE_SIZE),
new
ThreadFactoryBuilder().setNameFormat("agent-install-%s").build(),
new CallerRunsPolicy());
private final LinkedBlockingQueue<ClusterNodeRequest>
pendingInstallRequests = new LinkedBlockingQueue<>();
@@ -179,11 +184,18 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
@PostConstruct
private void startInstallTask() {
- InstallTaskRunnable installTaskRunnable = new InstallTaskRunnable();
- this.executorService.execute(installTaskRunnable);
+ processInstall();
+ setReloadTimer();
LOGGER.info("install task started successfully");
}
+ private void setReloadTimer() {
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+ long reloadInterval = 60000L;
+ executorService.scheduleWithFixedDelay(this::processInstall,
reloadInterval, reloadInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
@Override
public Integer saveTag(ClusterTagRequest request, String operator) {
LOGGER.debug("begin to save cluster tag {}", request);
@@ -731,6 +743,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
Integer id = instance.saveOpt(request, operator);
if (request.getIsInstall()) {
request.setId(id);
+ clusterNodeMapper.updateOperateLogById(id,
NodeStatus.INSTALLING.getStatus(), "begin to install");
pendingInstallRequests.add(request);
}
return id;
@@ -881,6 +894,8 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
if (request.getIsInstall()) {
// when reinstall set install to false
request.setIsInstall(false);
+ clusterNodeMapper.updateOperateLogById(request.getId(),
NodeStatus.INSTALLING.getStatus(),
+ "begin to re install");
pendingInstallRequests.add(request);
}
return true;
@@ -1434,34 +1449,35 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
private class InstallTaskRunnable implements Runnable {
- private static final int WAIT_SECONDS = 60 * 1000;
+ private ClusterNodeRequest request;
- @Override
- public void run() {
- while (true) {
- try {
- processInstall();
- Thread.sleep(WAIT_SECONDS);
- } catch (Exception e) {
- LOGGER.error("exception occurred when install", e);
- }
- }
+ public InstallTaskRunnable(ClusterNodeRequest request) {
+ this.request = request;
}
- @Transactional(rollbackFor = Throwable.class)
- public void processInstall() {
- if (pendingInstallRequests.isEmpty()) {
+ @Override
+ public void run() {
+ if (request == null) {
return;
}
- ClusterNodeRequest request = pendingInstallRequests.poll();
- InlongClusterNodeInstallOperator clusterNodeInstallOperator =
clusterNodeInstallOperatorFactory.getInstance(
- request.getType());
+ InlongClusterNodeInstallOperator clusterNodeInstallOperator =
+
clusterNodeInstallOperatorFactory.getInstance(request.getType());
if (request.getIsInstall()) {
clusterNodeInstallOperator.install(request,
request.getCurrentUser());
} else {
clusterNodeInstallOperator.reInstall(request,
request.getCurrentUser());
}
+ }
+ }
+ @Transactional(rollbackFor = Throwable.class)
+ public void processInstall() {
+ LOGGER.info("begin to process install task");
+ while (!pendingInstallRequests.isEmpty()) {
+ ClusterNodeRequest request = pendingInstallRequests.poll();
+ InstallTaskRunnable installTaskRunnable = new
InstallTaskRunnable(request);
+ executorService.execute(installTaskRunnable);
}
+ LOGGER.info("success to process install task");
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index e8add054d9..fc51b92cdf 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.heartbeat;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.AddressInfo;
@@ -211,6 +212,10 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
} else {
heartbeatMsg.setProtocolType(protocolType);
}
+ if (Objects.equals(heartbeat.getComponentType(),
ComponentTypeEnum.Agent.getType())) {
+ heartbeatMsg.setProtocolType(null);
+ heartbeatMsg.setPort(null);
+ }
// uninstall node event
if
(NodeSrvStatus.SERVICE_UNINSTALL.equals(heartbeat.getNodeSrvStatus())) {
InlongClusterNodeEntity clusterNode =
getClusterNode(clusterInfo, heartbeatMsg);
@@ -296,6 +301,10 @@ public class HeartbeatManager implements
AbstractHeartbeatManager {
} else {
heartbeatMsg.setProtocolType(protocolType);
}
+ if (Objects.equals(heartbeat.getComponentType(),
ComponentTypeEnum.Agent.getType())) {
+ heartbeatMsg.setProtocolType(null);
+ heartbeatMsg.setPort(null);
+ }
InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo,
heartbeatMsg);
if (clusterNode == null) {
log.error("not found any cluster node by type={}, ip={},
port={}",