This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new d9f52e6 [optimize](sink) Optimize the BE load balancing logic during concurrent imports. (#388) d9f52e6 is described below commit d9f52e6774a2aafa1c1274d2048afdae86f0e39e Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue May 21 17:35:22 2024 +0800 [optimize](sink) Optimize the BE load balancing logic during concurrent imports. (#388) --- .../src/main/java/org/apache/doris/flink/sink/BackendUtil.java | 8 +++++++- .../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 6 +++--- .../test/java/org/apache/doris/flink/sink/TestBackendUtil.java | 4 +--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index cb5b6f2..9296ae5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -76,9 +76,15 @@ public class BackendUtil { } public String getAvailableBackend() { + return getAvailableBackend(0); + } + + public String getAvailableBackend(int subtaskId) { long tmp = pos + backends.size(); while (pos < tmp) { - BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % backends.size())); + BackendV2.BackendRowV2 backend = + backends.get((int) ((pos + subtaskId) % backends.size())); + pos++; String res = backend.toBackendString(); if (tryHttpConnection(res)) { return res; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 08edfc7..81bfe88 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -312,7 +312,7 @@ public class DorisWriter<IN> List<DorisWriterState> writerStates = new ArrayList<>(); for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { // Dynamic refresh backend - dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId)); DorisWriterState writerState = new DorisWriterState( labelPrefix, @@ -340,7 +340,7 @@ public class DorisWriter<IN> tableKey, v -> new DorisStreamLoad( - backendUtil.getAvailableBackend(), + backendUtil.getAvailableBackend(subtaskId), dorisOptions, executionOptions, labelGenerator, @@ -373,7 +373,7 @@ public class DorisWriter<IN> // use send cached data to new txn, then notify to restart the stream if (executionOptions.isUseCache()) { try { - dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); + dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend(subtaskId)); if (executionOptions.enabled2PC()) { dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java index 8a780ff..ece59c7 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java @@ -22,7 +22,6 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -45,8 +44,7 @@ public class TestBackendUtil { @Test public void testTryHttpConnection() { - BackendUtil backendUtil = new BackendUtil(new ArrayList<>()); - boolean flag = backendUtil.tryHttpConnection("127.0.0.1:8040"); + boolean flag = BackendUtil.tryHttpConnection("127.0.0.1:8040"); Assert.assertFalse(flag); 
}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org