This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8b78c3a4f87 Pipe: Fix and improve async tsfile transfer error handling
and logging (avoid client connection leak) (#16008)
8b78c3a4f87 is described below
commit 8b78c3a4f87be66e9fada6695ef182a6d5ebf6e5
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Aug 8 09:53:59 2025 +0800
Pipe: Fix and improve async tsfile transfer error handling and logging
(avoid client connection leak) (#16008)
* Pipe: Fix and improve async tsfile transfer error handling and logging
Refactored IoTDBDataRegionAsyncConnector to handle exceptions during
asynchronous tsfile transfer more gracefully. Now logs warnings instead of
errors, invokes onError on the handler, and provides more context in log
messages. Added getTsFile() method to PipeTransferTsFileHandler for better
logging, and ensured memoryBlock is set to null after closing to prevent
potential resource leaks.
* fix
* fix
* bald-logger
---------
Co-authored-by: Caideyipi <[email protected]>
---
.../thrift/async/IoTDBDataRegionAsyncSink.java | 24 ++++++++++-------
.../handler/PipeTransferTrackableHandler.java | 30 ++++++++++++++++++++--
.../async/handler/PipeTransferTsFileHandler.java | 26 ++++++++++---------
3 files changed, 57 insertions(+), 23 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 1f35a96b84d..5d6d1beba0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -416,8 +416,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
}
}
- private void transfer(final PipeTransferTsFileHandler
pipeTransferTsFileHandler)
- throws Exception {
+ private void transfer(final PipeTransferTsFileHandler
pipeTransferTsFileHandler) {
transferTsFileCounter.incrementAndGet();
CompletableFuture<Void> completableFuture =
CompletableFuture.supplyAsync(
@@ -439,13 +438,20 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
if (PipeConfig.getInstance().isTransferTsFileSync() || !isRealtimeFirst) {
try {
completableFuture.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.error("Transfer tsfile event asynchronously was interrupted.",
e);
- throw new PipeException("Transfer tsfile event asynchronously was
interrupted.", e);
- } catch (Exception e) {
- LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
- throw e;
+ } catch (final Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn(
+ "Transfer tsfile event {} asynchronously was interrupted.",
+ pipeTransferTsFileHandler.getTsFile(),
+ e);
+ }
+
+ pipeTransferTsFileHandler.onError(e);
+ LOGGER.warn(
+ "Failed to transfer tsfile event {} asynchronously.",
+ pipeTransferTsFileHandler.getTsFile(),
+ e);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 25a341d954d..0b7ba7554fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -26,11 +26,17 @@ import
org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
public abstract class PipeTransferTrackableHandler
implements AsyncMethodCallback<TPipeTransferResp>, AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
protected final IoTDBDataRegionAsyncSink connector;
+ protected volatile AsyncPipeDataTransferServiceClient client;
public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink
connector) {
this.connector = connector;
@@ -77,13 +83,21 @@ public abstract class PipeTransferTrackableHandler
protected boolean tryTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
+ if (Objects.isNull(this.client)) {
+ this.client = client;
+ }
// track handler before checking if connector is closed
connector.trackHandler(this);
if (connector.isClosed()) {
clearEventsReferenceCount();
connector.eliminateHandler(this);
client.setShouldReturnSelf(true);
- client.returnSelf();
+ try {
+ client.returnSelf();
+ } catch (final IllegalStateException e) {
+ LOGGER.info(
+ "Illegal state when return the client to object pool, maybe the
pool is already cleared. Will ignore.");
+ }
return false;
}
doTransfer(client, req);
@@ -106,6 +120,18 @@ public abstract class PipeTransferTrackableHandler
@Override
public void close() {
- // do nothing
+ if (Objects.isNull(client)) {
+ return;
+ }
+ try {
+ client.close();
+ client.invalidateAll();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to close or invalidate client when connector is closed.
Client: {}, Exception: {}",
+ client,
+ e.getMessage(),
+ e);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 4ca34073d44..ed4a0517323 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -87,7 +87,6 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
private final AtomicBoolean isSealSignalSent;
private IoTDBDataNodeAsyncClientManager clientManager;
- private volatile AsyncPipeDataTransferServiceClient client;
public PipeTransferTsFileHandler(
final IoTDBDataRegionAsyncSink connector,
@@ -131,6 +130,10 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
isSealSignalSent = new AtomicBoolean(false);
}
+ public File getTsFile() {
+ return tsFile;
+ }
+
public void transfer(
final IoTDBDataNodeAsyncClientManager clientManager,
final AsyncPipeDataTransferServiceClient client)
@@ -415,19 +418,16 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
}
if (connector.isClosed()) {
- try {
- client.close();
- client.invalidateAll();
- } catch (final Exception e) {
- LOGGER.warn(
- "Failed to close or invalidate client when connector is closed.
Client: {}, Exception: {}",
- client,
- e.getMessage(),
- e);
- }
+ close();
}
+
client.setShouldReturnSelf(true);
- client.returnSelf();
+ try {
+ client.returnSelf();
+ } catch (final IllegalStateException e) {
+ LOGGER.info(
+ "Illegal state when return the client to object pool, maybe the pool
is already cleared. Will ignore.");
+ }
client = null;
}
@@ -454,8 +454,10 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
@Override
public void close() {
super.close();
+
if (memoryBlock != null) {
memoryBlock.close();
+ memoryBlock = null;
}
}