This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cf94f1e0114 Pipe: Avoid unnecessary close-client in async client
(Follow up fix for #16008)
cf94f1e0114 is described below
commit cf94f1e0114fe84243b20520b65e2408a55665be
Author: Caideyipi <[email protected]>
AuthorDate: Sat Aug 9 14:05:10 2025 +0800
Pipe: Avoid unnecessary close-client in async client (Follow up fix for
#16008)
---
.../thrift/async/IoTDBDataRegionAsyncSink.java | 8 ++++++--
.../async/handler/PipeTransferTrackableHandler.java | 18 +++++++++++-------
.../async/handler/PipeTransferTsFileHandler.java | 2 +-
3 files changed, 18 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 5d6d1beba0b..ff35329ffb9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -756,7 +756,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
.forEach(
handler -> {
handler.clearEventsReferenceCount();
- eliminateHandler(handler);
+ eliminateHandler(handler, true);
});
}
@@ -813,7 +813,11 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
pendingHandlers.put(handler, handler);
}
- public void eliminateHandler(final PipeTransferTrackableHandler handler) {
+ public void eliminateHandler(
+ final PipeTransferTrackableHandler handler, final boolean closeClient) {
+ if (closeClient) {
+ handler.closeClient();
+ }
handler.close();
pendingHandlers.remove(handler);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 7f0617a2785..6ea07b0c278 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -46,7 +46,7 @@ public abstract class PipeTransferTrackableHandler
public void onComplete(final TPipeTransferResp response) {
if (connector.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this);
+ connector.eliminateHandler(this, true);
return;
}
@@ -55,7 +55,7 @@ public abstract class PipeTransferTrackableHandler
// completed
// NOTE: We should not clear the reference count of events, as this
would cause the
//
`org.apache.iotdb.pipe.it.dual.tablemodel.manual.basic.IoTDBPipeDataSinkIT#testSinkTsFileFormat3`
test to fail.
- connector.eliminateHandler(this);
+ connector.eliminateHandler(this, false);
}
}
@@ -63,12 +63,12 @@ public abstract class PipeTransferTrackableHandler
public void onError(final Exception exception) {
if (connector.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this);
+ connector.eliminateHandler(this, true);
return;
}
onErrorInternal(exception);
- connector.eliminateHandler(this);
+ connector.eliminateHandler(this, false);
}
/**
@@ -90,7 +90,7 @@ public abstract class PipeTransferTrackableHandler
connector.trackHandler(this);
if (connector.isClosed()) {
clearEventsReferenceCount();
- connector.eliminateHandler(this);
+ connector.eliminateHandler(this, true);
client.setShouldReturnSelf(true);
try {
client.returnSelf();
@@ -119,8 +119,7 @@ public abstract class PipeTransferTrackableHandler
public abstract void clearEventsReferenceCount();
- @Override
- public void close() {
+ public void closeClient() {
if (Objects.isNull(client)) {
return;
}
@@ -135,4 +134,9 @@ public abstract class PipeTransferTrackableHandler
e);
}
}
+
+ @Override
+ public void close() {
+ // Do nothing
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index ed4a0517323..39c4b148e21 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -418,7 +418,7 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
}
if (connector.isClosed()) {
- close();
+ closeClient();
}
client.setShouldReturnSelf(true);