This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-client-not-return-master in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 39854d7eae9451c3ec3bea88b0a1039b7c0dd96f Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Apr 1 17:27:59 2025 +0800 Pipe: Fix stuck caused by async connector client not returned after transferring tsfiles & Fix validateTsFile and shouldMarkAsPipeRequest may not be effective (#15245) --- .../client/IoTDBDataNodeAsyncClientManager.java | 6 ++- .../async/IoTDBDataRegionAsyncConnector.java | 12 +++-- .../async/handler/PipeTransferTsFileHandler.java | 56 ++++++++++++++++++---- 3 files changed, 59 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java index 79c56fa6890..aa5bdd5112f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java @@ -102,10 +102,12 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager receiverAttributes = String.format( - "%s-%s-%s", + "%s-%s-%s-%s-%s", Base64.getEncoder().encodeToString((username + ":" + password).getBytes()), shouldReceiverConvertOnTypeMismatch, - loadTsFileStrategy); + loadTsFileStrategy, + validateTsFile, + shouldMarkAsPipeRequest); synchronized (IoTDBDataNodeAsyncClientManager.class) { if (!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes)) { ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 3ca9635fb85..b8ea083a113 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -460,9 +460,6 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { /** * Transfer queued {@link Event}s which are waiting for retry. * - * @throws Exception if an error occurs. The error will be handled by pipe framework, which will - * retry the {@link Event} and mark the {@link Event} as failure and stop the pipe if the - * retry times exceeds the threshold. * @see PipeConnector#transfer(Event) for more details. * @see PipeConnector#transfer(TabletInsertionEvent) for more details. * @see PipeConnector#transfer(TsFileInsertionEvent) for more details. @@ -528,7 +525,14 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { } if (remainingEvents <= retryEventQueue.size()) { - throw new PipeException("Failed to transfer events in retry queue."); + throw new PipeException( + "Failed to retry transferring events in the retry queue. Remaining events: " + + retryEventQueue.size() + + " (tablet events: " + + retryEventQueueEventCounter.getTabletInsertionEventCount() + + ", tsfile events: " + + retryEventQueueEventCounter.getTsFileInsertionEventCount() + + ")."); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 6698ebc0ca3..0657e69e68e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -89,7 +89,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { private final AtomicBoolean isSealSignalSent; private IoTDBDataNodeAsyncClientManager clientManager; - private AsyncPipeDataTransferServiceClient client; + private volatile AsyncPipeDataTransferServiceClient client; public PipeTransferTsFileHandler( final IoTDBDataRegionAsyncConnector connector, @@ -152,6 +152,14 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { this.clientManager = clientManager; this.client = client; + if (client == null) { + LOGGER.warn( + "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", + connector.isClosed() ? "CLOSED" : "NOT CLOSED", + tsFile); + return; + } + client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); @@ -233,6 +241,17 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { position += readLength; } + @Override + public void onComplete(final TPipeTransferResp response) { + try { + super.onComplete(response); + } finally { + if (connector.isClosed()) { + returnClientIfNecessary(); + } + } + } + @Override protected boolean onCompleteInternal(final TPipeTransferResp response) { if (isSealSignalSent.get()) { @@ -292,10 +311,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { referenceCount); } - if (client != null) { - client.setShouldReturnSelf(true); - client.returnSelf(); - } + returnClientIfNecessary(); } return true; @@ -334,6 +350,15 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { return false; // due to seal transfer not yet completed } + @Override + public void onError(final Exception exception) { + try { + super.onError(exception); + } finally { + returnClientIfNecessary(); + } + } + @Override protected void onErrorInternal(final Exception exception) { try { @@ -379,10 +404,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { LOGGER.warn("Failed to close file reader or delete tsFile when failed to transfer file.", e); } finally { try { - if (client != null) { - client.setShouldReturnSelf(true); - client.returnSelf(); - } + returnClientIfNecessary(); } finally { if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { connector.addFailureEventsToRetryQueue(events); @@ -391,10 +413,26 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler { } } + private void returnClientIfNecessary() { + if (client != null) { + client.setShouldReturnSelf(true); + client.returnSelf(); + client = null; + } + } + @Override protected void doTransfer( final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq req) throws TException { + if (client == null) { + LOGGER.warn( + "Client has been returned to the pool. Current handler status is {}. Will not transfer {}.", + connector.isClosed() ? "CLOSED" : "NOT CLOSED", + tsFile); + return; + } + client.pipeTransfer(req, this); }
