This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 22b04f1bc8 Remove check of closed state in ISink
22b04f1bc8 is described below

commit 22b04f1bc8af8f678faa857d08a7f768d5db749b
Author: Liao Lanyu <[email protected]>
AuthorDate: Mon Apr 24 17:21:16 2023 +0800

    Remove check of closed state in ISink
---
 .../iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java     | 7 +++++++
 .../iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java    | 7 ++++---
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
index 0e434aae80..d24c669c5d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/LocalSinkChannel.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
 import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
 
@@ -83,6 +84,9 @@ public class LocalSinkChannel implements ISinkChannel {
   @Override
   public synchronized ListenableFuture<?> isFull() {
     checkState();
+    if (closed) {
+      return immediateVoidFuture();
+    }
     return nonCancellationPropagating(blocked);
   }
 
@@ -115,6 +119,9 @@ public class LocalSinkChannel implements ISinkChannel {
       Validate.notNull(tsBlock, "tsBlocks is null");
       synchronized (this) {
         checkState();
+        if (closed) {
+          return;
+        }
         if (!blocked.isDone()) {
           throw new IllegalStateException("Sink handle is blocked.");
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index b1e155b22f..11c69a2ec3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -114,9 +114,12 @@ public class ShuffleSinkHandle implements ISinkHandle {
   public synchronized void send(TsBlock tsBlock) {
     long startTime = System.nanoTime();
     try {
+      checkState();
+      if (closed) {
+        return;
+      }
       ISinkChannel currentChannel =
           downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
-      checkState();
       currentChannel.send(tsBlock);
     } finally {
       switchChannelIfNecessary();
@@ -248,8 +251,6 @@ public class ShuffleSinkHandle implements ISinkHandle {
   private void checkState() {
     if (aborted) {
       throw new IllegalStateException("ShuffleSinkHandle is aborted.");
-    } else if (closed) {
-      throw new IllegalStateException("ShuffleSinkHandle is closed.");
     }
   }
 

Reply via email to