This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 1fcc640e [Fix] When http reports an error, writing will get stuck (#539)
1fcc640e is described below
commit 1fcc640ea589094baa7536abcef26fbb6edb4752
Author: wudi <[email protected]>
AuthorDate: Fri Jan 10 12:11:07 2025 +0800
[Fix] When http reports an error, writing will get stuck (#539)
---
.../java/org/apache/doris/flink/sink/HttpUtil.java | 1 +
.../doris/flink/sink/writer/DorisStreamLoad.java | 26 ++++++++++++++++++++--
.../doris/flink/sink/writer/LabelGenerator.java | 8 +++++++
3 files changed, 33 insertions(+), 2 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 53d3ce13..d1600de6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -60,6 +60,7 @@ public class HttpUtil {
return true;
}
})
+ .setRetryHandler((exception, executionCount, context) -> false)
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
.setDefaultRequestConfig(
RequestConfig.custom()
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index f900f741..c6e39326 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -89,6 +89,7 @@ public class DorisStreamLoad implements Serializable {
private final Properties streamLoadProp;
private final RecordStream recordStream;
private volatile Future<CloseableHttpResponse> pendingLoadFuture;
+ private volatile Exception httpException = null;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private boolean loadBatchFirstRecord;
@@ -115,6 +116,10 @@ public class DorisStreamLoad implements Serializable {
this.streamLoadProp = executionOptions.getStreamLoadProp();
this.enableDelete = executionOptions.getDeletable();
this.httpClient = httpClient;
+ String threadName =
+ String.format(
+ "stream-load-upload-%s-%s",
+ labelGenerator.getSubtaskId(), labelGenerator.getTableIdentifier());
this.executorService =
new ThreadPoolExecutor(
1,
@@ -122,7 +127,7 @@ public class DorisStreamLoad implements Serializable {
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
- new ExecutorThreadFactory("stream-load-upload"));
+ new ExecutorThreadFactory(threadName));
this.recordStream =
new RecordStream(
executionOptions.getBufferSize(),
@@ -250,6 +255,7 @@ public class DorisStreamLoad implements Serializable {
* @throws IOException
*/
public void writeRecord(byte[] record) throws IOException {
+ checkLoadException();
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else if (lineDelimiter != null) {
@@ -258,6 +264,12 @@ public class DorisStreamLoad implements Serializable {
recordStream.write(record);
}
+ private void checkLoadException() {
+ if (httpException != null) {
+ throw new RuntimeException("Stream load http request error, ", httpException);
+ }
+ }
+
@VisibleForTesting
public RecordStream getRecordStream() {
return recordStream;
@@ -347,11 +359,21 @@ public class DorisStreamLoad implements Serializable {
} else {
executeMessage = "table " + table + " start execute load for label " + label;
}
+ Thread mainThread = Thread.currentThread();
pendingLoadFuture =
executorService.submit(
() -> {
LOG.info(executeMessage);
- return httpClient.execute(putBuilder.build());
+ try {
+ return httpClient.execute(putBuilder.build());
+ } catch (Exception e) {
+ LOG.error("Failed to execute load, cause ", e);
+ httpException = e;
+ // When an HTTP error occurs, the main thread should be
+ // interrupted to prevent blocking
+ mainThread.interrupt();
+ throw e;
+ }
});
} catch (Exception e) {
String err;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index d80315f5..84c14b79 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -91,4 +91,12 @@ public class LabelGenerator {
String concatPrefix = String.format("%s_%s_%s", labelPrefix, tableIdentifier, subtaskId);
return concatPrefix;
}
+
+ public int getSubtaskId() {
+ return subtaskId;
+ }
+
+ public String getTableIdentifier() {
+ return tableIdentifier;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]