This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d698b5cde6 [INLONG-10527][Sort] EsSink support switch metadata acquire
mode (#10552)
d698b5cde6 is described below
commit d698b5cde64f9e591a8f8f92c66b35c1a62f966a
Author: vernedeng <[email protected]>
AuthorDate: Tue Jul 2 14:19:31 2024 +0800
[INLONG-10527][Sort] EsSink support switch metadata acquire mode (#10552)
* [INLONG-10527][Sort] EsSink support switch metadata acquire mode
---
.../sink/elasticsearch/EsSinkContext.java | 267 +++++++++++++--------
.../src/test/java/common.properties | 1 +
.../src/test/resources/common.properties | 2 +-
3 files changed, 171 insertions(+), 99 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index 6357dc8330..66ad584427 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -20,16 +20,21 @@ package
org.apache.inlong.sort.standalone.sink.elasticsearch;
import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@@ -49,7 +54,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
- *
* EsSinkContext
*/
public class EsSinkContext extends SinkContext {
@@ -114,14 +118,6 @@ public class EsSinkContext extends SinkContext {
private String strHttpHosts;
private HttpHost[] httpHosts;
- /**
- * Constructor
- *
- * @param sinkName
- * @param context
- * @param channel
- * @param dispatchQueue
- */
public EsSinkContext(String sinkName, Context context, Channel channel,
BufferQueue<EsIndexRequest> dispatchQueue) {
super(sinkName, context, channel);
@@ -138,61 +134,30 @@ public class EsSinkContext extends SinkContext {
LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}",
taskName, dispatchQueue.size(), offerCounter.getAndSet(0),
takeCounter.getAndSet(0), backCounter.getAndSet(0));
- TaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
- if (this.taskConfig != null &&
this.taskConfig.equals(newSortTaskConfig)) {
+ TaskConfig newTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
+ && (newSortTaskConfig == null ||
newSortTaskConfig.equals(sortTaskConfig))) {
return;
}
- LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
- objectMapper.writeValueAsString(newSortTaskConfig));
- this.taskConfig = newSortTaskConfig;
- EsNodeConfig requestNodeConfig = (EsNodeConfig)
taskConfig.getNodeConfig();
+ LOG.info("get new SortTaskConfig:taskName:{}", taskName);
+
+ EsNodeConfig requestNodeConfig = (EsNodeConfig)
newTaskConfig.getNodeConfig();
if (esNodeConfig == null || requestNodeConfig.getVersion() >
esNodeConfig.getVersion()) {
this.esNodeConfig = requestNodeConfig;
}
- Map<String, String> properties =
this.taskConfig.getNodeConfig().getProperties();
- this.sinkContext = new Context(properties != null ? properties :
new HashMap<>());
+ this.taskConfig = newTaskConfig;
+ this.sortTaskConfig = newSortTaskConfig;
+
// change current config
- this.idConfigMap = this.taskConfig.getClusterTagConfigs()
- .stream()
- .map(ClusterTagConfig::getDataFlowConfigs)
- .flatMap(Collection::stream)
- .map(EsIdConfig::create)
- .collect(Collectors.toMap(
- config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
- v -> v,
- (flow1, flow2) -> flow1));
- // rest client
- this.username = esNodeConfig.getUsername();
- this.password = esNodeConfig.getPassword();
- this.bulkAction = esNodeConfig.getBulkAction();
- this.bulkSizeMb = esNodeConfig.getBulkSizeMb();
- this.flushInterval = esNodeConfig.getFlushInterval();
- this.concurrentRequests = esNodeConfig.getConcurrentRequests();
- this.maxConnect = esNodeConfig.getMaxConnect();
- this.keywordMaxLength = esNodeConfig.getKeywordMaxLength();
- this.isUseIndexId = esNodeConfig.getIsUseIndexId();
-
- this.maxConnectPerRoute =
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE,
DEFAULT_MAX_CONNECT_PER_ROUTE);
- this.connectionRequestTimeout =
- sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
- this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT);
- this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS,
DEFAULT_MAX_REDIRECTS);
- this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH,
DEFAULT_LOG_MAX_LENGTH);
- // http host
- this.strHttpHosts = esNodeConfig.getHttpHosts();
- if (!StringUtils.isBlank(strHttpHosts)) {
- String[] strHttpHostArray = strHttpHosts.split("\\s+");
- List<HttpHost> newHttpHosts = new
ArrayList<>(strHttpHostArray.length);
- for (String strHttpHost : strHttpHostArray) {
- String[] ipPort = strHttpHost.split(":");
- if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1]))
{
- newHttpHosts.add(new HttpHost(ipPort[0],
NumberUtils.toInt(ipPort[1])));
- }
- }
- if (newHttpHosts.size() > 0) {
- HttpHost[] newHostHostArray = new
HttpHost[newHttpHosts.size()];
- this.httpHosts = newHttpHosts.toArray(newHostHostArray);
- }
+ Map<String, EsIdConfig> fromTaskConfig =
reloadIdParamsFromTaskConfig(taskConfig);
+ Map<String, EsIdConfig> fromSortTaskConfig =
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+ if (unifiedConfiguration) {
+ idConfigMap = fromTaskConfig;
+ reloadClientsFromNodeConfig(esNodeConfig);
+ } else {
+ idConfigMap = fromSortTaskConfig;
+ reloadClientsFromSortTaskConfig(sortTaskConfig);
}
// log
LOG.info("end to get
SortTaskConfig:taskName:{}:newIdConfigMap:{}", taskName,
@@ -202,9 +167,115 @@ public class EsSinkContext extends SinkContext {
}
}
+ private Map<String, EsIdConfig> reloadIdParamsFromTaskConfig(TaskConfig
taskConfig) {
+ if (taskConfig == null) {
+ return new HashMap<>();
+ }
+ return taskConfig.getClusterTagConfigs()
+ .stream()
+ .map(ClusterTagConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .map(EsIdConfig::create)
+ .collect(Collectors.toMap(
+ config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+ v -> v,
+ (flow1, flow2) -> flow1));
+ }
+
+ private Map<String, EsIdConfig>
reloadIdParamsFromSortTaskConfig(SortTaskConfig sortTaskConfig)
+ throws JsonProcessingException {
+ if (sortTaskConfig == null) {
+ return new HashMap<>();
+ }
+ Map<String, EsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+ List<Map<String, String>> idList = this.sortTaskConfig.getIdParams();
+ ObjectMapper objectMapper = new ObjectMapper();
+
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ for (Map<String, String> idParam : idList) {
+ String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+ String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+ String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+ String jsonIdConfig = objectMapper.writeValueAsString(idParam);
+ EsIdConfig idConfig = objectMapper.readValue(jsonIdConfig,
EsIdConfig.class);
+ newIdConfigMap.put(uid, idConfig);
+ }
+ return newIdConfigMap;
+ }
+
+ private void reloadClientsFromNodeConfig(EsNodeConfig esNodeConfig) {
+ Map<String, String> properties = esNodeConfig.getProperties();
+ this.sinkContext = new Context(properties != null ? properties : new
HashMap<>());
+ this.username = esNodeConfig.getUsername();
+ this.password = esNodeConfig.getPassword();
+ this.bulkAction = esNodeConfig.getBulkAction();
+ this.bulkSizeMb = esNodeConfig.getBulkSizeMb();
+ this.flushInterval = esNodeConfig.getFlushInterval();
+ this.concurrentRequests = esNodeConfig.getConcurrentRequests();
+ this.maxConnect = esNodeConfig.getMaxConnect();
+ this.keywordMaxLength = esNodeConfig.getKeywordMaxLength();
+ this.isUseIndexId = esNodeConfig.getIsUseIndexId();
+
+ this.maxConnectPerRoute =
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE,
DEFAULT_MAX_CONNECT_PER_ROUTE);
+ this.connectionRequestTimeout =
+ sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+ this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT);
+ this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS,
DEFAULT_MAX_REDIRECTS);
+ this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH,
DEFAULT_LOG_MAX_LENGTH);
+ // http host
+ this.strHttpHosts = esNodeConfig.getHttpHosts();
+ if (!StringUtils.isBlank(strHttpHosts)) {
+ String[] strHttpHostArray = strHttpHosts.split("\\s+");
+ List<HttpHost> newHttpHosts = new
ArrayList<>(strHttpHostArray.length);
+ for (String strHttpHost : strHttpHostArray) {
+ String[] ipPort = strHttpHost.split(":");
+ if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) {
+ newHttpHosts.add(new HttpHost(ipPort[0],
NumberUtils.toInt(ipPort[1])));
+ }
+ }
+ if (newHttpHosts.size() > 0) {
+ HttpHost[] newHostHostArray = new
HttpHost[newHttpHosts.size()];
+ this.httpHosts = newHttpHosts.toArray(newHostHostArray);
+ }
+ }
+ }
+
+ private void reloadClientsFromSortTaskConfig(SortTaskConfig
sortTaskConfig) {
+ this.sinkContext = new Context(sortTaskConfig.getSinkParams());
+ this.username = sinkContext.getString(KEY_USERNAME);
+ this.password = sinkContext.getString(KEY_PASSWORD);
+ this.bulkAction = sinkContext.getInteger(KEY_BULK_ACTION,
DEFAULT_BULK_ACTION);
+ this.bulkSizeMb = sinkContext.getInteger(KEY_BULK_SIZE_MB,
DEFAULT_BULK_SIZE_MB);
+ this.flushInterval = sinkContext.getInteger(KEY_FLUSH_INTERVAL,
DEFAULT_FLUSH_INTERVAL);
+ this.concurrentRequests =
sinkContext.getInteger(KEY_CONCURRENT_REQUESTS, DEFAULT_CONCURRENT_REQUESTS);
+ this.maxConnect = sinkContext.getInteger(KEY_MAX_CONNECT_TOTAL,
DEFAULT_MAX_CONNECT_TOTAL);
+
+ this.maxConnectPerRoute =
sinkContext.getInteger(KEY_MAX_CONNECT_PER_ROUTE,
DEFAULT_MAX_CONNECT_PER_ROUTE);
+ this.connectionRequestTimeout =
+ sinkContext.getInteger(KEY_CONNECTION_REQUEST_TIMEOUT,
DEFAULT_CONNECTION_REQUEST_TIMEOUT);
+ this.socketTimeout = sinkContext.getInteger(KEY_SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT);
+ this.maxRedirects = sinkContext.getInteger(KEY_MAX_REDIRECTS,
DEFAULT_MAX_REDIRECTS);
+ this.logMaxLength = sinkContext.getInteger(KEY_LOG_MAX_LENGTH,
DEFAULT_LOG_MAX_LENGTH);
+ // http host
+ this.strHttpHosts = sinkContext.getString(KEY_HTTP_HOSTS);
+ if (!StringUtils.isBlank(strHttpHosts)) {
+ String[] strHttpHostArray = strHttpHosts.split("\\s+");
+ List<HttpHost> newHttpHosts = new
ArrayList<>(strHttpHostArray.length);
+ for (String strHttpHost : strHttpHostArray) {
+ String[] ipPort = strHttpHost.split(":");
+ if (ipPort.length == 2 && NumberUtils.isDigits(ipPort[1])) {
+ newHttpHosts.add(new HttpHost(ipPort[0],
NumberUtils.toInt(ipPort[1])));
+ }
+ }
+ if (newHttpHosts.size() > 0) {
+ HttpHost[] newHostHostArray = new
HttpHost[newHttpHosts.size()];
+ this.httpHosts = newHttpHosts.toArray(newHostHostArray);
+ }
+ }
+ }
+
/**
* addSendMetric
- *
+ *
* @param currentRecord
* @param bid
*/
@@ -242,7 +313,7 @@ public class EsSinkContext extends SinkContext {
/**
* addSendResultMetric
- *
+ *
* @param currentRecord
* @param bid
* @param result
@@ -281,8 +352,8 @@ public class EsSinkContext extends SinkContext {
/**
* getIdConfig
- *
- * @param uid
+ *
+ * @param uid
* @return
*/
public EsIdConfig getIdConfig(String uid) {
@@ -291,7 +362,7 @@ public class EsSinkContext extends SinkContext {
/**
* get nodeId
- *
+ *
* @return the nodeId
*/
public String getNodeId() {
@@ -300,7 +371,7 @@ public class EsSinkContext extends SinkContext {
/**
* get idConfigMap
- *
+ *
* @return the idConfigMap
*/
public Map<String, EsIdConfig> getIdConfigMap() {
@@ -309,7 +380,7 @@ public class EsSinkContext extends SinkContext {
/**
* get sinkContext
- *
+ *
* @return the sinkContext
*/
public Context getSinkContext() {
@@ -318,7 +389,7 @@ public class EsSinkContext extends SinkContext {
/**
* set sinkContext
- *
+ *
* @param sinkContext the sinkContext to set
*/
public void setSinkContext(Context sinkContext) {
@@ -327,8 +398,8 @@ public class EsSinkContext extends SinkContext {
/**
* offerDispatchQueue
- *
- * @param indexRequest
+ *
+ * @param indexRequest
* @return
*/
public void offerDispatchQueue(EsIndexRequest indexRequest) {
@@ -339,7 +410,7 @@ public class EsSinkContext extends SinkContext {
/**
* takeDispatchQueue
- *
+ *
* @return
*/
public EsIndexRequest takeDispatchQueue() {
@@ -352,8 +423,8 @@ public class EsSinkContext extends SinkContext {
/**
* backDispatchQueue
- *
- * @param indexRequest
+ *
+ * @param indexRequest
* @return
*/
public void backDispatchQueue(EsIndexRequest indexRequest) {
@@ -363,8 +434,8 @@ public class EsSinkContext extends SinkContext {
/**
* releaseDispatchQueue
- *
- * @param indexRequest
+ *
+ * @param indexRequest
* @return
*/
public void releaseDispatchQueue(EsIndexRequest indexRequest) {
@@ -373,7 +444,7 @@ public class EsSinkContext extends SinkContext {
/**
* get bulkAction
- *
+ *
* @return the bulkAction
*/
public int getBulkAction() {
@@ -382,7 +453,7 @@ public class EsSinkContext extends SinkContext {
/**
* set bulkAction
- *
+ *
* @param bulkAction the bulkAction to set
*/
public void setBulkAction(int bulkAction) {
@@ -391,7 +462,7 @@ public class EsSinkContext extends SinkContext {
/**
* get bulkSizeMb
- *
+ *
* @return the bulkSizeMb
*/
public int getBulkSizeMb() {
@@ -400,7 +471,7 @@ public class EsSinkContext extends SinkContext {
/**
* set bulkSizeMb
- *
+ *
* @param bulkSizeMb the bulkSizeMb to set
*/
public void setBulkSizeMb(int bulkSizeMb) {
@@ -409,7 +480,7 @@ public class EsSinkContext extends SinkContext {
/**
* get flushInterval
- *
+ *
* @return the flushInterval
*/
public int getFlushInterval() {
@@ -418,7 +489,7 @@ public class EsSinkContext extends SinkContext {
/**
* set flushInterval
- *
+ *
* @param flushInterval the flushInterval to set
*/
public void setFlushInterval(int flushInterval) {
@@ -427,7 +498,7 @@ public class EsSinkContext extends SinkContext {
/**
* get concurrentRequests
- *
+ *
* @return the concurrentRequests
*/
public int getConcurrentRequests() {
@@ -436,7 +507,7 @@ public class EsSinkContext extends SinkContext {
/**
* set concurrentRequests
- *
+ *
* @param concurrentRequests the concurrentRequests to set
*/
public void setConcurrentRequests(int concurrentRequests) {
@@ -445,7 +516,7 @@ public class EsSinkContext extends SinkContext {
/**
* get maxConnect
- *
+ *
* @return the maxConnect
*/
public int getMaxConnect() {
@@ -489,7 +560,7 @@ public class EsSinkContext extends SinkContext {
/**
* set maxConnect
- *
+ *
* @param maxConnect the maxConnect to set
*/
public void setMaxConnect(int maxConnect) {
@@ -498,7 +569,7 @@ public class EsSinkContext extends SinkContext {
/**
* get strHttpHosts
- *
+ *
* @return the strHttpHosts
*/
public String getStrHttpHosts() {
@@ -507,7 +578,7 @@ public class EsSinkContext extends SinkContext {
/**
* set strHttpHosts
- *
+ *
* @param strHttpHosts the strHttpHosts to set
*/
public void setStrHttpHosts(String strHttpHosts) {
@@ -516,7 +587,7 @@ public class EsSinkContext extends SinkContext {
/**
* get httpHosts
- *
+ *
* @return the httpHosts
*/
public HttpHost[] getHttpHosts() {
@@ -525,7 +596,7 @@ public class EsSinkContext extends SinkContext {
/**
* set httpHosts
- *
+ *
* @param httpHosts the httpHosts to set
*/
public void setHttpHosts(HttpHost[] httpHosts) {
@@ -534,7 +605,7 @@ public class EsSinkContext extends SinkContext {
/**
* set nodeId
- *
+ *
* @param nodeId the nodeId to set
*/
public void setNodeId(String nodeId) {
@@ -543,7 +614,7 @@ public class EsSinkContext extends SinkContext {
/**
* set idConfigMap
- *
+ *
* @param idConfigMap the idConfigMap to set
*/
public void setIdConfigMap(Map<String, EsIdConfig> idConfigMap) {
@@ -552,7 +623,7 @@ public class EsSinkContext extends SinkContext {
/**
* get username
- *
+ *
* @return the username
*/
public String getUsername() {
@@ -561,7 +632,7 @@ public class EsSinkContext extends SinkContext {
/**
* set username
- *
+ *
* @param username the username to set
*/
public void setUsername(String username) {
@@ -570,7 +641,7 @@ public class EsSinkContext extends SinkContext {
/**
* get password
- *
+ *
* @return the password
*/
public String getPassword() {
@@ -579,7 +650,7 @@ public class EsSinkContext extends SinkContext {
/**
* set password
- *
+ *
* @param password the password to set
*/
public void setPassword(String password) {
@@ -588,7 +659,7 @@ public class EsSinkContext extends SinkContext {
/**
* get keywordMaxLength
- *
+ *
* @return the keywordMaxLength
*/
public int getKeywordMaxLength() {
@@ -597,7 +668,7 @@ public class EsSinkContext extends SinkContext {
/**
* set keywordMaxLength
- *
+ *
* @param keywordMaxLength the keywordMaxLength to set
*/
public void setKeywordMaxLength(int keywordMaxLength) {
@@ -606,7 +677,7 @@ public class EsSinkContext extends SinkContext {
/**
* get isUseIndexId
- *
+ *
* @return the isUseIndexId
*/
public boolean isUseIndexId() {
@@ -615,7 +686,7 @@ public class EsSinkContext extends SinkContext {
/**
* set isUseIndexId
- *
+ *
* @param isUseIndexId the isUseIndexId to set
*/
public void setUseIndexId(boolean isUseIndexId) {
@@ -624,7 +695,7 @@ public class EsSinkContext extends SinkContext {
/**
* create indexRequestHandler
- *
+ *
* @return the indexRequestHandler
*/
public IEvent2IndexRequestHandler createIndexRequestHandler() {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
index 5f79465039..d7a0a2978e 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
@@ -28,6 +28,7 @@
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassReso
sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler
+useUnifiedConfiguration=true
maxThreads=10
reloadInterval=60000
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
index 5f79465039..e448826938 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
+++
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
@@ -28,7 +28,7 @@
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassReso
sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler
-
+useUnifiedConfiguration=true
maxThreads=10
reloadInterval=60000
processInterval=100