This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7c70b21893 Fix potential NPE in SinkChannel
7c70b21893 is described below
commit 7c70b218938e12a4e71be0f73aef3ac0bc6204e2
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Apr 12 08:37:30 2023 +0800
Fix potential NPE in SinkChannel
---
.../mpp/execution/exchange/sink/ShuffleSinkHandle.java | 4 ++--
.../db/mpp/execution/exchange/sink/SinkChannel.java | 18 +++++++++++++-----
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
index 6f9b617e2e..a4d3f7c198 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.java
@@ -166,7 +166,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized void abort() {
- if (aborted) {
+ if (aborted || closed) {
return;
}
LOGGER.debug("[StartAbortShuffleSinkHandle]");
@@ -192,7 +192,7 @@ public class ShuffleSinkHandle implements ISinkHandle {
@Override
public synchronized void close() {
- if (closed) {
+ if (closed || aborted) {
return;
}
LOGGER.debug("[StartCloseShuffleSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
index b32028bee1..1b027ba2ab 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/sink/SinkChannel.java
@@ -211,11 +211,13 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void abort() {
LOGGER.debug("[StartAbortSinkChannel]");
- if (aborted) {
+ if (aborted || closed) {
return;
}
sequenceIdToTsBlock.clear();
- bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+ if (blocked != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
+ }
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
@@ -234,11 +236,13 @@ public class SinkChannel implements ISinkChannel {
@Override
public synchronized void close() {
LOGGER.debug("[StartCloseSinkChannel]");
- if (closed) {
+ if (closed || aborted) {
return;
}
sequenceIdToTsBlock.clear();
- bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+ if (blocked != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryComplete(blocked);
+ }
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
@@ -363,7 +367,11 @@ public class SinkChannel implements ISinkChannel {
// region ============ ISinkChannel related ============
- public void open() {
+ @Override
+ public synchronized void open() {
+ if (aborted || closed) {
+ return;
+ }
// SinkChannel is opened when ShuffleSinkHandle choose it as the next channel
this.blocked =
localMemoryManager