This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 52ed84f4a7 [INLONG-9475][Manager] Support setting dataNode when configuring streamSource for MySQL (#9486)
52ed84f4a7 is described below

commit 52ed84f4a7a8b82c20920e27714b6c11b51d90c3
Author: fuweng11 <[email protected]>
AuthorDate: Wed Dec 20 17:42:01 2023 +0800

    [INLONG-9475][Manager] Support setting dataNode when configuring streamSource for MySQL (#9486)
---
 .../client/api/inner/ClientFactoryTest.java        |  2 ++
 .../service/core/impl/AgentServiceImpl.java        |  7 +++-
 .../service/source/AbstractSourceOperator.java     |  5 +++
 .../service/source/StreamSourceOperator.java       |  2 ++
 .../source/binlog/BinlogSourceOperator.java        | 39 ++++++++++++++++++++++
 .../service/core/impl/AgentServiceTest.java        |  1 +
 .../service/source/StreamSourceServiceTest.java    |  1 +
 7 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index fdfcff0ef4..be4aa14413 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -236,6 +236,7 @@ class ClientFactoryTest {
                                                 .inlongGroupId("1")
                                                 .inlongStreamId("2")
                                                 .sourceType(SourceType.MYSQL_BINLOG)
+                                                .hostname("127.0.0.1")
                                                 .status(1)
                                                 .user("root")
                                                 .password("pwd")
@@ -560,6 +561,7 @@ class ClientFactoryTest {
                 MySQLBinlogSource.builder()
                         .id(2)
                         .sourceType(SourceType.MYSQL_BINLOG)
+                        .hostname("127.0.0.1")
                         .user("user")
                         .password("pwd")
                         .build(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 5d6fb1101e..83c858971d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -57,7 +57,9 @@ import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.source.SourceOperatorFactory;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
+import org.apache.inlong.manager.service.source.StreamSourceOperator;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -152,6 +154,8 @@ public class AgentServiceImpl implements AgentService {
     private InlongClusterEntityMapper clusterMapper;
     @Autowired
     private InlongClusterNodeEntityMapper clusterNodeMapper;
+    @Autowired
+    private SourceOperatorFactory operatorFactory;
 
     /**
      * Start the update task
@@ -593,7 +597,8 @@ public class AgentServiceImpl implements AgentService {
 
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-        String extParams = entity.getExtParams();
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
+        String extParams = sourceOperator.getExtParams(entity);
         if (groupEntity != null && streamEntity != null) {
             dataConfig.setState(
                     SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index bb54145a06..935a688d82 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -78,6 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
      */
     protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);
 
+    @Override
+    public String getExtParams(StreamSourceEntity sourceEntity) {
+        return sourceEntity.getExtParams();
+    }
+
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index be1768b599..5e7168879b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -43,6 +43,8 @@ public interface StreamSourceOperator {
      */
     Boolean accept(String sourceType);
 
+    String getExtParams(StreamSourceEntity sourceEntity);
+
     /**
      * Save the source info.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
index 2a42b24b4f..5342d2e56a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java
@@ -17,24 +17,31 @@
 
 package org.apache.inlong.manager.service.source.binlog;
 
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceDTO;
 import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
 import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Binlog source operator
@@ -42,6 +49,8 @@ import java.util.List;
 @Service
 public class BinlogSourceOperator extends AbstractSourceOperator {
 
+    @Autowired
+    protected DataNodeOperateHelper dataNodeHelper;
     @Autowired
     private ObjectMapper objectMapper;
 
@@ -55,6 +64,23 @@ public class BinlogSourceOperator extends AbstractSourceOperator {
         return SourceType.MYSQL_BINLOG;
     }
 
+    @Override
+    public String getExtParams(StreamSourceEntity sourceEntity) {
+        MySQLBinlogSourceDTO mySQLBinlogSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
+                MySQLBinlogSourceDTO.class);
+        if (Objects.nonNull(mySQLBinlogSourceDTO) && StringUtils.isBlank(mySQLBinlogSourceDTO.getHostname())) {
+            MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                    sourceEntity.getDataNodeName(), DataNodeType.MYSQL);
+            CommonBeanUtils.copyProperties(dataNodeInfo, mySQLBinlogSourceDTO, true);
+            mySQLBinlogSourceDTO.setUser(dataNodeInfo.getUsername());
+            mySQLBinlogSourceDTO.setPassword(dataNodeInfo.getToken());
+            mySQLBinlogSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
+            mySQLBinlogSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
+            return JsonUtils.toJsonString(mySQLBinlogSourceDTO);
+        }
+        return sourceEntity.getExtParams();
+    }
+
     @Override
     protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
         MySQLBinlogSourceRequest sourceRequest = (MySQLBinlogSourceRequest) request;
@@ -76,6 +102,19 @@ public class BinlogSourceOperator extends AbstractSourceOperator {
         }
 
         MySQLBinlogSourceDTO dto = MySQLBinlogSourceDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getHostname())) {
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                        "mysql url and data node is blank");
+            }
+            MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), DataNodeType.MYSQL);
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setUser(dataNodeInfo.getUsername());
+            dto.setPassword(dataNodeInfo.getToken());
+            dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
+            dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
+        }
         CommonBeanUtils.copyProperties(entity, source, true);
         CommonBeanUtils.copyProperties(dto, source, true);
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 4cdd5b9c66..70034803e8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -105,6 +105,7 @@ class AgentServiceTest extends ServiceBaseTest {
         sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
         sourceInfo.setSourceType(SourceType.MYSQL_BINLOG);
         sourceInfo.setSourceName("binlog_source_in_agent_service_test");
+        sourceInfo.setHostname("127.0.0.1");
         return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
index 4d343a6dd8..45c41eb4b8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java
@@ -54,6 +54,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         String sourceName = "stream_source_service_test";
         sourceInfo.setSourceName(sourceName);
         sourceInfo.setSourceType(SourceType.MYSQL_BINLOG);
+        sourceInfo.setHostname("127.0.0.1");
         Map<String, Object> properties = Maps.newLinkedHashMap();
         properties.put("append-mode", "true");
         sourceInfo.setProperties(properties);

Reply via email to