This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 888ac76 fix interrupt issue (#140)
888ac76 is described below
commit 888ac76712f942b3ac4b1b8b03357d779f742d3a
Author: GoGoWen <[email protected]>
AuthorDate: Wed May 10 15:02:02 2023 +0800
fix interrupt issue (#140)
---
.../doris/flink/sink/writer/DorisWriter.java | 30 +++++++++++++---------
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 8ba2050..711f765 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -182,19 +182,25 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
LOG.debug("not loading, skip timer checker");
return;
}
- // TODO: introduce cache for reload instead of throwing exceptions.
- String errorMsg;
- try {
- RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
- errorMsg = content.getMessage();
- } catch (Exception e) {
- errorMsg = e.getMessage();
- }
- loadException = new StreamLoadException(errorMsg);
- LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
- // set the executor thread interrupted in case blocking in write data.
- executorThread.interrupt();
+ // double check to interrupt when loading is true and dorisStreamLoad.getPendingLoadFuture().isDone
+ // fix issue #139
+ if (dorisStreamLoad.getPendingLoadFuture() != null
+ && dorisStreamLoad.getPendingLoadFuture().isDone()) {
+ // TODO: introduce cache for reload instead of throwing exceptions.
+ String errorMsg;
+ try {
+ RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
+ errorMsg = content.getMessage();
+ } catch (Exception e) {
+ errorMsg = e.getMessage();
+ }
+
+ loadException = new StreamLoadException(errorMsg);
+ LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
+ // set the executor thread interrupted in case blocking in write data.
+ executorThread.interrupt();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]