This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ebcd6d3230 [INLONG-11585][Manager] Support JDBC verification under 
dual write parameters  (#11586)
ebcd6d3230 is described below

commit ebcd6d3230b73abdc41edb93619be2a26c9b8c97
Author: fuweng11 <[email protected]>
AuthorDate: Sun Dec 8 21:55:49 2024 +0800

    [INLONG-11585][Manager] Support JDBC verification under dual write 
parameters  (#11586)
---
 .../manager/pojo/util/MySQLSensitiveUrlUtils.java      | 18 +++++++++++++++---
 .../manager/pojo/sink/mysql/MySQLSinkDTOTest.java      |  4 ++++
 .../service/cluster/InlongClusterServiceImpl.java      |  4 ++--
 .../queue/pulsar/PulsarQueueResourceOperator.java      |  2 +-
 4 files changed, 22 insertions(+), 6 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
index fd80460301..32655da1fb 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
@@ -71,11 +71,13 @@ public class MySQLSensitiveUrlUtils {
             }
             resultUrl = resultUrl.replaceAll(InlongConstants.REGEX_WHITESPACE, 
InlongConstants.EMPTY);
 
-            for (String key : SENSITIVE_REPLACE_PARAM_MAP.keySet()) {
-                resultUrl = StringUtils.replaceIgnoreCase(resultUrl, key + 
InlongConstants.EQUAL + "true",
+            String sensitiveKey = containSensitiveKey(resultUrl);
+            while (StringUtils.isNotBlank(sensitiveKey)) {
+                resultUrl = StringUtils.replaceIgnoreCase(resultUrl, 
sensitiveKey + InlongConstants.EQUAL + "true",
                         InlongConstants.EMPTY);
-                resultUrl = StringUtils.replaceIgnoreCase(resultUrl, key + 
InlongConstants.EQUAL + "yes",
+                resultUrl = StringUtils.replaceIgnoreCase(resultUrl, 
sensitiveKey + InlongConstants.EQUAL + "yes",
                         InlongConstants.EMPTY);
+                sensitiveKey = containSensitiveKey(resultUrl);
             }
             if (resultUrl.contains(InlongConstants.QUESTION_MARK)) {
                 StringBuilder builder = new StringBuilder();
@@ -114,4 +116,14 @@ public class MySQLSensitiveUrlUtils {
                     url, e.getMessage()));
         }
     }
+
+    public static String containSensitiveKey(String url) {
+        for (String key : SENSITIVE_REPLACE_PARAM_MAP.keySet()) {
+            if (url.contains(key + InlongConstants.EQUAL + "true")
+                    || url.contains(key + InlongConstants.EQUAL + "yes")) {
+                return key;
+            }
+        }
+        return null;
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
index 3955a1538c..b4805af545 100644
--- 
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
+++ 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
@@ -36,6 +36,10 @@ public class MySQLSinkDTOTest {
                 
"jdbc:mysql://127.0.0.1:3306?autoReconnect=true&autoDeserialize=false&allowUrlInLocalInfile=false&allowLoadLocalInfile=false",
                 originUrl);
 
+        originUrl = MySQLSinkDTO.filterSensitive(
+                
"jdbc:mysql://address=(host=127.0.0.1)(port=3306)(allowLoadallowLoadLocalInfile=trueLocalInfile=true)");
+        
Assertions.assertEquals("jdbc:mysql://address=(host=127.0.0.1)(port=3306)()", 
originUrl);
+
         originUrl = MySQLSinkDTO.filterSensitive(
                 
"jdbc:mysql://127.0.0.1:3306?autoReconnect=true&autoDeserialize = 
TRue&allowLoadLocalInfile=TRue&allowUrlInLocalInfile=TRue&allowLoadLocalInfileInPath=/");
         Assertions.assertEquals(
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index e2be48a4e0..a4b65e6c2e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -960,7 +960,7 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
     public DataProxyNodeResponse getDataProxyNodes(String groupId, String 
protocolType) {
         LOGGER.debug("begin to get data proxy nodes for groupId={}, 
protocol={}", groupId, protocolType);
 
-        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupIdWithoutTenant(groupId);
         if (groupEntity == null) {
             String errMsg = String.format("group not found by groupId=%s", 
groupId);
             LOGGER.error(errMsg);
@@ -1082,7 +1082,7 @@ public class InlongClusterServiceImpl implements 
InlongClusterService {
     }
 
     private List<InlongClusterNodeEntity> getClusterNodes(String groupId, 
String clusterType, String protocolType) {
-        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupIdWithoutTenant(groupId);
         if (groupEntity == null) {
             LOGGER.warn("inlong group not exists for groupId={}", groupId);
             return Lists.newArrayList();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index efbdeabf20..67a1b39f7a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -324,7 +324,7 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
 
         int finalMsgCount = Math.min(request.getMessageCount(), 
briefMQMessages.size());
         if (finalMsgCount > 0) {
-            return briefMQMessages.subList(0, finalMsgCount);
+            return new ArrayList<>(briefMQMessages.subList(0, finalMsgCount));
         } else {
             return new ArrayList<>();
         }

Reply via email to