This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d9f52e6  [optimize](sink) Optimize the BE load balancing logic during 
concurrent imports. (#388)
d9f52e6 is described below

commit d9f52e6774a2aafa1c1274d2048afdae86f0e39e
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Tue May 21 17:35:22 2024 +0800

    [optimize](sink) Optimize the BE load balancing logic during concurrent 
imports. (#388)
---
 .../src/main/java/org/apache/doris/flink/sink/BackendUtil.java    | 8 +++++++-
 .../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 6 +++---
 .../test/java/org/apache/doris/flink/sink/TestBackendUtil.java    | 4 +---
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index cb5b6f2..9296ae5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -76,9 +76,15 @@ public class BackendUtil {
     }
 
     public String getAvailableBackend() {
+        return getAvailableBackend(0);
+    }
+
+    public String getAvailableBackend(int subtaskId) {
         long tmp = pos + backends.size();
         while (pos < tmp) {
-            BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % 
backends.size()));
+            BackendV2.BackendRowV2 backend =
+                    backends.get((int) ((pos + subtaskId) % backends.size()));
+            pos++;
             String res = backend.toBackendString();
             if (tryHttpConnection(res)) {
                 return res;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 08edfc7..81bfe88 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -312,7 +312,7 @@ public class DorisWriter<IN>
         List<DorisWriterState> writerStates = new ArrayList<>();
         for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) {
             // Dynamic refresh backend
-            dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+            
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
             DorisWriterState writerState =
                     new DorisWriterState(
                             labelPrefix,
@@ -340,7 +340,7 @@ public class DorisWriter<IN>
                 tableKey,
                 v ->
                         new DorisStreamLoad(
-                                backendUtil.getAvailableBackend(),
+                                backendUtil.getAvailableBackend(subtaskId),
                                 dorisOptions,
                                 executionOptions,
                                 labelGenerator,
@@ -373,7 +373,7 @@ public class DorisWriter<IN>
                 // use send cached data to new txn, then notify to restart the 
stream
                 if (executionOptions.isUseCache()) {
                     try {
-                        
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+                        
dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId));
                         if (executionOptions.enabled2PC()) {
                             dorisStreamLoad.abortPreCommit(labelPrefix, 
curCheckpointId);
                         }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
index 8a780ff..ece59c7 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java
@@ -22,7 +22,6 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -45,8 +44,7 @@ public class TestBackendUtil {
 
     @Test
     public void testTryHttpConnection() {
-        BackendUtil backendUtil = new BackendUtil(new ArrayList<>());
-        boolean flag = backendUtil.tryHttpConnection("127.0.0.1:8040");
+        boolean flag = BackendUtil.tryHttpConnection("127.0.0.1:8040");
         Assert.assertFalse(flag);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to