This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3db9adcc0a [INLONG-11695][SDK] MessageSender related interfaces 
abstraction (#11696)
3db9adcc0a is described below

commit 3db9adcc0a135247c055badc2bb0f831b6bdeae7
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Jan 21 12:12:01 2025 +0800

    [INLONG-11695][SDK] MessageSender related interfaces abstraction (#11696)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/network/ClientMgr.java    |  67 ++++++++++++++
 .../inlong/sdk/dataproxy/sender/MessageSender.java |  47 ++++++++++
 .../sdk/dataproxy/sender/MsgSendCallback.java      |  42 +++++++++
 .../sdk/dataproxy/sender/http/HttpMsgSender.java   |  56 ++++++++++++
 .../sdk/dataproxy/sender/tcp/TcpMsgSender.java     | 101 +++++++++++++++++++++
 5 files changed, 313 insertions(+)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
new file mode 100644
index 0000000000..bb678456b1
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Client Manager interface
+ *
+ * Used to manage network clients
+ */
+public interface ClientMgr {
+
+    /**
+     * Start network client manager
+     *
+     * @param procResult the start result, carrying detailed error information if the start fails
+     * @return  true if successful, false return indicates failure.
+     */
+    boolean start(ProcessResult procResult);
+
+    /**
+     * Stop network client manager
+     *
+     */
+    void stop();
+
+    /**
+     * Get the number of proxy nodes currently in use
+     *
+     * @return  Number of nodes in use
+     */
+    int getActiveNodeCnt();
+
+    /**
+     * Get the number of in-flight messages
+     *
+     * @return  Number of in-flight messages
+     */
+    int getInflightMsgCnt();
+
+    /**
+     * Update cached proxy nodes
+     *
+     * @param nodeChanged whether the updated node has changed
+     * @param hostInfoMap  the new proxy nodes
+     */
+    void updateProxyInfoList(boolean nodeChanged, ConcurrentHashMap<String, 
HostInfo> hostInfoMap);
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MessageSender.java
new file mode 100644
index 0000000000..26374c7857
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MessageSender.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+/**
+ * Message Sender interface
+ *
+ * Used to define the sender common methods
+ */
+public interface MessageSender {
+
+    /**
+     * Start sender when the sender is built
+     *
+     * <p>Attention:
+     *    if false is returned, the caller needs to handle it based on the error code and
+     *    error information returned by procResult, such as:
+     *    prompting the user, retrying after some time, etc.
+     * </p>
+     *
+     * @param procResult the start result, carrying detailed error information if the start fails
+     * @return  true if successful, false indicates that the sender failed to start.
+     */
+    boolean start(ProcessResult procResult);
+
+    /**
+     * Close the sender when its sending service needs to be stopped.
+     */
+    void close();
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MsgSendCallback.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MsgSendCallback.java
new file mode 100644
index 0000000000..f0c8638f08
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/MsgSendCallback.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+/**
+ * Message Send Callback interface
+ *
+ * Used to define the send callback methods
+ */
+public interface MsgSendCallback {
+
+    /**
+     * Invoked when a message is confirmed by DataProxy
+     *
+     * @param result The event process result, including detailed error information if sending fails
+     */
+    void onMessageAck(ProcessResult result);
+
+    /**
+     * Invoked when message transportation is interrupted by an exception
+     *
+     * @param ex The exception info
+     */
+    void onException(Throwable ex);
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSender.java
new file mode 100644
index 0000000000..d7cd77d8cf
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSender.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.http;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+
+/**
+ * HTTP Message Sender interface
+ *
+ * Used to define the HTTP message sender methods
+ */
+public interface HttpMsgSender extends MessageSender {
+
+    /**
+     * Synchronously send message and wait for the final sending result
+     *
+     * <p>Attention: if false is returned, the caller can choose to wait for a period of time before trying again, or
+     *    discard the event after multiple retries and failures.</p>
+     *
+     * @param eventInfo the event information to be sent
+     * @param procResult The send result, including detailed error information if sending failed
+     * @return  true if successful, false if failed for some reason.
+     */
+    boolean syncSendMessage(HttpEventInfo eventInfo, ProcessResult procResult);
+
+    /**
+     * Asynchronously send message
+     *
+     * <p>Attention: if false is returned, the caller can choose to wait for a period of time before trying again, or
+     *    discard the event after multiple retries and failures.</p>
+     *
+     * @param eventInfo the event information to be sent
+     * @param callback  the callback that returns the response from DataProxy or
+     *                 an exception that occurred while waiting for the response.
+     * @param procResult The send result, including detailed error information if the event is not accepted
+     * @return  true if successful, false if the event is not accepted for some reason.
+     */
+    boolean asyncSendMessage(HttpEventInfo eventInfo, MsgSendCallback 
callback, ProcessResult procResult);
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
new file mode 100644
index 0000000000..97543d86a5
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+
+/**
+ * TCP Message Sender interface
+ *
+ * Used to define the TCP message sender methods
+ */
+public interface TcpMsgSender extends MessageSender {
+
+    /**
+     * Send message without response
+     *
+     * <p>Attention:
+     * 1. if false is returned, the caller can choose to wait for a period of time before trying again, or
+     *    discard the event after multiple retries and failures.
+     * 2. this method may lose messages. It is suitable for situations where 
the reporting volume is very large,
+     *    the business does not pay attention to the final reporting results, 
and
+     *    the message loss is tolerated in the event of an exception.
+     * </p>
+     *
+     * @param eventInfo the event information to be sent
+     * @param procResult The send result, including detailed error information if the eventInfo is not accepted
+     * @return  true if successful, false indicates that the eventInfo was not accepted for some reason.
+     */
+    boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult 
procResult);
+
+    /**
+     * Synchronously send message and wait for the final sending result
+     *
+     * <p>Attention:
+     * 1. if false is returned, the caller can choose to wait for a period of time before trying again, or
+     *    discard the event after multiple retries and failures.
+     * 2. this method, with sendInB2B = true, tries to ensure that messages 
are delivered, but there
+     *    may be duplicate messages or message loss scenarios. It is suitable 
for scenarios with
+     *    a very large number of reports, very low reporting time 
requirements, and
+     *    the need to return the sending results.
+     * 3. this method, with sendInB2B = false, ensures that the message is 
delivered only once and
+     *    will not be repeated. It is suitable for businesses with a small 
amount of reports and
+     *    no requirements on the reporting time, but require DataProxy to 
forward messages with high reliability.
+     * </p>
+     *
+     * @param sendInB2B indicates the DataProxy message service mode, true 
indicates DataProxy returns
+     *                 as soon as it receives the request and forwards the 
message in B2B mode until it succeeds;
+     *                 false indicates DataProxy returns after receiving the 
request and forwarding it successfully,
+     *                 and DataProxy does not retry on failure
+     * @param eventInfo the event information to be sent
+     * @param procResult The send result, including detailed error information if sending failed
+     * @return  true if successful, false if failed for some reason.
+     */
+    boolean syncSendMessage(boolean sendInB2B,
+            TcpEventInfo eventInfo, ProcessResult procResult);
+
+    /**
+     * Asynchronously send message
+     *
+     * <p>Attention:
+     * 1. if false is returned, the caller can choose to wait for a period of time before trying again, or
+     *    discard the event after multiple retries and failures.
+     * 2. this method, with sendInB2B = true, tries to ensure that messages 
are delivered, but there
+     *    may be duplicate messages or message loss scenarios. It is suitable 
for scenarios with
+     *    a very large number of reports, very low reporting time 
requirements, and
+     *    the need to return the sending results.
+     * 3. this method, with sendInB2B = false, ensures that the message is 
delivered only once and
+     *    will not be repeated. It is suitable for businesses with a small 
amount of reports and
+     *    no requirements on the reporting time, but require DataProxy to 
forward messages with high reliability.
+     * </p>
+     *
+     * @param sendInB2B indicates the DataProxy message service mode, true 
indicates DataProxy returns
+     *                 as soon as it receives the request and forwards the 
message in B2B mode until it succeeds;
+     *                 false indicates DataProxy returns after receiving the 
request and forwarding it successfully,
+     *                 and DataProxy does not retry on failure
+     * @param eventInfo the event information to be sent
+     * @param callback  the callback that returns the response from DataProxy or
+     *                 an exception that occurred while waiting for the response.
+     * @param procResult The send result, including detailed error information if the event is not accepted
+     * @return  true if successful, false if the event is not accepted for some reason.
+     */
+    boolean asyncSendMessage(boolean sendInB2B,
+            TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult 
procResult);
+}

Reply via email to