This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9493c040b9 [INLONG-11702][SDK] Optimize Sender factory implementation 
(#11705)
9493c040b9 is described below

commit 9493c040b958d25fda2f21708fa698ae61a5f8e5
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jan 22 14:30:30 2025 +0800

    [INLONG-11702][SDK] Optimize Sender factory implementation (#11705)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 267 +++++++++++++++++++++
 .../inlong/sdk/dataproxy/DefaultMessageSender.java |   4 +
 .../apache/inlong/sdk/dataproxy/MessageSender.java |   1 +
 .../inlong/sdk/dataproxy/MessageSenderFactory.java |  22 --
 .../inlong/sdk/dataproxy/MsgSenderFactory.java     |  91 +++++++
 .../sdk/dataproxy/MsgSenderMultiFactory.java       | 101 ++++++++
 .../sdk/dataproxy/MsgSenderSingleFactory.java      | 108 +++++++++
 .../sdk/dataproxy/common/ProxyClientConfig.java    |  14 +-
 .../sdk/dataproxy/common/SendMessageCallback.java  |   5 +
 .../inlong/sdk/dataproxy/example/ExampleUtils.java | 259 ++++++++++++++++++++
 .../dataproxy/example/InLongFactoryExample.java    | 127 ++++++++++
 .../dataproxy/example/InLongTcpClientExample.java  |  71 ++++++
 .../inlong/sdk/dataproxy/sender/BaseSender.java    |  30 ++-
 .../dataproxy/sender/tcp/InLongTcpMsgSender.java   |  63 +++--
 .../sdk/dataproxy/sender/tcp/TcpMsgSender.java     |  73 ++++--
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |  16 ++
 16 files changed, 1182 insertions(+), 70 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
new file mode 100644
index 0000000000..0bc0cf4bb9
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Base Message Sender Factory
+ *
+ * Used to manage the instance relationship of the sender factory.
+ * Since both singleton and multiple instances involve the same relationship,
+ * they are abstracted and maintained separately here.
+ */
+public class BaseMsgSenderFactory {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BaseMsgSenderFactory.class);
+    private static final LogCounter exptCounter = new LogCounter(10, 100000, 
60 * 1000L);
+    // msg send factory
+    private final MsgSenderFactory msgSenderFactory;
+    private final String factoryNo;
+    // for senders
+    private final ReentrantReadWriteLock senderCacheLock = new 
ReentrantReadWriteLock();
+    // for inlong groupId -- Sender map
+    private final ConcurrentHashMap<String, BaseSender> groupIdSenderMap = new 
ConcurrentHashMap<>();
+    // for inlong clusterId -- Sender map
+    private final ConcurrentHashMap<String, BaseSender> clusterIdSenderMap = 
new ConcurrentHashMap<>();
+
+    public BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String 
factoryNo) {
+        this.msgSenderFactory = msgSenderFactory;
+        this.factoryNo = factoryNo;
+        logger.info("MsgSenderFactory({}) started", this.factoryNo);
+    }
+
+    /**
+     * Close every sender cached by this factory.
+     *
+     * Both the groupId-keyed and the clusterId-keyed sender maps are released
+     * while the sender cache write lock is held; the total number of senders
+     * visited is logged after the lock has been released.
+     */
+    public void close() {
+        int totalSenderCnt;
+        senderCacheLock.writeLock().lock();
+        try {
+            // release groupId mapped senders
+            totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap);
+            // release clusterId mapped senders
+            totalSenderCnt += innReleaseAllClusterIdSenders(clusterIdSenderMap);
+        } finally {
+            senderCacheLock.writeLock().unlock();
+        }
+        logger.info("MsgSenderFactory({}) closed, release {} inlong senders",
+                this.factoryNo, totalSenderCnt);
+    }
+
+    public void removeClient(BaseSender msgSender) {
+        if (msgSender == null
+                || msgSender.getSenderFactory() == null
+                || msgSender.getSenderFactory() != msgSenderFactory) {
+            return;
+        }
+        boolean removed;
+        String senderId = msgSender.getSenderId();
+        senderCacheLock.writeLock().lock();
+        try {
+            if (msgSender.getFactoryClusterIdKey() == null) {
+                removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap);
+            } else {
+                removed = innRemoveClusterIdSender(msgSender, 
clusterIdSenderMap);
+            }
+        } finally {
+            senderCacheLock.writeLock().unlock();
+        }
+        if (removed) {
+            logger.info("MsgSenderFactory({}) removed sender({})", 
this.factoryNo, senderId);
+        }
+    }
+
+    public int getMsgSenderCount() {
+        return groupIdSenderMap.size() + clusterIdSenderMap.size();
+    }
+
+    public InLongTcpMsgSender genTcpSenderByGroupId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException {
+        ProxyUtils.validProxyConfigNotNull(configure);
+        // query cached sender
+        String metaConfigKey = configure.getGroupMetaConfigKey();
+        InLongTcpMsgSender messageSender =
+                (InLongTcpMsgSender) groupIdSenderMap.get(metaConfigKey);
+        if (messageSender != null) {
+            return messageSender;
+        }
+        // valid configure info
+        ProcessResult procResult = new ProcessResult();
+        qryProxyMetaConfigure(configure, procResult);
+        // generate sender
+        senderCacheLock.writeLock().lock();
+        try {
+            // re-get the created sender based on the groupId key after locked
+            messageSender = (InLongTcpMsgSender) 
groupIdSenderMap.get(metaConfigKey);
+            if (messageSender != null) {
+                return messageSender;
+            }
+            // build a new sender based on groupId
+            messageSender = new InLongTcpMsgSender(configure, 
selfDefineFactory, msgSenderFactory, null);
+            if (!messageSender.start(procResult)) {
+                messageSender.close();
+                throw new ProxySdkException("Failed to start groupId sender: " 
+ procResult);
+            }
+            groupIdSenderMap.put(metaConfigKey, messageSender);
+            logger.info("MsgSenderFactory({}) generated a new groupId({}) 
sender({})",
+                    this.factoryNo, metaConfigKey, 
messageSender.getSenderId());
+            return messageSender;
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("MsgSenderFactory({}) build groupId sender({}) 
exception",
+                        this.factoryNo, metaConfigKey, ex);
+            }
+            throw new ProxySdkException("Failed to build groupId sender: " + 
ex.getMessage());
+        } finally {
+            senderCacheLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Get or generate a TCP sender keyed by the cluster the groupId belongs to.
+     *
+     * The group meta configure is queried first, because the clusterId key is
+     * built from the returned proxy config entry; only then is the clusterId
+     * sender map consulted. The map is checked once without the lock and once
+     * more after the write lock is taken before a new sender is built.
+     *
+     * @param configure  the sender configure, must not be null
+     * @param selfDefineFactory  the self defined network threads factory, passed
+     *                           through to the sender unchanged
+     * @return the cached sender for the clusterId key, or a newly started one
+     * @throws ProxySdkException if the meta configure query fails, or if the
+     *                           new sender cannot be built or started
+     */
+    public InLongTcpMsgSender genTcpSenderByClusterId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws ProxySdkException {
+        ProxyUtils.validProxyConfigNotNull(configure);
+        // get groupId's clusterIdKey
+        ProcessResult procResult = new ProcessResult();
+        ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, procResult);
+        String clusterIdKey = ProxyUtils.buildClusterIdKey(
+                configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId());
+        // get local built sender
+        InLongTcpMsgSender messageSender = (InLongTcpMsgSender) clusterIdSenderMap.get(clusterIdKey);
+        if (messageSender != null) {
+            return messageSender;
+        }
+        // generate sender
+        senderCacheLock.writeLock().lock();
+        try {
+            // re-check the map under the lock: another thread may have built
+            // the sender for this clusterId key in the meantime
+            messageSender = (InLongTcpMsgSender) clusterIdSenderMap.get(clusterIdKey);
+            if (messageSender != null) {
+                return messageSender;
+            }
+            // build a new sender based on clusterId Key
+            messageSender = new InLongTcpMsgSender(configure,
+                    selfDefineFactory, msgSenderFactory, clusterIdKey);
+            if (!messageSender.start(procResult)) {
+                // close the sender that failed to start before reporting the failure
+                messageSender.close();
+                throw new ProxySdkException("Failed to start cluster sender: " + procResult);
+            }
+            clusterIdSenderMap.put(clusterIdKey, messageSender);
+            logger.info("MsgSenderFactory({}) generated a new clusterId({}) sender({})",
+                    this.factoryNo, clusterIdKey, messageSender.getSenderId());
+            return messageSender;
+        } catch (Throwable ex) {
+            // note: this also intercepts the start-failure exception thrown above,
+            // so its message is re-wrapped by the exception created below
+            if (exptCounter.shouldPrint()) {
+                logger.warn("MsgSenderFactory({}) build cluster sender({}) exception",
+                        this.factoryNo, clusterIdKey, ex);
+            }
+            throw new ProxySdkException("Failed to build cluster sender: " + ex.getMessage());
+        } finally {
+            senderCacheLock.writeLock().unlock();
+        }
+    }
+
+    private ProxyConfigEntry qryProxyMetaConfigure(
+            ProxyClientConfig proxyConfig, ProcessResult procResult) throws 
ProxySdkException {
+        ProxyConfigManager inlongMetaQryMgr = new 
ProxyConfigManager(proxyConfig);
+        // check whether valid configure
+        if (!inlongMetaQryMgr.getGroupIdConfigure(true, procResult)) {
+            throw new ProxySdkException("Failed to query remote group config: 
" + procResult);
+        }
+        if (proxyConfig.isEnableReportEncrypt()
+                && !inlongMetaQryMgr.getEncryptConfigure(true, procResult)) {
+            throw new ProxySdkException("Failed to query remote encrypt 
config: " + procResult);
+        }
+        return inlongMetaQryMgr.getProxyConfigEntry();
+    }
+
+    private boolean innRemoveGroupIdSender(BaseSender msgSender, Map<String, 
BaseSender> senderMap) {
+        BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey());
+        if (tmpSender == null
+                || !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
+            return false;
+        }
+        return senderMap.remove(msgSender.getMetaConfigKey()) != null;
+    }
+
+    private boolean innRemoveClusterIdSender(BaseSender msgSender, Map<String, 
BaseSender> senderMap) {
+        BaseSender tmpSender = 
senderMap.get(msgSender.getFactoryClusterIdKey());
+        if (tmpSender == null
+                || !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
+            return false;
+        }
+        return senderMap.remove(msgSender.getFactoryClusterIdKey()) != null;
+    }
+
+    private int innReleaseAllGroupIdSenders(Map<String, BaseSender> senderMap) 
{
+        int totalSenderCnt = 0;
+        for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
+            if (entry == null || entry.getValue() == null) {
+                continue;
+            }
+            try {
+                entry.getValue().close();
+            } catch (Throwable ex) {
+                if (exptCounter.shouldPrint()) {
+                    logger.warn("MsgSenderFactory({}) close groupId({})'s 
sender failure",
+                            this.factoryNo, entry.getKey(), ex);
+                }
+            }
+            totalSenderCnt++;
+        }
+        senderMap.clear();
+        return totalSenderCnt;
+    }
+
+    private int innReleaseAllClusterIdSenders(Map<String, BaseSender> 
senderMap) {
+        int totalSenderCnt = 0;
+        for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
+            if (entry == null
+                    || entry.getKey() == null
+                    || entry.getValue() == null) {
+                continue;
+            }
+            try {
+                entry.getValue().close();
+            } catch (Throwable ex) {
+                if (exptCounter.shouldPrint()) {
+                    logger.warn("MsgSenderFactory({}) close clusterId({})'s 
sender failure",
+                            this.factoryNo, entry.getKey(), ex);
+                }
+            }
+            totalSenderCnt++;
+        }
+        senderMap.clear();
+        return totalSenderCnt;
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 52f2116e88..ecdc820a7d 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -44,6 +44,10 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
+@Deprecated
+/**
+ * Replaced by InLongTcpMsgSender
+ */
 public class DefaultMessageSender implements MessageSender {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultMessageSender.class);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 862586ab77..26fe170b90 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -24,6 +24,7 @@ import 
