This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 8080d1ff1a9 Pipe: Fixed Async Connector Repeatedly Putting Events into
RetryQueue (#15179) (#15184)
8080d1ff1a9 is described below
commit 8080d1ff1a9cf1d5ba3ee0d0f0ae87a12aad6b15
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Mar 25 11:57:51 2025 +0800
Pipe: Fixed Async Connector Repeatedly Putting Events into RetryQueue
(#15179) (#15184)
Co-authored-by: Steve Yurong Su <[email protected]>
(cherry picked from commit bc078fccc89c7518db2d44d421373f5c47d51c64)
---
.../async/IoTDBDataRegionAsyncConnector.java | 29 ++++++++++++++--------
1 file changed, 18 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index bf0b8df2d6c..b92c066e8be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -199,17 +199,24 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
final AtomicInteger eventsReferenceCount = new
AtomicInteger(sealedFiles.size());
final AtomicBoolean eventsHadBeenAddedToRetryQueue = new
AtomicBoolean(false);
- for (final File sealedFile : sealedFiles) {
- transfer(
- new PipeTransferTsFileHandler(
- this,
- pipe2WeightMap,
- events,
- eventsReferenceCount,
- eventsHadBeenAddedToRetryQueue,
- sealedFile,
- null,
- false));
+ try {
+ for (final File sealedFile : sealedFiles) {
+ transfer(
+ new PipeTransferTsFileHandler(
+ this,
+ pipe2WeightMap,
+ events,
+ eventsReferenceCount,
+ eventsHadBeenAddedToRetryQueue,
+ sealedFile,
+ null,
+ false));
+ }
+ } catch (final Throwable t) {
+ LOGGER.warn("Failed to transfer tsfile batch ({}).", sealedFiles, t);
+ if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) {
+ addFailureEventsToRetryQueue(events);
+ }
}
} else {
LOGGER.warn(