This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13
by this push:
new 31e10ed6 [Pick] Pick load balance on each backend (#565)
31e10ed6 is described below
commit 31e10ed66adbc4e1395806ed2df368645501e3ec
Author: wudi <[email protected]>
AuthorDate: Mon Mar 24 18:06:53 2025 +0800
[Pick] Pick load balance on each backend (#565)
---
.../doris/flink/table/DorisDynamicOutputFormat.java | 16 ++++++++++++----
.../org/apache/doris/flink/table/DorisStreamLoad.java | 3 ++-
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index cb0a3d21..03a26dbf 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -91,6 +91,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
private String keysType;
private List<BackendV2.BackendRowV2> backends;
private long pos = 0L;
+ private int subtaskId = 0;
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
@@ -191,16 +192,17 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
+ this.subtaskId = taskNumber;
this.backends = settingBackends();
+ String backend = getAvailableBackend();
dorisStreamLoad = new DorisStreamLoad(
- backends.get(0).toBackendString(),
+ backend,
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
options.getPassword(),
executionOptions.getStreamLoadProp(),
readOptions);
- LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
if (executionOptions.getBatchIntervalMs() != 0 &&
executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new
ExecutorThreadFactory("doris-streamload-output" +
@@ -321,6 +323,10 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
} else {
result = String.join(this.lineDelimiter, batch);
}
+
+ // refresh backend
+ dorisStreamLoad.setHostPort(getAvailableBackend());
+
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
dorisStreamLoad.load(result);
@@ -334,7 +340,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
}
try {
dorisStreamLoad.setHostPort(getAvailableBackend());
- LOG.warn("streamload error,switch be: {}",
dorisStreamLoad.getLoadUrlStr(), e);
+ LOG.warn("stream load error,switch be: {}",
dorisStreamLoad.getLoadUrlStr(), e);
Thread.sleep(1000L * ( i + 1 ));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@@ -342,8 +348,10 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
}
}
}
+
}
+ @Deprecated
private String getBackend() throws IOException {
try {
//get be url from fe
@@ -371,7 +379,7 @@ public class DorisDynamicOutputFormat<T> extends
RichOutputFormat<T> {
long tmp = pos + backends.size();
while (pos < tmp) {
BackendV2.BackendRowV2 backend =
- backends.get((int) (pos % backends.size()));
+ backends.get((int) ((pos + subtaskId) % backends.size()));
pos++;
return backend.toBackendString();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index 1ef8e20b..72cf8ffc 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -108,7 +108,7 @@ public class DorisStreamLoad implements Serializable {
public void load(String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(value);
- LOG.info("Streamload Response:{}", loadResponse);
+ LOG.info("stream load response:{}", loadResponse);
if (loadResponse.status != 200) {
throw new StreamLoadException("stream load error: " +
loadResponse.respContent);
} else {
@@ -133,6 +133,7 @@ public class DorisStreamLoad implements Serializable {
UUID.randomUUID().toString().replaceAll("-", ""));
}
+ LOG.info("stream load started for {} on host {}", label, hostPort);
try {
HttpPut put = new HttpPut(loadUrlStr);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]