This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 1aab3aa1e4 [To rel/1.1] Fix potential NPE in SinkChannel
1aab3aa1e4 is described below
commit 1aab3aa1e48a4a1876a0bdcb87d2bf957638506f
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Apr 12 08:39:05 2023 +0800
[To rel/1.1] Fix potential NPE in SinkChannel
---
.../mpp/execution/exchange/sink/ShuffleSinkHandle.java | 4 ++--
.../db/mpp/execution/exchange/sink/SinkChannel.java | 18 +++++++++++++-----
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 6f9b617e2e..a4d3f7c198 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -166,7 +166,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized void abort() {
- if (aborted) {
+ if (aborted || closed) {
return;
}
LOGGER.debug("[StartAbortShuffleSinkHandle]");
@@ -192,7 +192,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized void close() {
- if (closed) {
+ if (closed || aborted) {
return;
}
LOGGER.debug("[StartCloseShuffleSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index 5cd28de462..356bfff296 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -207,11 +207,13 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void abort() {
LOGGER.debug("[StartAbortSinkChannel]");
- if (aborted) {
+ if (aborted || closed) {
return;
}
sequenceIdToTsBlock.clear();
- bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+ if (blocked != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+ }
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
@@ -234,11 +236,13 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void close() {
LOGGER.debug("[StartCloseSinkChannel]");
- if (closed) {
+ if (closed || aborted) {
return;
}
sequenceIdToTsBlock.clear();
- bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+ if (blocked != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+ }
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
@@ -367,7 +371,11 @@ public class SinkChannel implements ISinkChannel {
// region ============ ISinkChannel related ============
- public void open() {
+ @Override
+ public synchronized void open() {
+ if (aborted || closed) {
+ return;
+ }
// SinkChannel is opened when ShuffleSinkHandle choose it as the next channel
this.blocked =
localMemoryManager