This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9493c040b9 [INLONG-11702][SDK] Optimize Sender factory implementation
(#11705)
9493c040b9 is described below
commit 9493c040b958d25fda2f21708fa698ae61a5f8e5
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jan 22 14:30:30 2025 +0800
[INLONG-11702][SDK] Optimize Sender factory implementation (#11705)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 267 +++++++++++++++++++++
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 4 +
.../apache/inlong/sdk/dataproxy/MessageSender.java | 1 +
.../inlong/sdk/dataproxy/MessageSenderFactory.java | 22 --
.../inlong/sdk/dataproxy/MsgSenderFactory.java | 91 +++++++
.../sdk/dataproxy/MsgSenderMultiFactory.java | 101 ++++++++
.../sdk/dataproxy/MsgSenderSingleFactory.java | 108 +++++++++
.../sdk/dataproxy/common/ProxyClientConfig.java | 14 +-
.../sdk/dataproxy/common/SendMessageCallback.java | 5 +
.../inlong/sdk/dataproxy/example/ExampleUtils.java | 259 ++++++++++++++++++++
.../dataproxy/example/InLongFactoryExample.java | 127 ++++++++++
.../dataproxy/example/InLongTcpClientExample.java | 71 ++++++
.../inlong/sdk/dataproxy/sender/BaseSender.java | 30 ++-
.../dataproxy/sender/tcp/InLongTcpMsgSender.java | 63 +++--
.../sdk/dataproxy/sender/tcp/TcpMsgSender.java | 73 ++++--
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 16 ++
16 files changed, 1182 insertions(+), 70 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
new file mode 100644
index 0000000000..0bc0cf4bb9
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Base Message Sender Factory
+ *
+ * Used to manage the instance relationship of the sender factory.
+ * Since both singleton and multiple instances involve the same relationship,
+ * they are abstracted and maintained separately here.
+ */
+public class BaseMsgSenderFactory {
+
+ private static final Logger logger =
LoggerFactory.getLogger(BaseMsgSenderFactory.class);
+ private static final LogCounter exptCounter = new LogCounter(10, 100000,
60 * 1000L);
+ // msg send factory
+ private final MsgSenderFactory msgSenderFactory;
+ private final String factoryNo;
+ // for senders
+ private final ReentrantReadWriteLock senderCacheLock = new
ReentrantReadWriteLock();
+ // for inlong groupId -- Sender map
+ private final ConcurrentHashMap<String, BaseSender> groupIdSenderMap = new
ConcurrentHashMap<>();
+ // for inlong clusterId -- Sender map
+ private final ConcurrentHashMap<String, BaseSender> clusterIdSenderMap =
new ConcurrentHashMap<>();
+
+ public BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String
factoryNo) {
+ this.msgSenderFactory = msgSenderFactory;
+ this.factoryNo = factoryNo;
+ logger.info("MsgSenderFactory({}) started", this.factoryNo);
+ }
+
+ public void close() {
+ int totalSenderCnt;
+ int totalTDBankCnt;
+ senderCacheLock.writeLock().lock();
+ try {
+ // release groupId mapped senders
+ totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap);
+ // release clusterId mapped senders
+ totalSenderCnt +=
innReleaseAllClusterIdSenders(clusterIdSenderMap);
+ } finally {
+ senderCacheLock.writeLock().unlock();
+ }
+ logger.info("MsgSenderFactory({}) closed, release {} inlong senders",
+ this.factoryNo, totalSenderCnt);
+ }
+
+ public void removeClient(BaseSender msgSender) {
+ if (msgSender == null
+ || msgSender.getSenderFactory() == null
+ || msgSender.getSenderFactory() != msgSenderFactory) {
+ return;
+ }
+ boolean removed;
+ String senderId = msgSender.getSenderId();
+ senderCacheLock.writeLock().lock();
+ try {
+ if (msgSender.getFactoryClusterIdKey() == null) {
+ removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap);
+ } else {
+ removed = innRemoveClusterIdSender(msgSender,
clusterIdSenderMap);
+ }
+ } finally {
+ senderCacheLock.writeLock().unlock();
+ }
+ if (removed) {
+ logger.info("MsgSenderFactory({}) removed sender({})",
this.factoryNo, senderId);
+ }
+ }
+
+ public int getMsgSenderCount() {
+ return groupIdSenderMap.size() + clusterIdSenderMap.size();
+ }
+
+ public InLongTcpMsgSender genTcpSenderByGroupId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
+ ProxyUtils.validProxyConfigNotNull(configure);
+ // query cached sender
+ String metaConfigKey = configure.getGroupMetaConfigKey();
+ InLongTcpMsgSender messageSender =
+ (InLongTcpMsgSender) groupIdSenderMap.get(metaConfigKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // valid configure info
+ ProcessResult procResult = new ProcessResult();
+ qryProxyMetaConfigure(configure, procResult);
+ // generate sender
+ senderCacheLock.writeLock().lock();
+ try {
+ // re-get the created sender based on the groupId key after locked
+ messageSender = (InLongTcpMsgSender)
groupIdSenderMap.get(metaConfigKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // build a new sender based on groupId
+ messageSender = new InLongTcpMsgSender(configure,
selfDefineFactory, msgSenderFactory, null);
+ if (!messageSender.start(procResult)) {
+ messageSender.close();
+ throw new ProxySdkException("Failed to start groupId sender: "
+ procResult);
+ }
+ groupIdSenderMap.put(metaConfigKey, messageSender);
+ logger.info("MsgSenderFactory({}) generated a new groupId({})
sender({})",
+ this.factoryNo, metaConfigKey,
messageSender.getSenderId());
+ return messageSender;
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("MsgSenderFactory({}) build groupId sender({})
exception",
+ this.factoryNo, metaConfigKey, ex);
+ }
+ throw new ProxySdkException("Failed to build groupId sender: " +
ex.getMessage());
+ } finally {
+ senderCacheLock.writeLock().unlock();
+ }
+ }
+
+ public InLongTcpMsgSender genTcpSenderByClusterId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
+ ProxyUtils.validProxyConfigNotNull(configure);
+ // get groupId's clusterIdKey
+ ProcessResult procResult = new ProcessResult();
+ ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure,
procResult);;
+ String clusterIdKey = ProxyUtils.buildClusterIdKey(
+ configure.getDataRptProtocol(), configure.getRegionName(),
proxyConfigEntry.getClusterId());
+ // get local built sender
+ InLongTcpMsgSender messageSender = (InLongTcpMsgSender)
clusterIdSenderMap.get(clusterIdKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // generate sender
+ senderCacheLock.writeLock().lock();
+ try {
+ // re-get the created sender based on the clusterId Key after
locked
+ messageSender = (InLongTcpMsgSender)
clusterIdSenderMap.get(clusterIdKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // build a new sender based on clusterId Key
+ messageSender = new InLongTcpMsgSender(configure,
+ selfDefineFactory, msgSenderFactory, clusterIdKey);
+ if (!messageSender.start(procResult)) {
+ messageSender.close();
+ throw new ProxySdkException("Failed to start cluster sender: "
+ procResult);
+ }
+ clusterIdSenderMap.put(clusterIdKey, messageSender);
+ logger.info("MsgSenderFactory({}) generated a new clusterId({})
sender({})",
+ this.factoryNo, clusterIdKey, messageSender.getSenderId());
+ return messageSender;
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("MsgSenderFactory({}) build cluster sender({})
exception",
+ this.factoryNo, clusterIdKey, ex);
+ }
+ throw new ProxySdkException("Failed to build cluster sender: " +
ex.getMessage());
+ } finally {
+ senderCacheLock.writeLock().unlock();
+ }
+ }
+
+ private ProxyConfigEntry qryProxyMetaConfigure(
+ ProxyClientConfig proxyConfig, ProcessResult procResult) throws
ProxySdkException {
+ ProxyConfigManager inlongMetaQryMgr = new
ProxyConfigManager(proxyConfig);
+ // check whether valid configure
+ if (!inlongMetaQryMgr.getGroupIdConfigure(true, procResult)) {
+ throw new ProxySdkException("Failed to query remote group config:
" + procResult);
+ }
+ if (proxyConfig.isEnableReportEncrypt()
+ && !inlongMetaQryMgr.getEncryptConfigure(true, procResult)) {
+ throw new ProxySdkException("Failed to query remote encrypt
config: " + procResult);
+ }
+ return inlongMetaQryMgr.getProxyConfigEntry();
+ }
+
+ private boolean innRemoveGroupIdSender(BaseSender msgSender, Map<String,
BaseSender> senderMap) {
+ BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey());
+ if (tmpSender == null
+ || !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
+ return false;
+ }
+ return senderMap.remove(msgSender.getMetaConfigKey()) != null;
+ }
+
+ private boolean innRemoveClusterIdSender(BaseSender msgSender, Map<String,
BaseSender> senderMap) {
+ BaseSender tmpSender =
senderMap.get(msgSender.getFactoryClusterIdKey());
+ if (tmpSender == null
+ || !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
+ return false;
+ }
+ return senderMap.remove(msgSender.getFactoryClusterIdKey()) != null;
+ }
+
+ private int innReleaseAllGroupIdSenders(Map<String, BaseSender> senderMap)
{
+ int totalSenderCnt = 0;
+ for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
+ if (entry == null || entry.getValue() == null) {
+ continue;
+ }
+ try {
+ entry.getValue().close();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("MsgSenderFactory({}) close groupId({})'s
sender failure",
+ this.factoryNo, entry.getKey(), ex);
+ }
+ }
+ totalSenderCnt++;
+ }
+ senderMap.clear();
+ return totalSenderCnt;
+ }
+
+ private int innReleaseAllClusterIdSenders(Map<String, BaseSender>
senderMap) {
+ int totalSenderCnt = 0;
+ for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
+ if (entry == null
+ || entry.getKey() == null
+ || entry.getValue() == null) {
+ continue;
+ }
+ try {
+ entry.getValue().close();
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("MsgSenderFactory({}) close clusterId({})'s
sender failure",
+ this.factoryNo, entry.getKey(), ex);
+ }
+ }
+ totalSenderCnt++;
+ }
+ senderMap.clear();
+ return totalSenderCnt;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 52f2116e88..ecdc820a7d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -44,6 +44,10 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
+@Deprecated
+/**
+ * Replace by InLongTcpMsgSender
+ */
public class DefaultMessageSender implements MessageSender {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultMessageSender.class);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 862586ab77..26fe170b90 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -24,6 +24,7 @@ import
org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import java.util.List;
import java.util.Map;
+@Deprecated
public interface MessageSender {
void close();
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java
deleted file mode 100644
index a70dfa37a0..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSenderFactory.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy;
-
-public class MessageSenderFactory {
-
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
new file mode 100644
index 0000000000..a6a4e20b4c
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Message Sender Factory interface
+ *
+ * Used to define the sender factory common methods
+ */
+public interface MsgSenderFactory {
+
+ /**
+ * Shutdown all senders at the factory
+ *
+ */
+ void shutdownAll() throws ProxySdkException;
+
+ /**
+ * Remove the specified sender from the factory
+ *
+ * @param msgSender the specified sender
+ */
+ void removeClient(BaseSender msgSender);
+
+ /**
+ * Remove the sender number int the factory
+ *
+ * @return the number of senders currently in use
+ */
+ int getMsgSenderCount();
+
+ /**
+ * Get or generate a sender from the factory according to groupId
+ *
+ * @param configure the sender configure
+ * @return the sender
+ */
+ InLongTcpMsgSender genTcpSenderByGroupId(
+ TcpMsgSenderConfig configure) throws ProxySdkException;
+
+ /**
+ * Get or generate a sender from the factory according to groupId
+ *
+ * @param configure the sender configure
+ * @param selfDefineFactory the self defined network threads factory
+ * @return the sender
+ */
+ InLongTcpMsgSender genTcpSenderByGroupId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException;
+
+ /**
+ * Get or generate a sender from the factory according to clusterId
+ *
+ * @param configure the sender configure
+ * @return the sender
+ */
+ InLongTcpMsgSender genTcpSenderByClusterId(
+ TcpMsgSenderConfig configure) throws ProxySdkException;
+
+ /**
+ * Get or generate a sender from the factory according to clusterId
+ *
+ * @param configure the sender configure
+ * @param selfDefineFactory the self defined network threads factory
+ * @return the sender
+ */
+ InLongTcpMsgSender genTcpSenderByClusterId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException;
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
new file mode 100644
index 0000000000..0cb595c261
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Multiple Instances Message Sender Factory
+ *
+ * Used to define the Multiple instance sender factory
+ */
+public class MsgSenderMultiFactory implements MsgSenderFactory {
+
+ private static final AtomicLong refCounter = new AtomicLong(0);
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
+ private final BaseMsgSenderFactory baseMsgSenderFactory;
+
+ public MsgSenderMultiFactory() {
+ this.baseMsgSenderFactory = new BaseMsgSenderFactory(
+ this, "iMultiFact-" + refCounter.incrementAndGet());
+ this.initialized.set(true);
+ }
+
+ @Override
+ public void shutdownAll() throws ProxySdkException {
+ if (!this.initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ this.baseMsgSenderFactory.close();
+ }
+
+ @Override
+ public void removeClient(BaseSender msgSender) {
+ if (msgSender == null
+ || msgSender.getSenderFactory() == null
+ || msgSender.getSenderFactory() != this) {
+ return;
+ }
+ this.baseMsgSenderFactory.removeClient(msgSender);
+ }
+
+ @Override
+ public int getMsgSenderCount() {
+ return this.baseMsgSenderFactory.getMsgSenderCount();
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByGroupId(
+ TcpMsgSenderConfig configure) throws ProxySdkException {
+ return genTcpSenderByGroupId(configure, null);
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByGroupId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
+ if (!this.initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ ProxyUtils.validProxyConfigNotNull(configure);
+ return this.baseMsgSenderFactory.genTcpSenderByGroupId(configure,
selfDefineFactory);
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByClusterId(
+ TcpMsgSenderConfig configure) throws ProxySdkException {
+ return genTcpSenderByClusterId(configure, null);
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByClusterId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
+ if (!this.initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ ProxyUtils.validProxyConfigNotNull(configure);
+ return this.baseMsgSenderFactory.genTcpSenderByClusterId(configure,
selfDefineFactory);
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
new file mode 100644
index 0000000000..91b1735155
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Singleton Message Sender Factory
+ *
+ * Used to define the singleton sender factory
+ */
+public class MsgSenderSingleFactory implements MsgSenderFactory {
+
+ private static final AtomicBoolean initialized = new AtomicBoolean(false);
+ private static final AtomicLong singletonRefCounter = new AtomicLong(0);
+ private static BaseMsgSenderFactory baseMsgSenderFactory;
+
+ public MsgSenderSingleFactory() {
+ if (singletonRefCounter.incrementAndGet() == 1) {
+ baseMsgSenderFactory = new BaseMsgSenderFactory(this,
"iSingleFct");
+ initialized.set(true);
+ }
+ while (!initialized.get()) {
+ ProxyUtils.sleepSomeTime(50L);
+ }
+ }
+
+ @Override
+ public void shutdownAll() throws ProxySdkException {
+ if (!initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ if (singletonRefCounter.decrementAndGet() > 0) {
+ return;
+ }
+ baseMsgSenderFactory.close();
+ }
+
+ @Override
+ public void removeClient(BaseSender msgSender) {
+ if (msgSender == null
+ || msgSender.getSenderFactory() == null
+ || msgSender.getSenderFactory() != this) {
+ return;
+ }
+ baseMsgSenderFactory.removeClient(msgSender);
+ }
+
+ @Override
+ public int getMsgSenderCount() {
+ return baseMsgSenderFactory.getMsgSenderCount();
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByGroupId(
+ TcpMsgSenderConfig configure) throws ProxySdkException {
+ return genTcpSenderByGroupId(configure, null);
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByGroupId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
+ if (!initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ ProxyUtils.validProxyConfigNotNull(configure);
+ return baseMsgSenderFactory.genTcpSenderByGroupId(configure,
selfDefineFactory);
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByClusterId(
+ TcpMsgSenderConfig configure) throws ProxySdkException {
+ return genTcpSenderByClusterId(configure, null);
+ }
+
+ @Override
+ public InLongTcpMsgSender genTcpSenderByClusterId(
+ TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
+ if (!initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ ProxyUtils.validProxyConfigNotNull(configure);
+ return baseMsgSenderFactory.genTcpSenderByClusterId(configure,
selfDefineFactory);
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
index 48039c7c65..274b24f16d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.common;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -112,7 +113,8 @@ public class ProxyClientConfig implements Cloneable {
this.inlongGroupId = groupId.trim();
this.dataRptProtocol = rptProtocol.name();
this.setRegionName(regionName);
- this.groupMetaConfigKey = this.buildMgrConfigKey();
+ this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey(
+ this.dataRptProtocol, this.regionName, this.inlongGroupId);
}
protected ProxyClientConfig(String managerAddress,
@@ -124,7 +126,8 @@ public class ProxyClientConfig implements Cloneable {
this.inlongGroupId = groupId.trim();
this.dataRptProtocol = rptProtocol.name();
this.setRegionName(regionName);
- this.groupMetaConfigKey = this.buildMgrConfigKey();
+ this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey(
+ this.dataRptProtocol, this.regionName, this.inlongGroupId);
}
public void setMgrAuthzInfo(boolean needMgrAuthz,
@@ -226,7 +229,8 @@ public class ProxyClientConfig implements Cloneable {
public void setRegionName(String regionName) {
if (StringUtils.isNotBlank(regionName)) {
this.regionName = regionName.trim().toLowerCase();
- this.groupMetaConfigKey = this.buildMgrConfigKey();
+ this.groupMetaConfigKey = ProxyUtils.buildGroupIdConfigKey(
+ this.dataRptProtocol, this.regionName, this.inlongGroupId);
}
}
@@ -520,8 +524,4 @@ public class ProxyClientConfig implements Cloneable {
}
this.managerPort = tmValue;
}
-
- private String buildMgrConfigKey() {
- return this.inlongGroupId + ":" + this.regionName + ":" +
this.dataRptProtocol;
- }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
index 9e83f4673c..48ce607037 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
@@ -17,6 +17,11 @@
package org.apache.inlong.sdk.dataproxy.common;
+@Deprecated
+/**
+ * Replace by MsgSendCallback
+ *
+ */
public interface SendMessageCallback {
/* Invoked when a message is confirmed by TDBus. */
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
new file mode 100644
index 0000000000..b271aeccf1
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
+
+import org.apache.commons.codec.binary.StringUtils;
+
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ExampleUtils {
+
+ private static final SecureRandom cntRandom = new SecureRandom(
+ Long.toString(System.nanoTime()).getBytes());
+
+ public static void sendTcpMessages(TcpMsgSender msgSender, boolean isSync,
boolean isMultiItem,
+ String groupId, String streamId, int reqCnt, int baseBodyLen, int
msgItemCnt, ProcessResult procResult) {
+ int sucCnt = 0;
+ int curCount = 0;
+ TcpEventInfo eventInfo;
+ byte[] itemBody = buildBoydData(baseBodyLen);
+ List<byte[]> multiBodys = new ArrayList<>();
+ for (int i = 0; i < msgItemCnt; i++) {
+ multiBodys.add(itemBody);
+ }
+ Map<String, String> localAttrs = new HashMap<>();
+ if (isSync) {
+ if (isMultiItem) {
+ // send single message
+ while (curCount++ < reqCnt) {
+ try {
+ if (curCount > 1) {
+ localAttrs.clear();
+ localAttrs.put("index", String.valueOf(curCount));
+ }
+ eventInfo = new TcpEventInfo(groupId, streamId,
+ System.currentTimeMillis(), localAttrs,
multiBodys);
+ } catch (Throwable ex) {
+ System.out.println("Build tcp event failure, ex=" +
ex);
+ continue;
+ }
+ if (!msgSender.sendMessage(eventInfo, procResult)) {
+ System.out.println("Sync request index=" + curCount +
", process result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ } else {
+ // send single message
+ while (curCount++ < reqCnt) {
+ try {
+ if (curCount > 1) {
+ localAttrs.clear();
+ localAttrs.put("index", String.valueOf(curCount));
+ localAttrs.put("multi", String.valueOf(false));
+ }
+ eventInfo = new TcpEventInfo(groupId, streamId,
+ System.currentTimeMillis(), localAttrs,
itemBody);
+ } catch (Throwable ex) {
+ System.out.println("Build tcp event failure, ex=" +
ex);
+ continue;
+ }
+ if (!msgSender.sendMessage(eventInfo, procResult)) {
+ System.out.println("Sync request index=" + curCount +
", process result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ }
+ } else {
+ if (isMultiItem) {
+ // send multiple message
+ while (curCount++ < reqCnt) {
+ try {
+ if (curCount > 1) {
+ localAttrs.clear();
+ localAttrs.put("index", String.valueOf(curCount));
+ localAttrs.put("multi", String.valueOf(true));
+ }
+ eventInfo = new TcpEventInfo(groupId, streamId,
+ System.currentTimeMillis(), localAttrs,
multiBodys);
+ } catch (Throwable ex) {
+ System.out.println("Build multiple tcp event failure,
ex=" + ex);
+ continue;
+ }
+ if (!msgSender.asyncSendMessage(eventInfo, new
MyMsgSendBack(curCount), procResult)) {
+ System.out.println("Async request index=" + curCount +
", post result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ } else {
+ // send single message
+ while (curCount++ < reqCnt) {
+ try {
+ eventInfo = new TcpEventInfo(groupId, streamId,
+ System.currentTimeMillis(), null, itemBody);
+ } catch (Throwable ex) {
+ System.out.println("Build tcp event failure, ex=" +
ex);
+ continue;
+ }
+ if (!msgSender.asyncSendMessage(eventInfo, new
MyMsgSendBack(curCount), procResult)) {
+ System.out.println("Async request index=" + curCount +
", post result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ }
+ }
+ }
+
+ public static void sendHttpMessages(HttpMsgSender msgSender, boolean
isSync, boolean isMultiItem,
+ String groupId, String streamId, int reqCnt, int baseBodyLen, int
msgItemCnt, ProcessResult procResult) {
+ int sucCnt = 0;
+ int curCount = 0;
+ HttpEventInfo eventInfo;
+ String itemBody = getRandomString(baseBodyLen);
+ List<String> multiBodys = new ArrayList<>();
+ for (int i = 0; i < msgItemCnt; i++) {
+ multiBodys.add(itemBody);
+ }
+ if (isSync) {
+ if (isMultiItem) {
+ // send multiple message
+ while (curCount++ < reqCnt) {
+ try {
+ eventInfo = new HttpEventInfo(groupId, streamId,
+ System.currentTimeMillis(), multiBodys);
+ } catch (Throwable ex) {
+ System.out.println("Build multiple http event failure,
ex=" + ex);
+ continue;
+ }
+ if (!msgSender.syncSendMessage(eventInfo, procResult)) {
+ System.out.println("Sync request index=" + curCount +
", process result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ } else {
+ // send single message
+ while (curCount++ < reqCnt) {
+ try {
+ eventInfo = new HttpEventInfo(groupId, streamId,
+ System.currentTimeMillis(), itemBody);
+ } catch (Throwable ex) {
+ System.out.println("Build single http event failure,
ex=" + ex);
+ continue;
+ }
+ if (!msgSender.syncSendMessage(eventInfo, procResult)) {
+ System.out.println("Sync request index=" + curCount +
", process result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ }
+ } else {
+ if (isMultiItem) {
+ // send multiple message
+ while (curCount++ < reqCnt) {
+ try {
+ eventInfo = new HttpEventInfo(groupId, streamId,
+ System.currentTimeMillis(), multiBodys);
+ } catch (Throwable ex) {
+ System.out.println("Build multiple http event failure,
ex=" + ex);
+ continue;
+ }
+ if (!msgSender.asyncSendMessage(eventInfo, new
MyMsgSendBack(curCount), procResult)) {
+ System.out.println("Async request index=" + curCount +
", post result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ } else {
+ // send single message
+ while (curCount++ < reqCnt) {
+ try {
+ eventInfo = new HttpEventInfo(groupId, streamId,
System.currentTimeMillis(), itemBody);
+ } catch (Throwable ex) {
+ System.out.println("Build single http event failure,
ex=" + ex);
+ continue;
+ }
+ if (!msgSender.asyncSendMessage(eventInfo, new
MyMsgSendBack(curCount), procResult)) {
+ System.out.println("Async request: index=" + curCount
+ ", post result=" + procResult);
+ continue;
+ }
+ curCount++;
+ }
+ }
+ }
+ }
+
+ private static String getRandomString(int length) {
+ String strBase =
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ int number = cntRandom.nextInt(strBase.length());
+ sb.append(strBase.charAt(number));
+ }
+ return sb.toString();
+ }
+
+ private static byte[] buildBoydData(int bodySize) {
+ final byte[] itemBaseData =
+ StringUtils.getBytesUtf8("inglong tcp test data!");
+ final ByteBuffer dataBuffer = ByteBuffer.allocate(bodySize);
+ while (dataBuffer.hasRemaining()) {
+ int offset = dataBuffer.arrayOffset();
+ dataBuffer.put(itemBaseData, offset,
+ Math.min(dataBuffer.remaining(), itemBaseData.length));
+ }
+ dataBuffer.flip();
+ return dataBuffer.array();
+ }
+
+ private static class MyMsgSendBack implements MsgSendCallback {
+
+ private final int msgId;
+
+ public MyMsgSendBack(int msgId) {
+ this.msgId = msgId;
+ }
+
+ @Override
+ public void onMessageAck(ProcessResult result) {
+ // System.out.println("msgId=" + msgId + ", send result = " +
result);
+ }
+
+ @Override
+ public void onException(Throwable ex) {
+ System.out.println("msgId=" + msgId + ", send exception=" + ex);
+
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
new file mode 100644
index 0000000000..d1c8b8e19c
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.MsgSenderMultiFactory;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+public class InLongFactoryExample {
+
+ protected static final Logger logger =
LoggerFactory.getLogger(InLongFactoryExample.class);
+
+ public static void main(String[] args) throws Exception {
+
+ String managerIp = args[0];
+ int managerPort = Integer.parseInt(args[1]);
+ String groupId = args[2];
+ String streamId = args[3];
+ String secretId = args[4];
+ String secretKey = args[5];
+ int reqCnt = Integer.parseInt(args[6]);
+ int msgSize = 1024;
+ int msgCnt = 1;
+ if (args.length > 7) {
+ msgSize = Integer.parseInt(args[7]);
+ msgCnt = Integer.parseInt(args[8]);
+ }
+
+ System.out.println("InLongFactoryExample start");
+
+ // build singleton factory
+ MsgSenderSingleFactory singleFactory = new MsgSenderSingleFactory();
+ // report data by tcp
+ TcpMsgSenderConfig tcpMsgSenderConfig = new TcpMsgSenderConfig(
+ false, managerIp, managerPort, groupId, secretId, secretKey);
+ tcpMsgSenderConfig.setRequestTimeoutMs(20000L);
+ InLongTcpMsgSender tcpMsgSender =
+ singleFactory.genTcpSenderByClusterId(tcpMsgSenderConfig);
+ ProcessResult procResult = new ProcessResult();
+ if (!tcpMsgSender.start(procResult)) {
+ System.out.println("Start tcp sender failure: process result=" +
procResult);
+ }
+
+ // report data
+ ExampleUtils.sendTcpMessages(tcpMsgSender, false, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendTcpMessages(tcpMsgSender, false, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ProxyUtils.sleepSomeTime(10000L);
+ tcpMsgSender.close();
+
+ // report data use multi-factory
+ MsgSenderMultiFactory multiFactory1 = new MsgSenderMultiFactory();
+ MsgSenderMultiFactory multiFactory2 = new MsgSenderMultiFactory();
+ // report data by tcp
+ tcpMsgSenderConfig.setSdkMsgType(MsgType.MSG_ACK_SERVICE);
+ InLongTcpMsgSender tcpMsgSender1 =
+ multiFactory1.genTcpSenderByGroupId(tcpMsgSenderConfig);
+ if (!tcpMsgSender1.start(procResult)) {
+ System.out.println("Start tcp sender1 failure: process result=" +
procResult);
+ }
+
+ String managerAddr = "http://" + managerIp + ":" + managerPort;
+ TcpMsgSenderConfig tcpMsgSenderConfig2 =
+ new TcpMsgSenderConfig(managerAddr, groupId, secretId,
secretKey);
+ tcpMsgSenderConfig2.setSdkMsgType(MsgType.MSG_MULTI_BODY);
+ InLongTcpMsgSender tcpMsgSender2 =
+ multiFactory2.genTcpSenderByGroupId(tcpMsgSenderConfig2);
+ ExampleUtils.sendTcpMessages(tcpMsgSender1, false, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendTcpMessages(tcpMsgSender2, false, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ProxyUtils.sleepSomeTime(10000L);
+ tcpMsgSender1.close();
+ System.out.println("Multi-1.1 Cur multiFactory1 sender count = "
+ + multiFactory1.getMsgSenderCount()
+ + ", cur multiFactory2 sender count is " +
multiFactory2.getMsgSenderCount());
+ tcpMsgSender2.close();
+ System.out.println("Multi-1.2 Cur multiFactory1 sender count = "
+ + multiFactory1.getMsgSenderCount()
+ + ", cur multiFactory2 sender count is " +
multiFactory2.getMsgSenderCount());
+
+ // test self DefineFactory
+ ThreadFactory selfDefineFactory = new
DefaultThreadFactory("test_self_thread_factory");
+ InLongTcpMsgSender tcpMsgSelfSender =
+ singleFactory.genTcpSenderByGroupId(tcpMsgSenderConfig,
selfDefineFactory);
+ ExampleUtils.sendTcpMessages(tcpMsgSelfSender, false, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ProxyUtils.sleepSomeTime(10000L);
+
+ tcpMsgSelfSender.close();
+
+ System.out.println("singleFactory-3 Cur singleFactory sender count = "
+ + singleFactory.getMsgSenderCount());
+
+ ProxyUtils.sleepSomeTime(3 * 60 * 1000L);
+
+ // close all
+ multiFactory1.shutdownAll();
+ multiFactory2.shutdownAll();
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java
new file mode 100644
index 0000000000..fce1404e0d
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongTcpClientExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongTcpClientExample {
+
+ protected static final Logger logger =
LoggerFactory.getLogger(InLongTcpClientExample.class);
+
+ public static void main(String[] args) throws Exception {
+
+ String managerIp = args[0];
+ String managerPort = args[1];
+ String groupId = args[2];
+ String streamId = args[3];
+ String secretId = args[4];
+ String secretKey = args[5];
+ int reqCnt = Integer.parseInt(args[6]);
+ int msgSize = 1024;
+ int msgCnt = 1;
+ if (args.length > 7) {
+ msgSize = Integer.parseInt(args[7]);
+ msgCnt = Integer.parseInt(args[8]);
+ }
+
+ String managerAddr = "http://" + managerIp + ":" + managerPort;
+
+ TcpMsgSenderConfig dataProxyConfig =
+ new TcpMsgSenderConfig(managerAddr, groupId, secretId,
secretKey);
+ dataProxyConfig.setRequestTimeoutMs(20000L);
+ InLongTcpMsgSender messageSender = new
InLongTcpMsgSender(dataProxyConfig);
+
+ logger.info("InLongTcpMsgSender start");
+
+ ProcessResult procResult = new ProcessResult();
+ if (!messageSender.start(procResult)) {
+ System.out.println("Start sender failure: process result=" +
procResult.toString());
+ }
+ ExampleUtils.sendTcpMessages(messageSender, true, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendTcpMessages(messageSender, true, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendTcpMessages(messageSender, false, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendTcpMessages(messageSender, false, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ProxyUtils.sleepSomeTime(10000L);
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index a5f44f17ee..c103bd31f4 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -17,11 +17,13 @@
package org.apache.inlong.sdk.dataproxy.sender;
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -58,6 +60,8 @@ public abstract class BaseSender implements ConfigHolder {
private static final AtomicLong senderIdGen = new AtomicLong(0L);
//
protected final AtomicInteger senderStatus = new
AtomicInteger(SENDER_STATUS_UNINITIALIZED);
+ protected final MsgSenderFactory senderFactory;
+ private final String factoryClusterIdKey;
protected final String senderId;
protected final ProxyClientConfig baseConfig;
protected ClientMgr clientMgr;
@@ -71,15 +75,20 @@ public abstract class BaseSender implements ConfigHolder {
protected volatile int groupIdNum = 0;
private Map<String, Integer> streamIdMap = new HashMap<>();
- protected BaseSender(ProxyClientConfig configure) {
+ protected BaseSender(ProxyClientConfig configure, MsgSenderFactory
senderFactory, String clusterIdKey) {
+ if (configure == null) {
+ throw new NullPointerException("configure is null");
+ }
this.baseConfig = configure.clone();
+ this.senderFactory = senderFactory;
+ this.factoryClusterIdKey = clusterIdKey;
this.senderId = configure.getDataRptProtocol() + "-" +
senderIdGen.incrementAndGet();
}
public boolean start(ProcessResult procResult) {
if (!this.senderStatus.compareAndSet(
SENDER_STATUS_UNINITIALIZED, SENDER_STATUS_INITIALIZING)) {
- return procResult.setFailResult(ErrorCode.OK);
+ return procResult.setSuccess();
}
// start client manager
if (!this.clientMgr.start(procResult)) {
@@ -103,7 +112,7 @@ public abstract class BaseSender implements ConfigHolder {
this.configManager.start();
this.senderStatus.set(SENDER_STATUS_STARTED);
logger.info("Sender({}) instance started!", senderId);
- return procResult.setFailResult(ErrorCode.OK);
+ return procResult.setSuccess();
}
public void close() {
@@ -116,6 +125,9 @@ public abstract class BaseSender implements ConfigHolder {
}
configManager.shutDown();
clientMgr.stop();
+ if (this.senderFactory != null) {
+ this.senderFactory.removeClient(this);
+ }
logger.info("Sender({}) instance stopped!", senderId);
}
@@ -165,10 +177,22 @@ public abstract class BaseSender implements ConfigHolder {
return senderStatus.get() == SENDER_STATUS_STARTED;
}
+ public MsgSenderFactory getSenderFactory() {
+ return senderFactory;
+ }
+
+ public String getFactoryClusterIdKey() {
+ return factoryClusterIdKey;
+ }
+
public String getMetaConfigKey() {
return this.baseConfig.getGroupMetaConfigKey();
}
+ public ProxyConfigEntry getProxyConfigEntry() {
+ return configManager.getProxyConfigEntry();
+ }
+
public String getSenderId() {
return senderId;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
index 97076dd977..aac6d97aee 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.sender.tcp;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
@@ -56,13 +57,54 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
private final TcpMsgSenderConfig tcpConfig;
private final TcpClientMgr tcpClientMgr;
- protected InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory
selfDefineFactory) {
- super(configure);
+ public InLongTcpMsgSender(TcpMsgSenderConfig configure) {
+ this(configure, null, null, null);
+ }
+
+ public InLongTcpMsgSender(TcpMsgSenderConfig configure, ThreadFactory
selfDefineFactory) {
+ this(configure, selfDefineFactory, null, null);
+ }
+
+ public InLongTcpMsgSender(TcpMsgSenderConfig configure,
+ ThreadFactory selfDefineFactory, MsgSenderFactory senderFactory,
String clusterIdKey) {
+ super(configure, senderFactory, clusterIdKey);
this.tcpConfig = (TcpMsgSenderConfig) baseConfig;
this.clientMgr = new TcpClientMgr(this.getSenderId(), this.tcpConfig,
selfDefineFactory);
this.tcpClientMgr = (TcpClientMgr) clientMgr;
}
+ @Override
+ public boolean sendMessage(TcpEventInfo eventInfo, ProcessResult
procResult) {
+ if (eventInfo == null) {
+ throw new NullPointerException("eventInfo is null");
+ }
+ if (procResult == null) {
+ throw new NullPointerException("procResult is null");
+ }
+ if (!this.isStarted()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ return processEvent(SendQos.SOURCE_ACK, eventInfo, null, procResult);
+ }
+
+ @Override
+ public boolean asyncSendMessage(
+ TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult
procResult) {
+ if (eventInfo == null) {
+ throw new NullPointerException("eventInfo is null");
+ }
+ if (callback == null) {
+ throw new NullPointerException("callback is null");
+ }
+ if (procResult == null) {
+ throw new NullPointerException("procResult is null");
+ }
+ if (!this.isStarted()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ return processEvent(SendQos.SOURCE_ACK, eventInfo, callback,
procResult);
+ }
+
@Override
public boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult
procResult) {
if (eventInfo == null) {
@@ -78,8 +120,7 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
}
@Override
- public boolean syncSendMessage(boolean sendInB2B,
- TcpEventInfo eventInfo, ProcessResult procResult) {
+ public boolean sendMsgWithSinkAck(TcpEventInfo eventInfo, ProcessResult
procResult) {
if (eventInfo == null) {
throw new NullPointerException("eventInfo is null");
}
@@ -89,15 +130,11 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- if (sendInB2B) {
- return processEvent(SendQos.SOURCE_ACK, eventInfo, null,
procResult);
- } else {
- return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
- }
+ return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
}
@Override
- public boolean asyncSendMessage(boolean sendInB2B,
+ public boolean asyncSendMsgWithSinkAck(
TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult
procResult) {
if (eventInfo == null) {
throw new NullPointerException("eventInfo is null");
@@ -111,11 +148,7 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- if (sendInB2B) {
- return processEvent(SendQos.SOURCE_ACK, eventInfo, callback,
procResult);
- } else {
- return processEvent(SendQos.SINK_ACK, eventInfo, callback,
procResult);
- }
+ return processEvent(SendQos.SINK_ACK, eventInfo, callback, procResult);
}
@Override
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
index 97543d86a5..d36117614c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
@@ -28,6 +28,47 @@ import
org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
*/
public interface TcpMsgSender extends MessageSender {
+ /**
+ * Synchronously send message and wait for the final sending result
+ * DataProxy returns response as soon as it receives the request and
+ * forwards the message in B2B mode until it succeeds;
+ *
+ * <p>Attention:
+ * 1. if return false, the caller can choose to wait for a period of time
before trying again, or
+ * discard the event after multiple retries and failures.
+ * 2. this method tries to ensure that messages are delivered, but there
+ * may be duplicate messages or message loss scenarios. It is suitable
for scenarios with
+ * a very large number of reports, very low reporting time
requirements, and
+ * the need to return the sending results.
+ * </p>
+ *
+ * @param eventInfo the event information need to send
+ * @param procResult The send result, including the detail error infos if
failed
+ * @return true if successful, false if failed for some reason.
+ */
+ boolean sendMessage(TcpEventInfo eventInfo, ProcessResult procResult);
+
+ /**
+ * Asynchronously send message and return immediately
+ * DataProxy returns response as soon as it receives the request and
+ * forwards the message in B2B mode until it succeeds;
+ *
+ * <p>Attention:
+ * 1. if return false, the caller can choose to wait for a period of time
before trying again, or
+ * discard the event after multiple retries and failures.
+ * 2. this method, tries to ensure that messages are delivered, but there
+ * may be duplicate messages or message loss scenarios. It is suitable
for scenarios with
+ * a very large number of reports, very low reporting time
requirements, and
+ * the need to return the sending results.
+ * </p>
+ * @param eventInfo the event information need to send
+ * @param callback the callback that returns the response from DataProxy
or
+ * an exception that occurred while waiting for the
response.
+ * @param procResult The send result, including the detail error infos if
the event not accepted
+ * @return true if successful, false if the event not accepted for some
reason.
+ */
+ boolean asyncSendMessage(TcpEventInfo eventInfo, MsgSendCallback callback,
ProcessResult procResult);
+
/**
* Send message without response
*
@@ -47,55 +88,41 @@ public interface TcpMsgSender extends MessageSender {
/**
* Synchronously send message and wait for the final sending result
+ * DataProxy returns response after receiving the request and
forwarding it successfully,
+ * and DataProxy does not retry on failure
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time
before trying again, or
* discard the event after multiple retries and failures.
- * 2. this method, with sendInB2B = true, tries to ensure that messages
are delivered, but there
- * may be duplicate messages or message loss scenarios. It is suitable
for scenarios with
- * a very large number of reports, very low reporting time
requirements, and
- * the need to return the sending results.
- * 3. this method, with sendInB2B = false, ensures that the message is
delivered only once and
+ * 2. this method, ensures that the message is delivered only once and
* will not be repeated. It is suitable for businesses with a small
amount of reports and
* no requirements on the reporting time, but require DataProxy to
forward messages with high reliability.
* </p>
*
- * @param sendInB2B indicates the DataProxy message service mode, true
indicates DataProxy returns
- * as soon as it receives the request and forwards the
message in B2B mode until it succeeds;
- * false indicates DataProxy returns after receiving the
request and forwarding it successfully,
- * and DataProxy does not retry on failure
* @param eventInfo the event information need to send
* @param procResult The send result, including the detail error infos if
failed
* @return true if successful, false if failed for some reason.
*/
- boolean syncSendMessage(boolean sendInB2B,
- TcpEventInfo eventInfo, ProcessResult procResult);
+ boolean sendMsgWithSinkAck(TcpEventInfo eventInfo, ProcessResult
procResult);
/**
- * Asynchronously send message
+ * Asynchronously send message and return immediately
+ * DataProxy returns response after receiving the request and
forwarding it successfully,
+ * and DataProxy does not retry on failure
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time
before trying again, or
* discard the event after multiple retries and failures.
- * 2. this method, with sendInB2B = true, tries to ensure that messages
are delivered, but there
- * may be duplicate messages or message loss scenarios. It is suitable
for scenarios with
- * a very large number of reports, very low reporting time
requirements, and
- * the need to return the sending results.
- * 3. this method, with sendInB2B = false, ensures that the message is
delivered only once and
+ * 3. this method, ensures that the message is delivered only once and
* will not be repeated. It is suitable for businesses with a small
amount of reports and
* no requirements on the reporting time, but require DataProxy to
forward messages with high reliability.
* </p>
*
- * @param sendInB2B indicates the DataProxy message service mode, true
indicates DataProxy returns
- * as soon as it receives the request and forwards the
message in B2B mode until it succeeds;
- * false indicates DataProxy returns after receiving the
request and forwarding it successfully,
- * and DataProxy does not retry on failure
* @param eventInfo the event information need to send
* @param callback the callback that returns the response from DataProxy
or
* an exception that occurred while waiting for the
response.
* @param procResult The send result, including the detail error infos if
the event not accepted
* @return true if successful, false if the event not accepted for some
reason.
*/
- boolean asyncSendMessage(boolean sendInB2B,
- TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult
procResult);
+ boolean asyncSendMsgWithSinkAck(TcpEventInfo eventInfo, MsgSendCallback
callback, ProcessResult procResult);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 2eace638a2..1e72c2bf9c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -19,7 +19,9 @@ package org.apache.inlong.sdk.dataproxy.utils;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.commons.lang3.StringUtils;
@@ -124,6 +126,20 @@ public class ProxyUtils {
}
}
+ public static void validProxyConfigNotNull(ProxyClientConfig configure)
throws ProxySdkException {
+ if (configure == null) {
+ throw new ProxySdkException("configure is null!");
+ }
+ }
+
+ public static String buildClusterIdKey(String protocol, String regionName,
Integer clusterId) {
+ return clusterId + ":" + regionName + ":" + protocol;
+ }
+
+ public static String buildGroupIdConfigKey(String protocol, String
regionName, String groupId) {
+ return protocol + ":" + regionName + ":" + groupId;
+ }
+
public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
if (attrsMap == null || attrsMap.size() == 0) {
return false;