This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 888ac76  fix interrupt issue (#140)
888ac76 is described below

commit 888ac76712f942b3ac4b1b8b03357d779f742d3a
Author: GoGoWen <[email protected]>
AuthorDate: Wed May 10 15:02:02 2023 +0800

    fix interrupt issue (#140)
---
 .../doris/flink/sink/writer/DorisWriter.java       | 30 +++++++++++++---------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 8ba2050..711f765 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -182,19 +182,25 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
                 LOG.debug("not loading, skip timer checker");
                 return;
             }
-            // TODO: introduce cache for reload instead of throwing exceptions.
-            String errorMsg;
-            try {
-                RespContent content = 
dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
-                errorMsg = content.getMessage();
-            } catch (Exception e) {
-                errorMsg = e.getMessage();
-            }
 
-            loadException = new StreamLoadException(errorMsg);
-            LOG.error("stream load finished unexpectedly, interrupt worker 
thread! {}", errorMsg);
-            // set the executor thread interrupted in case blocking in write 
data.
-            executorThread.interrupt();
+            // double check to interrupt when loading is true and 
dorisStreamLoad.getPendingLoadFuture().isDone
+            // fix issue #139
+            if (dorisStreamLoad.getPendingLoadFuture() != null
+                    && dorisStreamLoad.getPendingLoadFuture().isDone()) {
+                // TODO: introduce cache for reload instead of throwing 
exceptions.
+                String errorMsg;
+                try {
+                    RespContent content = 
dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
+                    errorMsg = content.getMessage();
+                } catch (Exception e) {
+                    errorMsg = e.getMessage();
+                }
+
+                loadException = new StreamLoadException(errorMsg);
+                LOG.error("stream load finished unexpectedly, interrupt worker 
thread! {}", errorMsg);
+                // set the executor thread interrupted in case blocking in 
write data.
+                executorThread.interrupt();
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to