This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 52ed84f4a7 [INLONG-9475][Manager] Support setting dataNode when configuring streamSource for MySQL (#9486)
52ed84f4a7 is described below
commit 52ed84f4a7a8b82c20920e27714b6c11b51d90c3
Author: fuweng11 <[email protected]>
AuthorDate: Wed Dec 20 17:42:01 2023 +0800
[INLONG-9475][Manager] Support setting dataNode when configuring streamSource for MySQL (#9486)
---
.../client/api/inner/ClientFactoryTest.java | 2 ++
.../service/core/impl/AgentServiceImpl.java | 7 +++-
.../service/source/AbstractSourceOperator.java | 5 +++
.../service/source/StreamSourceOperator.java | 2 ++
.../source/binlog/BinlogSourceOperator.java | 39 ++++++++++++++++++++++
.../service/core/impl/AgentServiceTest.java | 1 +
.../service/source/StreamSourceServiceTest.java | 1 +
7 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index fdfcff0ef4..be4aa14413 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -236,6 +236,7 @@ class ClientFactoryTest {
.inlongGroupId("1")
.inlongStreamId("2")
.sourceType(SourceType.MYSQL_BINLOG)
+ .hostname("127.0.0.1")
.status(1)
.user("root")
.password("pwd")
@@ -560,6 +561,7 @@ class ClientFactoryTest {
MySQLBinlogSource.builder()
.id(2)
.sourceType(SourceType.MYSQL_BINLOG)
+ .hostname("127.0.0.1")
.user("user")
.password("pwd")
.build(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 5d6fb1101e..83c858971d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -57,7 +57,9 @@ import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
+import org.apache.inlong.manager.service.source.StreamSourceOperator;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -152,6 +154,8 @@ public class AgentServiceImpl implements AgentService {
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
+ @Autowired
+ private SourceOperatorFactory operatorFactory;
/**
* Start the update task
@@ -593,7 +597,8 @@ public class AgentServiceImpl implements AgentService {
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
InlongStreamEntity streamEntity =
streamMapper.selectByIdentifier(groupId, streamId);
- String extParams = entity.getExtParams();
+ StreamSourceOperator sourceOperator =
operatorFactory.getInstance(entity.getSourceType());
+ String extParams = sourceOperator.getExtParams(entity);
if (groupEntity != null && streamEntity != null) {
dataConfig.setState(
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))
? 1 : 0);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index bb54145a06..935a688d82 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -78,6 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
*/
protected abstract void setTargetEntity(SourceRequest request,
StreamSourceEntity targetEntity);
+ @Override
+ public String getExtParams(StreamSourceEntity sourceEntity) {
+ return sourceEntity.getExtParams();
+ }
+
@Override
@Transactional(rollbackFor = Throwable.class)
public Integer saveOpt(SourceRequest request, Integer groupStatus, String
operator) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index be1768b599..5e7168879b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -43,6 +43,8 @@ public interface StreamSourceOperator {
*/
Boolean accept(String sourceType);
+ String getExtParams(StreamSourceEntity sourceEntity);
+
/**
* Save the source info.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
index 2a42b24b4f..5342d2e56a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
@@ -17,24 +17,31 @@
package org.apache.inlong.manager.service.source.binlog;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceDTO;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.Objects;
/**
* Binlog source operator
@@ -42,6 +49,8 @@ import java.util.List;
@Service
public class BinlogSourceOperator extends AbstractSourceOperator {
+ @Autowired
+ protected DataNodeOperateHelper dataNodeHelper;
@Autowired
private ObjectMapper objectMapper;
@@ -55,6 +64,23 @@ public class BinlogSourceOperator extends AbstractSourceOperator {
return SourceType.MYSQL_BINLOG;
}
+ @Override
+ public String getExtParams(StreamSourceEntity sourceEntity) {
+ MySQLBinlogSourceDTO mySQLBinlogSourceDTO =
JsonUtils.parseObject(sourceEntity.getExtParams(),
+ MySQLBinlogSourceDTO.class);
+ if (Objects.nonNull(mySQLBinlogSourceDTO) &&
StringUtils.isBlank(mySQLBinlogSourceDTO.getHostname())) {
+ MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ sourceEntity.getDataNodeName(), DataNodeType.MYSQL);
+ CommonBeanUtils.copyProperties(dataNodeInfo, mySQLBinlogSourceDTO,
true);
+ mySQLBinlogSourceDTO.setUser(dataNodeInfo.getUsername());
+ mySQLBinlogSourceDTO.setPassword(dataNodeInfo.getToken());
+
mySQLBinlogSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
+
mySQLBinlogSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
+ return JsonUtils.toJsonString(mySQLBinlogSourceDTO);
+ }
+ return sourceEntity.getExtParams();
+ }
+
@Override
protected void setTargetEntity(SourceRequest request, StreamSourceEntity
targetEntity) {
MySQLBinlogSourceRequest sourceRequest = (MySQLBinlogSourceRequest)
request;
@@ -76,6 +102,19 @@ public class BinlogSourceOperator extends AbstractSourceOperator {
}
MySQLBinlogSourceDTO dto =
MySQLBinlogSourceDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getHostname())) {
+ if (StringUtils.isBlank(entity.getDataNodeName())) {
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+ "mysql url and data node is blank");
+ }
+ MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), DataNodeType.MYSQL);
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setUser(dataNodeInfo.getUsername());
+ dto.setPassword(dataNodeInfo.getToken());
+
dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
+
dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
+ }
CommonBeanUtils.copyProperties(entity, source, true);
CommonBeanUtils.copyProperties(dto, source, true);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 4cdd5b9c66..70034803e8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -105,6 +105,7 @@ class AgentServiceTest extends ServiceBaseTest {
sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
sourceInfo.setSourceType(SourceType.MYSQL_BINLOG);
sourceInfo.setSourceName("binlog_source_in_agent_service_test");
+ sourceInfo.setHostname("127.0.0.1");
return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
index 4d343a6dd8..45c41eb4b8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
@@ -54,6 +54,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
String sourceName = "stream_source_service_test";
sourceInfo.setSourceName(sourceName);
sourceInfo.setSourceType(SourceType.MYSQL_BINLOG);
+ sourceInfo.setHostname("127.0.0.1");
Map<String, Object> properties = Maps.newLinkedHashMap();
properties.put("append-mode", "true");
sourceInfo.setProperties(properties);