This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ebcd6d3230 [INLONG-11585][Manager] Support JDBC verification under dual write parameters (#11586)
ebcd6d3230 is described below
commit ebcd6d3230b73abdc41edb93619be2a26c9b8c97
Author: fuweng11 <[email protected]>
AuthorDate: Sun Dec 8 21:55:49 2024 +0800
[INLONG-11585][Manager] Support JDBC verification under dual write parameters (#11586)
---
.../manager/pojo/util/MySQLSensitiveUrlUtils.java | 18 +++++++++++++++---
.../manager/pojo/sink/mysql/MySQLSinkDTOTest.java | 4 ++++
.../service/cluster/InlongClusterServiceImpl.java | 4 ++--
.../queue/pulsar/PulsarQueueResourceOperator.java | 2 +-
4 files changed, 22 insertions(+), 6 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
index fd80460301..32655da1fb 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/util/MySQLSensitiveUrlUtils.java
@@ -71,11 +71,13 @@ public class MySQLSensitiveUrlUtils {
}
resultUrl = resultUrl.replaceAll(InlongConstants.REGEX_WHITESPACE,
InlongConstants.EMPTY);
- for (String key : SENSITIVE_REPLACE_PARAM_MAP.keySet()) {
- resultUrl = StringUtils.replaceIgnoreCase(resultUrl, key +
InlongConstants.EQUAL + "true",
+ String sensitiveKey = containSensitiveKey(resultUrl);
+ while (StringUtils.isNotBlank(sensitiveKey)) {
+ resultUrl = StringUtils.replaceIgnoreCase(resultUrl,
sensitiveKey + InlongConstants.EQUAL + "true",
InlongConstants.EMPTY);
- resultUrl = StringUtils.replaceIgnoreCase(resultUrl, key +
InlongConstants.EQUAL + "yes",
+ resultUrl = StringUtils.replaceIgnoreCase(resultUrl,
sensitiveKey + InlongConstants.EQUAL + "yes",
InlongConstants.EMPTY);
+ sensitiveKey = containSensitiveKey(resultUrl);
}
if (resultUrl.contains(InlongConstants.QUESTION_MARK)) {
StringBuilder builder = new StringBuilder();
@@ -114,4 +116,14 @@ public class MySQLSensitiveUrlUtils {
url, e.getMessage()));
}
}
+
+ public static String containSensitiveKey(String url) {
+ for (String key : SENSITIVE_REPLACE_PARAM_MAP.keySet()) {
+ if (url.contains(key + InlongConstants.EQUAL + "true")
+ || url.contains(key + InlongConstants.EQUAL + "yes")) {
+ return key;
+ }
+ }
+ return null;
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
index 3955a1538c..b4805af545 100644
---
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
+++
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/sink/mysql/MySQLSinkDTOTest.java
@@ -36,6 +36,10 @@ public class MySQLSinkDTOTest {
"jdbc:mysql://127.0.0.1:3306?autoReconnect=true&autoDeserialize=false&allowUrlInLocalInfile=false&allowLoadLocalInfile=false",
originUrl);
+ originUrl = MySQLSinkDTO.filterSensitive(
+
"jdbc:mysql://address=(host=127.0.0.1)(port=3306)(allowLoadallowLoadLocalInfile=trueLocalInfile=true)");
+
Assertions.assertEquals("jdbc:mysql://address=(host=127.0.0.1)(port=3306)()",
originUrl);
+
originUrl = MySQLSinkDTO.filterSensitive(
"jdbc:mysql://127.0.0.1:3306?autoReconnect=true&autoDeserialize =
TRue&allowLoadLocalInfile=TRue&allowUrlInLocalInfile=TRue&allowLoadLocalInfileInPath=/");
Assertions.assertEquals(
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index e2be48a4e0..a4b65e6c2e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -960,7 +960,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
public DataProxyNodeResponse getDataProxyNodes(String groupId, String
protocolType) {
LOGGER.debug("begin to get data proxy nodes for groupId={},
protocol={}", groupId, protocolType);
- InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ InlongGroupEntity groupEntity =
groupMapper.selectByGroupIdWithoutTenant(groupId);
if (groupEntity == null) {
String errMsg = String.format("group not found by groupId=%s",
groupId);
LOGGER.error(errMsg);
@@ -1082,7 +1082,7 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
}
private List<InlongClusterNodeEntity> getClusterNodes(String groupId,
String clusterType, String protocolType) {
- InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ InlongGroupEntity groupEntity =
groupMapper.selectByGroupIdWithoutTenant(groupId);
if (groupEntity == null) {
LOGGER.warn("inlong group not exists for groupId={}", groupId);
return Lists.newArrayList();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index efbdeabf20..67a1b39f7a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -324,7 +324,7 @@ public class PulsarQueueResourceOperator implements
QueueResourceOperator {
int finalMsgCount = Math.min(request.getMessageCount(),
briefMQMessages.size());
if (finalMsgCount > 0) {
- return briefMQMessages.subList(0, finalMsgCount);
+ return new ArrayList<>(briefMQMessages.subList(0, finalMsgCount));
} else {
return new ArrayList<>();
}