org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import java.util.List;
 import java.util.Map;
 
+@Deprecated
 public interface MessageSender {
 
     void close();
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java
deleted file mode 100644
index a70dfa37a0..0000000000
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy;
-
-public class MessageSenderFactory {
-
-}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
new file mode 100644
index 0000000000..a6a4e20b4c
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Message Sender Factory interface
+ *
+ * Used to define the sender factory common methods
+ */
+public interface MsgSenderFactory {
+
+    /**
+     * Shutdown all senders at the factory
+     *
+     */
+    void shutdownAll() throws ProxySdkException;
+
+    /**
+     * Remove the specified sender from the factory
+     *
+     * @param msgSender the specified sender
+     */
+    void removeClient(BaseSender msgSender);
+
+    /**
+     * Get the number of senders in the factory
+     *
+     * @return the number of senders currently in use
+     */
+    int getMsgSenderCount();
+
+    /**
+     * Get or generate a sender from the factory according to groupId
+     *
+     * @param configure  the sender configure
+     * @return the sender
+     */
+    InLongTcpMsgSender genTcpSenderByGroupId(
+            TcpMsgSenderConfig configure) throws ProxySdkException;
+
+    /**
+     * Get or generate a sender from the factory according to groupId
+     *
+     * @param configure  the sender configure
+     * @param selfDefineFactory  the self defined network threads factory
+     * @return the sender
+     */
+    InLongTcpMsgSender genTcpSenderByGroupId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException;
+
+    /**
+     * Get or generate a sender from the factory according to clusterId
+     *
+     * @param configure  the sender configure
+     * @return the sender
+     */
+    InLongTcpMsgSender genTcpSenderByClusterId(
+            TcpMsgSenderConfig configure) throws ProxySdkException;
+
+    /**
+     * Get or generate a sender from the factory according to clusterId
+     *
+     * @param configure  the sender configure
+     * @param selfDefineFactory  the self defined network threads factory
+     * @return the sender
+     */
+    InLongTcpMsgSender genTcpSenderByClusterId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException;
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
new file mode 100644
index 0000000000..0cb595c261
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Multiple Instances Message Sender Factory
+ *
+ * Used to define the multiple-instance sender factory
+ */
+public class MsgSenderMultiFactory implements MsgSenderFactory {
+
+    private static final AtomicLong refCounter = new AtomicLong(0);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private final BaseMsgSenderFactory baseMsgSenderFactory;
+
+    public MsgSenderMultiFactory() {
+        this.baseMsgSenderFactory = new BaseMsgSenderFactory(
+                this, "iMultiFact-" + refCounter.incrementAndGet());
+        this.initialized.set(true);
+    }
+
+    @Override
+    public void shutdownAll() throws ProxySdkException {
+        if (!this.initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        this.baseMsgSenderFactory.close();
+    }
+
+    @Override
+    public void removeClient(BaseSender msgSender) {
+        if (msgSender == null
+                || msgSender.getSenderFactory() == null
+                || msgSender.getSenderFactory() != this) {
+            return;
+        }
+        this.baseMsgSenderFactory.removeClient(msgSender);
+    }
+
+    @Override
+    public int getMsgSenderCount() {
+        return this.baseMsgSenderFactory.getMsgSenderCount();
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByGroupId(
+            TcpMsgSenderConfig configure) throws ProxySdkException {
+        return genTcpSenderByGroupId(configure, null);
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByGroupId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException {
+        if (!this.initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        ProxyUtils.validProxyConfigNotNull(configure);
+        return this.baseMsgSenderFactory.genTcpSenderByGroupId(configure, 
selfDefineFactory);
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByClusterId(
+            TcpMsgSenderConfig configure) throws ProxySdkException {
+        return genTcpSenderByClusterId(configure, null);
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByClusterId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException {
+        if (!this.initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        ProxyUtils.validProxyConfigNotNull(configure);
+        return this.baseMsgSenderFactory.genTcpSenderByClusterId(configure, 
selfDefineFactory);
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
new file mode 100644
index 0000000000..91b1735155
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Singleton Message Sender Factory
+ *
+ * Used to define the singleton sender factory
+ */
+public class MsgSenderSingleFactory implements MsgSenderFactory {
+
+    private static final AtomicBoolean initialized = new AtomicBoolean(false);
+    private static final AtomicLong singletonRefCounter = new AtomicLong(0);
+    private static BaseMsgSenderFactory baseMsgSenderFactory;
+
+    public MsgSenderSingleFactory() {
+        if (singletonRefCounter.incrementAndGet() == 1) {
+            baseMsgSenderFactory = new BaseMsgSenderFactory(this, 
"iSingleFct");
+            initialized.set(true);
+        }
+        while (!initialized.get()) {
+            ProxyUtils.sleepSomeTime(50L);
+        }
+    }
+
+    @Override
+    public void shutdownAll() throws ProxySdkException {
+        if (!initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        if (singletonRefCounter.decrementAndGet() > 0) {
+            return;
+        }
+        baseMsgSenderFactory.close();
+    }
+
+    @Override
+    public void removeClient(BaseSender msgSender) {
+        if (msgSender == null
+                || msgSender.getSenderFactory() == null
+                || msgSender.getSenderFactory() != this) {
+            return;
+        }
+        baseMsgSenderFactory.removeClient(msgSender);
+    }
+
+    @Override
+    public int getMsgSenderCount() {
+        return baseMsgSenderFactory.getMsgSenderCount();
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByGroupId(
+            TcpMsgSenderConfig configure) throws ProxySdkException {
+        return genTcpSenderByGroupId(configure, null);
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByGroupId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException {
+        if (!initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        ProxyUtils.validProxyConfigNotNull(configure);
+        return baseMsgSenderFactory.genTcpSenderByGroupId(configure, 
selfDefineFactory);
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByClusterId(
+            TcpMsgSenderConfig configure) throws ProxySdkException {
+        return genTcpSenderByClusterId(configure, null);
+    }
+
+    @Override
+    public InLongTcpMsgSender genTcpSenderByClusterId(
+            TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException {
+        if (!initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        ProxyUtils.validProxyConfigNotNull(configure);
+        return baseMsgSenderFactory.genTcpSenderByClusterId(configure, 
selfDefineFactory);
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
index 48039c7c65..274b24f16d 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.common;
 
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -112,7 +113,8 @@ public class ProxyClientConfig implements Cloneable {
         this.inlongGroupId = groupId.trim();
         this.dataRptProtocol = rptProtocol.name();
         this.setRegionName(regionName);
-        this.groupMetaConfigKey = this.buildMgrConfigKey();
+        this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey(
+                this.dataRptProtocol, this.regionName, this.inlongGroupId);
     }
 
     protected ProxyClientConfig(String managerAddress,
@@ -124,7 +126,8 @@ public class ProxyClientConfig implements Cloneable {
         this.inlongGroupId = groupId.trim();
         this.dataRptProtocol = rptProtocol.name();
         this.setRegionName(regionName);
-        this.groupMetaConfigKey = this.buildMgrConfigKey();
+        this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey(
+                this.dataRptProtocol, this.regionName, this.inlongGroupId);
     }
 
     public void setMgrAuthzInfo(boolean needMgrAuthz,
@@ -226,7 +229,8 @@ public class ProxyClientConfig implements Cloneable {
     public void setRegionName(String regionName) {
         if (StringUtils.isNotBlank(regionName)) {
             this.regionName = regionName.trim().toLowerCase();
-            this.groupMetaConfigKey = this.buildMgrConfigKey();
+            this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey(
+                    this.dataRptProtocol, this.regionName, this.inlongGroupId);
         }
     }
 
@@ -520,8 +524,4 @@ public class ProxyClientConfig implements Cloneable {
         }
         this.managerPort = tmValue;
     }
-
-    private String buildMgrConfigKey() {
-        return this.inlongGroupId + ":" + this.regionName + ":" + 
this.dataRptProtocol;
-    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
index 9e83f4673c..48ce607037 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.sdk.dataproxy.common;
 
+@Deprecated
+/**
+ * Replaced by MsgSendCallback
+ *
+ */
 public interface SendMessageCallback {
 
     /* Invoked when a message is confirmed by TDBus. */
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
new file mode 100644
index 0000000000..b271aeccf1
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
+
+import org.apache.commons.codec.binary.StringUtils;
+
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ExampleUtils {
+
+    private static final SecureRandom cntRandom = new SecureRandom(
+            Long.toString(System.nanoTime()).getBytes());
+
+    public static void sendTcpMessages(TcpMsgSender msgSender, boolean isSync, 
boolean isMultiItem,
+            String groupId, String streamId, int reqCnt, int baseBodyLen, int 
msgItemCnt, ProcessResult procResult) {
+        int sucCnt = 0;
+        int curCount = 0;
+        TcpEventInfo eventInfo;
+        byte[] itemBody = buildBoydData(baseBodyLen);
+        List<byte[]> multiBodys = new ArrayList<>();
+        for (int i = 0; i < msgItemCnt; i++) {
+            multiBodys.add(itemBody);
+        }
+        Map<String, String> localAttrs = new HashMap<>();
+        if (isSync) {
+            if (isMultiItem) {
+                // send multiple message
+                while (curCount++ < reqCnt) {
+                    try {
+                        if (curCount > 1) {
+                            localAttrs.clear();
+                            localAttrs.put("index", String.valueOf(curCount));
+                        }
+                        eventInfo = new TcpEventInfo(groupId, streamId,
+                                System.currentTimeMillis(), localAttrs, 
multiBodys);
+                    } catch (Throwable ex) {
+                        System.out.println("Build tcp event failure, ex=" + 
ex);
+                        continue;
+                    }
+                    if (!msgSender.sendMessage(eventInfo, procResult)) {
+                        System.out.println("Sync request index=" + curCount + 
", process result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            } else {
+                // send single message
+                while (curCount++ < reqCnt) {
+                    try {
+                        if (curCount > 1) {
+                            localAttrs.clear();
+                            localAttrs.put("index", String.valueOf(curCount));
+                            localAttrs.put("multi", String.valueOf(false));
+                        }
+                        eventInfo = new TcpEventInfo(groupId, streamId,
+                                System.currentTimeMillis(), localAttrs, 
itemBody);
+                    } catch (Throwable ex) {
+                        System.out.println("Build tcp event failure, ex=" + 
ex);
+                        continue;
+                    }
+                    if (!msgSender.sendMessage(eventInfo, procResult)) {
+                        System.out.println("Sync request index=" + curCount + 
", process result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            }
+        } else {
+            if (isMultiItem) {
+                // send multiple message
+                while (curCount++ < reqCnt) {
+                    try {
+                        if (curCount > 1) {
+                            localAttrs.clear();
+                            localAttrs.put("index", String.valueOf(curCount));
+                            localAttrs.put("multi", String.valueOf(true));
+                        }
+                        eventInfo = new TcpEventInfo(groupId, streamId,
+                                System.currentTimeMillis(), localAttrs, 
multiBodys);
+                    } catch (Throwable ex) {
+                        System.out.println("Build multiple tcp event failure, 
ex=" + ex);
+                        continue;
+                    }
+                    if (!msgSender.asyncSendMessage(eventInfo, new 
MyMsgSendBack(curCount), procResult)) {
+                        System.out.println("Async request index=" + curCount + 
", post result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            } else {
+                // send single message
+                while (curCount++ < reqCnt) {
+                    try {
+                        eventInfo = new TcpEventInfo(groupId, streamId,
+                                System.currentTimeMillis(), null, itemBody);
+                    } catch (Throwable ex) {
+                        System.out.println("Build tcp event failure, ex=" + 
ex);
+                        continue;
+                    }
+                    if (!msgSender.asyncSendMessage(eventInfo, new 
MyMsgSendBack(curCount), procResult)) {
+                        System.out.println("Async request index=" + curCount + 
", post result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            }
+        }
+    }
+
+    public static void sendHttpMessages(HttpMsgSender msgSender, boolean 
isSync, boolean isMultiItem,
+            String groupId, String streamId, int reqCnt, int baseBodyLen, int 
msgItemCnt, ProcessResult procResult) {
+        int sucCnt = 0;
+        int curCount = 0;
+        HttpEventInfo eventInfo;
+        String itemBody = getRandomString(baseBodyLen);
+        List<String> multiBodys = new ArrayList<>();
+        for (int i = 0; i < msgItemCnt; i++) {
+            multiBodys.add(itemBody);
+        }
+        if (isSync) {
+            if (isMultiItem) {
+                // send multiple message
+                while (curCount++ < reqCnt) {
+                    try {
+                        eventInfo = new HttpEventInfo(groupId, streamId,
+                                System.currentTimeMillis(), multiBodys);
+                    } catch (Throwable ex) {
+                        System.out.println("Build multiple http event failure, 
ex=" + ex);
+                        continue;
+                    }
+                    if (!msgSender.syncSendMessage(eventInfo, procResult)) {
+                        System.out.println("Sync request index=" + curCount + 
", process result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            } else {
+                // send single message
+                while (curCount++ < reqCnt) {
+                    try {
+                        eventInfo = new HttpEventInfo(groupId, streamId,
+                                System.currentTimeMillis(), itemBody);
+                    } catch (Throwable ex) {
+                        System.out.println("Build single http event failure, 
ex=" + ex);
+                        continue;
+                    }
+                    if (!msgSender.syncSendMessage(eventInfo, procResult)) {
+                        System.out.println("Sync request index=" + curCount + 
", process result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            }
+        } else {
+            if (isMultiItem) {
+                // send multiple message
+                while (curCount++ < reqCnt) {
+                    try {
+                        eventInfo = new HttpEventInfo(groupId, streamId,
+                                System.currentTimeMillis(), multiBodys);
+                    } catch (Throwable ex) {
+                        System.out.println("Build multiple http event failure, 
ex=" + ex);
+                        continue;
+                    }
+                    if (!msgSender.asyncSendMessage(eventInfo, new 
MyMsgSendBack(curCount), procResult)) {
+                        System.out.println("Async request index=" + curCount + 
", post result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            } else {
+                // send single message
+                while (curCount++ < reqCnt) {
+                    try {
+                        eventInfo = new HttpEventInfo(groupId, streamId, 
System.currentTimeMillis(), itemBody);
+                    } catch (Throwable ex) {
+                        System.out.println("Build single http event failure, 
ex=" + ex);
+                        continue;
+                    }
+                    if (!msgSender.asyncSendMessage(eventInfo, new 
MyMsgSendBack(curCount), procResult)) {
+                        System.out.println("Async request: index=" + curCount 
+ ", post result=" + procResult);
+                        continue;
+                    }
+                    curCount++;
+                }
+            }
+        }
+    }
+
+    private static String getRandomString(int length) {
+        String strBase = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            int number = cntRandom.nextInt(strBase.length());
+            sb.append(strBase.charAt(number));
+        }
+        return sb.toString();
+    }
+
+    private static byte[] buildBoydData(int bodySize) {
+        final byte[] itemBaseData =
+                StringUtils.getBytesUtf8("inglong tcp test data!");
+        final ByteBuffer dataBuffer = ByteBuffer.allocate(bodySize);
+        while (dataBuffer.hasRemaining()) {
+            int offset = dataBuffer.arrayOffset();
+            dataBuffer.put(itemBaseData, offset,
+                    Math.min(dataBuffer.remaining(), itemBaseData.length));
+        }
+        dataBuffer.flip();
+        return dataBuffer.array();
+    }
+
+    private static class MyMsgSendBack implements MsgSendCallback {
+
+        private final int msgId;
+
+        public MyMsgSendBack(int msgId) {
+            this.msgId = msgId;
+        }
+
+        @Override
+        public void onMessageAck(ProcessResult result) {
+            // System.out.println("msgId=" + msgId + ", send result = " + 
result);
+        }
+
+        @Override
+        public void onException(Throwable ex) {
+            System.out.println("msgId=" + msgId + ", send exception=" + ex);
+
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
new file mode 100644
index 0000000000..d1c8b8e19c
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.MsgSenderMultiFactory;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+public class InLongFactoryExample {
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(InLongFactoryExample.class);
+
+    public static void main(String[] args) throws Exception {
+
+        String managerIp = args[0];
+        int managerPort = Integer.parseInt(args[1]);
+        String groupId = args[2];
+        String streamId = args[3];
+        String secretId = args[4];
+        String secretKey = args[5];
+        int reqCnt = Integer.parseInt(args[6]);
+        int msgSize = 1024;
+        int msgCnt = 1;
+        if (args.length > 7) {
+            msgSize = Integer.parseInt(args[7]);
+            msgCnt = Integer.parseInt(args[8]);
+        }
+
+        System.out.println("InLongFactoryExample start");
+
+        // build singleton factory
+        MsgSenderSingleFactory singleFactory = new MsgSenderSingleFactory();
+        // report data by tcp
+        TcpMsgSenderConfig tcpMsgSenderConfig = new TcpMsgSenderConfig(
+                false, managerIp, managerPort, groupId, secretId, secretKey);
+        tcpMsgSenderConfig.setRequestTimeoutMs(20000L);
+        InLongTcpMsgSender tcpMsgSender =
+                singleFactory.genTcpSenderByClusterId(tcpMsgSenderConfig);
+        ProcessResult procResult = new ProcessResult();
+        if (!tcpMsgSender.start(procResult)) {
+            System.out.println("Start tcp sender failure: process result=" + 
procResult);
+        }
+
+        // report data
+        ExampleUtils.sendTcpMessages(tcpMsgSender, false, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendTcpMessages(tcpMsgSender, false, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ProxyUtils.sleepSomeTime(10000L);
+        tcpMsgSender.close();
+
+        // report data use multi-factory
+        MsgSenderMultiFactory multiFactory1 = new MsgSenderMultiFactory();
+        MsgSenderMultiFactory multiFactory2 = new MsgSenderMultiFactory();
+        // report data by tcp
+        tcpMsgSenderConfig.setSdkMsgType(MsgType.MSG_ACK_SERVICE);
+        InLongTcpMsgSender tcpMsgSender1 =
+                multiFactory1.genTcpSenderByGroupId(tcpMsgSenderConfig);
+        if (!tcpMsgSender1.start(procResult)) {
+            System.out.println("Start tcp sender1 failure: process result=" + 
procResult);
+        }
+
+        String managerAddr = "http://" + managerIp + ":" + managerPort;
+        TcpMsgSenderConfig tcpMsgSenderConfig2 =
+                new TcpMsgSenderConfig(managerAddr, groupId, secretId, 
secretKey);
+        tcpMsgSenderConfig2.setSdkMsgType(MsgType.MSG_MULTI_BODY);
+        InLongTcpMsgSender tcpMsgSender2 =
+                multiFactory2.genTcpSenderByGroupId(tcpMsgSenderConfig2);
+        ExampleUtils.sendTcpMessages(tcpMsgSender1, false, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendTcpMessages(tcpMsgSender2, false, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ProxyUtils.sleepSomeTime(10000L);
+        tcpMsgSender1.close();
+        System.out.println("Multi-1.1 Cur multiFactory1 sender count = "
+                + multiFactory1.getMsgSenderCount()
+                + ", cur multiFactory2 sender count is " + 
multiFactory2.getMsgSenderCount());
+        tcpMsgSender2.close();
+        System.out.println("Multi-1.2 Cur multiFactory1 sender count = "
+                + multiFactory1.getMsgSenderCount()
+                + ", cur multiFactory2 sender count is " + 
multiFactory2.getMsgSenderCount());
+
+        // test self DefineFactory
+        ThreadFactory selfDefineFactory = new 
DefaultThreadFactory("test_self_thread_factory");
+        InLongTcpMsgSender tcpMsgSelfSender =
+                singleFactory.genTcpSenderByGroupId(tcpMsgSenderConfig, 
selfDefineFactory);
+        ExampleUtils.sendTcpMessages(tcpMsgSelfSender, false, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ProxyUtils.sleepSomeTime(10000L);
+
+        tcpMsgSelfSender.close();
+
+        System.out.println("singleFactory-3 Cur singleFactory sender count = "
+                + singleFactory.getMsgSenderCount());
+
+        ProxyUtils.sleepSomeTime(3 * 60 * 1000L);
+
+        // close all
+        multiFactory1.shutdownAll();
+        multiFactory2.shutdownAll();
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java
new file mode 100644
index 0000000000..fce1404e0d
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongTcpClientExample {
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(InLongTcpClientExample.class);
+
+    public static void main(String[] args) throws Exception {
+
+        String managerIp = args[0];
+        String managerPort = args[1];
+        String groupId = args[2];
+        String streamId = args[3];
+        String secretId = args[4];
+        String secretKey = args[5];
+        int reqCnt = Integer.parseInt(args[6]);
+        int msgSize = 1024;
+        int msgCnt = 1;
+        if (args.length > 7) {
+            msgSize = Integer.parseInt(args[7]);
+            msgCnt = Integer.parseInt(args[8]);
+        }
+
+        String managerAddr = "http://" + managerIp + ":" + managerPort;
+
+        TcpMsgSenderConfig dataProxyConfig =
+                new TcpMsgSenderConfig(managerAddr, groupId, secretId, 
secretKey);
+        dataProxyConfig.setRequestTimeoutMs(20000L);
+        InLongTcpMsgSender messageSender = new 
InLongTcpMsgSender(dataProxyConfig);
+
+        logger.info("InLongTcpMsgSender start");
+
+        ProcessResult procResult = new ProcessResult();
+        if (!messageSender.start(procResult)) {
+            System.out.println("Start sender failure: process result=" + 
procResult.toString());
+        }
+        ExampleUtils.sendTcpMessages(messageSender, true, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendTcpMessages(messageSender, true, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendTcpMessages(messageSender, false, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendTcpMessages(messageSender, false, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ProxyUtils.sleepSomeTime(10000L);
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index a5f44f17ee..c103bd31f4 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -17,11 +17,13 @@
 
 package org.apache.inlong.sdk.dataproxy.sender;
 
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
 import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -58,6 +60,8 @@ public abstract class BaseSender implements ConfigHolder {
     private static final AtomicLong senderIdGen = new AtomicLong(0L);
     //
     protected final AtomicInteger senderStatus = new 
AtomicInteger(SENDER_STATUS_UNINITIALIZED);
+    protected final MsgSenderFactory senderFactory;
+    private final String factoryClusterIdKey;
     protected final String senderId;
     protected final ProxyClientConfig baseConfig;
     protected ClientMgr clientMgr;
@@ -71,15 +75,20 @@ public abstract class BaseSender implements ConfigHolder {
     protected volatile int groupIdNum = 0;
     private Map<String, Integer> streamIdMap = new HashMap<>();
 
-    protected BaseSender(ProxyClientConfig configure) {
+    protected BaseSender(ProxyClientConfig configure, MsgSenderFactory 
senderFactory, String clusterIdKey) {
+        if (configure == null) {
+            throw new NullPointerException("configure is null");
+        }
         this.baseConfig = configure.clone();
+        this.senderFactory = senderFactory;
+        this.factoryClusterIdKey = clusterIdKey;
         this.senderId = configure.getDataRptProtocol() + "-" + 
senderIdGen.incrementAndGet();
     }
 
     public boolean start(ProcessResult procResult) {
         if (!this.senderStatus.compareAndSet(
                 SENDER_STATUS_UNINITIALIZED, SENDER_STATUS_INITIALIZING)) {
-            return procResult.setFailResult(ErrorCode.OK);
+            return procResult.setSuccess();
         }
         // start client manager
         if (!this.clientMgr.start(procResult)) {
@@ -103,7 +112,7 @@ public abstract class BaseSender implements ConfigHolder {
         this.configManager.start();
         this.senderStatus.set(SENDER_STATUS_STARTED);
         logger.info("Sender({}) instance started!", senderId);
-        return procResult.setFailResult(ErrorCode.OK);
+        return procResult.setSuccess();
     }
 
     public void close() {
@@ -116,6 +125,9 @@ public abstract class BaseSender implements ConfigHolder {
         }
         configManager.shutDown();
         clientMgr.stop();
+        if (this.senderFactory != null) {
+            this.senderFactory.removeClient(this);
+        }
         logger.info("Sender({}) instance stopped!", senderId);
     }
 
@@ -165,10 +177,22 @@ public abstract class BaseSender implements ConfigHolder {
         return senderStatus.get() == SENDER_STATUS_STARTED;
     }
 
+    public MsgSenderFactory getSenderFactory() {
+        return senderFactory;
+    }
+
+    public String getFactoryClusterIdKey() {
+        return factoryClusterIdKey;
+    }
+
     public String getMetaConfigKey() {
         return this.baseConfig.getGroupMetaConfigKey();
     }
 
+    public ProxyConfigEntry getProxyConfigEntry() {
+        return configManager.getProxyConfigEntry();
+    }
+
     public String getSenderId() {
         return senderId;
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
index 97076dd977..aac6d97aee 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.sender.tcp;
 
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
 import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
@@ -56,13 +57,54 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
     private final TcpMsgSenderConfig tcpConfig;
     private final TcpClientMgr tcpClientMgr;
 
-    protected InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory 
selfDefineFactory) {
-        super(configure);
+    public InLongTcpMsgSender(TcpMsgSenderConfig configure) {
+        this(configure, null, null, null);
+    }
+
+    public InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory 
selfDefineFactory) {
+        this(configure, selfDefineFactory, null, null);
+    }
+
+    public InLongTcpMsgSender(TcpMsgSenderConfig configure,
+            ThreadFactory selfDefineFactory, MsgSenderFactory senderFactory, 
String clusterIdKey) {
+        super(configure, senderFactory, clusterIdKey);
         this.tcpConfig = (TcpMsgSenderConfig) baseConfig;
         this.clientMgr = new TcpClientMgr(this.getSenderId(), this.tcpConfig, 
selfDefineFactory);
         this.tcpClientMgr = (TcpClientMgr) clientMgr;
     }
 
+    @Override
+    public boolean sendMessage(TcpEventInfo eventInfo, ProcessResult 
procResult) {
+        if (eventInfo == null) {
+            throw new NullPointerException("eventInfo is null");
+        }
+        if (procResult == null) {
+            throw new NullPointerException("procResult is null");
+        }
+        if (!this.isStarted()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        return processEvent(SendQos.SOURCE_ACK, eventInfo, null, procResult);
+    }
+
+    @Override
+    public boolean asyncSendMessage(
+            TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult 
procResult) {
+        if (eventInfo == null) {
+            throw new NullPointerException("eventInfo is null");
+        }
+        if (callback == null) {
+            throw new NullPointerException("callback is null");
+        }
+        if (procResult == null) {
+            throw new NullPointerException("procResult is null");
+        }
+        if (!this.isStarted()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, 
procResult);
+    }
+
     @Override
     public boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult 
procResult) {
         if (eventInfo == null) {
@@ -78,8 +120,7 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
     }
 
     @Override
-    public boolean syncSendMessage(boolean sendInB2B,
-            TcpEventInfo eventInfo, ProcessResult procResult) {
+    public boolean sendMsgWithSinkAck(TcpEventInfo eventInfo, ProcessResult 
procResult) {
         if (eventInfo == null) {
             throw new NullPointerException("eventInfo is null");
         }
@@ -89,15 +130,11 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
-        if (sendInB2B) {
-            return processEvent(SendQos.SOURCE_ACK, eventInfo, null, 
procResult);
-        } else {
-            return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
-        }
+        return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
     }
 
     @Override
-    public boolean asyncSendMessage(boolean sendInB2B,
+    public boolean asyncSendMsgWithSinkAck(
             TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult 
procResult) {
         if (eventInfo == null) {
             throw new NullPointerException("eventInfo is null");
@@ -111,11 +148,7 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
-        if (sendInB2B) {
-            return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, 
procResult);
-        } else {
-            return processEvent(SendQos.SINK_ACK, eventInfo, callback, 
procResult);
-        }
+        return processEvent(SendQos.SINK_ACK, eventInfo, callback, procResult);
     }
 
     @Override
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
index 97543d86a5..d36117614c 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
@@ -28,6 +28,47 @@ import 
org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
  */
 public interface TcpMsgSender extends MessageSender {
 
+    /**
+     * Synchronously send message and wait for the final sending result
+     *      DataProxy returns response as soon as it receives the request and
+     *      forwards the message in B2B mode until it succeeds;
+     *
+     * <p>Attention:
+     * 1. if return false, the caller can choose to wait for a period of time 
before trying again, or
+     *    discard the event after multiple retries and failures.
+     * 2. this method tries to ensure that messages are delivered, but there
+     *    may be duplicate messages or message loss scenarios. It is suitable 
for scenarios with
+     *    a very large number of reports, very low reporting time 
requirements, and
+     *    the need to return the sending results.
+     * </p>
+     *
+     * @param eventInfo the event information need to send
+     * @param procResult The send result, including the detail error infos if 
failed
+     * @return  true if successful, false if failed for some reason.
+     */
+    boolean sendMessage(TcpEventInfo eventInfo, ProcessResult procResult);
+
+    /**
+     * Asynchronously send message and return immediately
+     *      DataProxy returns response as soon as it receives the request and
+     *      forwards the message in B2B mode until it succeeds;
+     *
+     * <p>Attention:
+     * 1. if return false, the caller can choose to wait for a period of time 
before trying again, or
+     *    discard the event after multiple retries and failures.
+     * 2. this method tries to ensure that messages are delivered, but there
+     *    may be duplicate messages or message loss scenarios. It is suitable 
for scenarios with
+     *    a very large number of reports, very low reporting time 
requirements, and
+     *    the need to return the sending results.
+     * </p>
+     * @param eventInfo the event information need to send
+     * @param callback  the callback that returns the response from DataProxy 
or
+     *                 an exception that occurred while waiting for the 
response.
+     * @param procResult The send result, including the detail error infos if 
the event not accepted
+     * @return  true if successful, false if the event not accepted for some 
reason.
+     */
+    boolean asyncSendMessage(TcpEventInfo eventInfo, MsgSendCallback callback, 
ProcessResult procResult);
+
     /**
      * Send message without response
      *
@@ -47,55 +88,41 @@ public interface TcpMsgSender extends MessageSender {
 
     /**
      * Synchronously send message and wait for the final sending result
+     *     DataProxy returns response after receiving the request and 
forwarding it successfully,
+     *     and DataProxy does not retry on failure
      *
      * <p>Attention:
      * 1. if return false, the caller can choose to wait for a period of time 
before trying again, or
      *    discard the event after multiple retries and failures.
-     * 2. this method, with sendInB2B = true, tries to ensure that messages 
are delivered, but there
-     *    may be duplicate messages or message loss scenarios. It is suitable 
for scenarios with
-     *    a very large number of reports, very low reporting time 
requirements, and
-     *    the need to return the sending results.
-     * 3. this method, with sendInB2B = false, ensures that the message is 
delivered only once and
+     * 2. this method ensures that the message is delivered only once and
      *    will not be repeated. It is suitable for businesses with a small 
amount of reports and
      *    no requirements on the reporting time, but require DataProxy to 
forward messages with high reliability.
      * </p>
      *
-     * @param sendInB2B indicates the DataProxy message service mode, true 
indicates DataProxy returns
-     *                 as soon as it receives the request and forwards the 
message in B2B mode until it succeeds;
-     *                 false indicates DataProxy returns after receiving the 
request and forwarding it successfully,
-     *                 and DataProxy does not retry on failure
      * @param eventInfo the event information need to send
      * @param procResult The send result, including the detail error infos if 
failed
      * @return  true if successful, false if failed for some reason.
      */
-    boolean syncSendMessage(boolean sendInB2B,
-            TcpEventInfo eventInfo, ProcessResult procResult);
+    boolean sendMsgWithSinkAck(TcpEventInfo eventInfo, ProcessResult 
procResult);
 
     /**
-     * Asynchronously send message
+     * Asynchronously send message and return immediately
+     *     DataProxy returns response after receiving the request and 
forwarding it successfully,
+     *     and DataProxy does not retry on failure
      *
      * <p>Attention:
      * 1. if return false, the caller can choose to wait for a period of time 
before trying again, or
      *    discard the event after multiple retries and failures.
-     * 2. this method, with sendInB2B = true, tries to ensure that messages 
are delivered, but there
-     *    may be duplicate messages or message loss scenarios. It is suitable 
for scenarios with
-     *    a very large number of reports, very low reporting time 
requirements, and
-     *    the need to return the sending results.
-     * 3. this method, with sendInB2B = false, ensures that the message is 
delivered only once and
+     * 2. this method ensures that the message is delivered only once and
      *    will not be repeated. It is suitable for businesses with a small 
amount of reports and
      *    no requirements on the reporting time, but require DataProxy to 
forward messages with high reliability.
      * </p>
      *
-     * @param sendInB2B indicates the DataProxy message service mode, true 
indicates DataProxy returns
-     *                 as soon as it receives the request and forwards the 
message in B2B mode until it succeeds;
-     *                 false indicates DataProxy returns after receiving the 
request and forwarding it successfully,
-     *                 and DataProxy does not retry on failure
      * @param eventInfo the event information need to send
      * @param callback  the callback that returns the response from DataProxy 
or
      *                 an exception that occurred while waiting for the 
response.
      * @param procResult The send result, including the detail error infos if 
the event not accepted
      * @return  true if successful, false if the event not accepted for some 
reason.
      */
-    boolean asyncSendMessage(boolean sendInB2B,
-            TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult 
procResult);
+    boolean asyncSendMsgWithSinkAck(TcpEventInfo eventInfo, MsgSendCallback 
callback, ProcessResult procResult);
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 2eace638a2..1e72c2bf9c 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -19,7 +19,9 @@ package org.apache.inlong.sdk.dataproxy.utils;
 
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import org.apache.commons.lang3.StringUtils;
@@ -124,6 +126,20 @@ public class ProxyUtils {
         }
     }
 
+    public static void validProxyConfigNotNull(ProxyClientConfig configure) 
throws ProxySdkException {
+        if (configure == null) {
+            throw new ProxySdkException("configure is null!");
+        }
+    }
+
+    public static String buildClusterIdKey(String protocol, String regionName, 
Integer clusterId) {
+        return clusterId + ":" + regionName + ":" + protocol;
+    }
+
+    public static String buildGroupIdConfigKey(String protocol, String 
regionName, String groupId) {
+        return protocol + ":" + regionName + ":" + groupId;
+    }
+
     public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
         if (attrsMap == null || attrsMap.size() == 0) {
             return false;

Reply via email to