This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 57003ef8 [Fix](batch) Fixed an issue where the writer might be blocked in batch mode (#506)
57003ef8 is described below
commit 57003ef8d242a613a40ac3f064ba83505e78248b
Author: wudi <[email protected]>
AuthorDate: Wed Nov 6 16:45:58 2024 +0800
[Fix](batch) Fixed an issue where the writer might be blocked in batch mode (#506)
---
.../flink/sink/batch/DorisBatchStreamLoad.java | 22 +++++++++++++++++-----
1 file changed, 17 insertions(+), 5 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 42b83207..3cfda604 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -198,6 +198,7 @@ public class DorisBatchStreamLoad implements Serializable {
lock.lock();
try {
while (currentCacheBytes.get() >= maxBlockedBytes) {
+ checkFlushException();
LOG.info(
"Cache full, waiting for flush, currentBytes: {},
maxBlockedBytes: {}",
currentCacheBytes.get(),
@@ -486,11 +487,22 @@ public class DorisBatchStreamLoad implements Serializable {
putBuilder.setLabel(label + "_" + retry);
reason = respContent.getMessage();
} else {
- String errMsg =
- String.format(
- "stream load error: %s, see
more in %s",
- respContent.getMessage(),
- respContent.getErrorURL());
+ String errMsg = null;
+ if
(StringUtils.isBlank(respContent.getMessage())
+ &&
StringUtils.isBlank(respContent.getErrorURL())) {
+ // sometimes stream load will not return
message
+ errMsg =
+ String.format(
+ "stream load error,
response is %s",
+ loadResult);
+ throw new DorisBatchLoadException(errMsg);
+ } else {
+ errMsg =
+ String.format(
+ "stream load error: %s,
see more in %s",
+ respContent.getMessage(),
+ respContent.getErrorURL());
+ }
throw new DorisBatchLoadException(errMsg);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]