This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch pipe-borrow-timeout-opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/pipe-borrow-timeout-opt by
this push:
new e828dfef99a fix
e828dfef99a is described below
commit e828dfef99ab01789b52285f2652cfa2eb0d767d
Author: Caideyipi <[email protected]>
AuthorDate: Thu Aug 7 18:26:36 2025 +0800
fix
---
.../async/handler/PipeTransferTrackableHandler.java | 20 +++++++++++++++++++-
.../async/handler/PipeTransferTsFileHandler.java | 20 ++++++++------------
2 files changed, 27 insertions(+), 13 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 25a341d954d..d5e765288d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -26,11 +26,17 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
public abstract class PipeTransferTrackableHandler
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
protected final IoTDBDataRegionAsyncSink connector;
+ protected volatile AsyncPipeDataTransferServiceClient client;
public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink
connector) {
this.connector = connector;
@@ -77,6 +83,9 @@ public abstract class PipeTransferTrackableHandler
protected boolean tryTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
+ if (Objects.isNull(this.client)) {
+ this.client = client;
+ }
// track handler before checking if connector is closed
connector.trackHandler(this);
if (connector.isClosed()) {
@@ -106,6 +115,15 @@ public abstract class PipeTransferTrackableHandler
@Override
public void close() {
- // do nothing
+ try {
+ client.close();
+ client.invalidateAll();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to close or invalidate client when connector is closed.
Client: {}, Exception: {}",
+ client,
+ e.getMessage(),
+ e);
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index e40bdacd6a5..ed4a0517323 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -87,7 +87,6 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
private final AtomicBoolean isSealSignalSent;
private IoTDBDataNodeAsyncClientManager clientManager;
- private volatile AsyncPipeDataTransferServiceClient client;
public PipeTransferTsFileHandler(
final IoTDBDataRegionAsyncSink connector,
@@ -419,19 +418,16 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
}
if (connector.isClosed()) {
- try {
- client.close();
- client.invalidateAll();
- } catch (final Exception e) {
- LOGGER.warn(
- "Failed to close or invalidate client when connector is closed.
Client: {}, Exception: {}",
- client,
- e.getMessage(),
- e);
- }
+ close();
}
+
client.setShouldReturnSelf(true);
- client.returnSelf();
+ try {
+ client.returnSelf();
+ } catch (final IllegalStateException e) {
+ LOGGER.info(
+ "Illegal state when return the client to object pool, maybe the pool
is already cleared. Will ignore.");
+ }
client = null;
}