This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 9f277c0ce23 Pipe: iotdb-thrift-connector async retry mechanism (#14916) (#15201)
9f277c0ce23 is described below

commit 9f277c0ce235d419ae6950981f7a4f6343512075
Author: Itami Sho <[email protected]>
AuthorDate: Thu Mar 27 17:22:09 2025 +0800

    Pipe: iotdb-thrift-connector async retry mechanism (#14916) (#15201)
---
 .../evolvable/batch/PipeTabletEventBatch.java      |  12 +-
 .../batch/PipeTransferBatchReqBuilder.java         |   4 +-
 .../async/IoTDBDataRegionAsyncConnector.java       | 175 +++++++++++++++------
 .../apache/iotdb/commons/conf/CommonConfig.java    |  35 ++++-
 .../iotdb/commons/conf/CommonDescriptor.java       |  30 ++++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  22 +++
 6 files changed, 220 insertions(+), 58 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 44374adb22a..bd07d0c34ad 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -69,8 +69,16 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
       if (((EnrichedEvent) event)
           
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
 
-        if (constructBatch(event)) {
-          events.add((EnrichedEvent) event);
+        try {
+          if (constructBatch(event)) {
+            events.add((EnrichedEvent) event);
+          }
+        } catch (final Exception e) {
+          // If the event is not added to the batch, we need to decrease the 
reference count.
+          ((EnrichedEvent) event)
+              
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
+          // Will cause a retry
+          throw e;
         }
 
         if (firstEventProcessingTime == Long.MIN_VALUE) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 9b787ee30f6..74ab3a1ebdc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -29,7 +29,6 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
-import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,8 +127,7 @@ public class PipeTransferBatchReqBuilder implements 
AutoCloseable {
    *     endpoint to transfer to (might be null), the second element is the 
batch to be transferred.
    */
   public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
-      final TabletInsertionEvent event)
-      throws IOException, WALPipeException, WriteProcessException {
+      final TabletInsertionEvent event) throws IOException, WALPipeException {
     if (!(event instanceof EnrichedEvent)) {
       LOGGER.warn(
           "Unsupported event {} type {} when building transfer request", 
event, event.getClass());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index b92c066e8be..ca38765820c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -24,7 +24,6 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
@@ -45,6 +44,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -53,6 +53,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import com.google.common.collect.ImmutableSet;
@@ -91,8 +92,16 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
       "Exception occurred while sending to receiver %s:%s.";
 
-  private final IoTDBDataRegionSyncConnector retryConnector = new 
IoTDBDataRegionSyncConnector();
+  private final IoTDBDataRegionSyncConnector syncConnector = new 
IoTDBDataRegionSyncConnector();
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
+  private final PipeDataRegionEventCounter retryEventQueueEventCounter =
+      new PipeDataRegionEventCounter();
+  private final int forcedRetryTsFileEventQueueSizeThreshold =
+      
PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+  private final int forcedRetryTabletEventQueueSizeThreshold =
+      
PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+  private final int forcedRetryTotalEventQueueSizeThreshold =
+      
PipeConfig.getInstance().getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
   private final long maxRetryExecutionTimeMsPerCall =
       
PipeConfig.getInstance().getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
 
@@ -108,7 +117,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   @Override
   public void validate(final PipeParameterValidator validator) throws 
Exception {
     super.validate(validator);
-    retryConnector.validate(validator);
+    syncConnector.validate(validator);
 
     final PipeParameters parameters = validator.getParameters();
 
@@ -125,7 +134,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       final PipeParameters parameters, final PipeConnectorRuntimeConfiguration 
configuration)
       throws Exception {
     super.customize(parameters, configuration);
-    retryConnector.customize(parameters, configuration);
+    syncConnector.customize(parameters, configuration);
 
     clientManager =
         new IoTDBDataNodeAsyncClientManager(
@@ -149,17 +158,17 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   @Override
   // Synchronized to avoid close connector when transfer event
   public synchronized void handshake() throws Exception {
-    retryConnector.handshake();
+    syncConnector.handshake();
   }
 
   @Override
-  public void heartbeat() {
-    retryConnector.heartbeat();
+  public void heartbeat() throws Exception {
+    syncConnector.heartbeat();
   }
 
   @Override
   public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
-    transferQueuedEventsIfNecessary();
+    transferQueuedEventsIfNecessary(false);
 
     if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)
         && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
@@ -173,9 +182,6 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     if (isTabletBatchModeEnabled) {
       final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
           tabletBatchBuilder.onEvent(tabletInsertionEvent);
-      if (Objects.isNull(endPointAndBatch)) {
-        return;
-      }
       transferInBatchWithoutCheck(endPointAndBatch);
     } else {
       transferInEventWithoutCheck(tabletInsertionEvent);
@@ -185,6 +191,9 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private void transferInBatchWithoutCheck(
       final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
       throws IOException, WriteProcessException, InterruptedException {
+    if (Objects.isNull(endPointAndBatch)) {
+      return;
+    }
     final PipeTabletEventBatch batch = endPointAndBatch.getRight();
 
     if (batch instanceof PipeTabletEventPlainBatch) {
@@ -226,7 +235,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     endPointAndBatch.getRight().onSuccess();
   }
 
-  private void transferInEventWithoutCheck(final TabletInsertionEvent 
tabletInsertionEvent)
+  private boolean transferInEventWithoutCheck(final TabletInsertionEvent 
tabletInsertionEvent)
       throws Exception {
     if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
       final PipeInsertNodeTabletInsertionEvent 
pipeInsertNodeTabletInsertionEvent =
@@ -234,7 +243,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       // We increase the reference count for this event to determine if the 
event may be released.
       if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
           IoTDBDataRegionAsyncConnector.class.getName())) {
-        return;
+        return false;
       }
 
       final InsertNode insertNode =
@@ -258,7 +267,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       // We increase the reference count for this event to determine if the 
event may be released.
       if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
           IoTDBDataRegionAsyncConnector.class.getName())) {
-        return;
+        return false;
       }
 
       final TPipeTransferReq pipeTransferTabletRawReq =
@@ -272,6 +281,8 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
       transfer(pipeRawTabletInsertionEvent.getDeviceId(), 
pipeTransferTabletReqHandler);
     }
+
+    return true;
   }
 
   private void transfer(
@@ -314,7 +325,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   @Override
   public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    transferQueuedEventsIfNecessary();
+    transferQueuedEventsIfNecessary(false);
     transferBatchedEventsIfNecessary();
 
     if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
@@ -327,14 +338,14 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     transferWithoutCheck(tsFileInsertionEvent);
   }
 
-  private void transferWithoutCheck(final TsFileInsertionEvent 
tsFileInsertionEvent)
+  private boolean transferWithoutCheck(final TsFileInsertionEvent 
tsFileInsertionEvent)
       throws Exception {
     final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
         (PipeTsFileInsertionEvent) tsFileInsertionEvent;
     // We increase the reference count for this event to determine if the 
event may be released.
     if (!pipeTsFileInsertionEvent.increaseReferenceCount(
         IoTDBDataRegionAsyncConnector.class.getName())) {
-      return;
+      return false;
     }
 
     // We assume that no exceptions will be thrown after reference count is 
increased.
@@ -361,6 +372,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
                   && clientManager.supportModsIfIsDataNodeReceiver());
 
       transfer(pipeTransferTsFileHandler);
+      return true;
     } catch (final Exception e) {
       // Just in case. To avoid the case that exception occurred when 
constructing the handler.
       pipeTsFileInsertionEvent.decreaseReferenceCount(
@@ -382,7 +394,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
   @Override
   public void transfer(final Event event) throws Exception {
-    transferQueuedEventsIfNecessary();
+    transferQueuedEventsIfNecessary(true);
     transferBatchedEventsIfNecessary();
 
     if (!(event instanceof PipeHeartbeatEvent
@@ -393,7 +405,20 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    retryConnector.transfer(event);
+    syncConnector.transfer(event);
+  }
+
+  /** Try its best to commit data in order. Flush can also be a trigger to 
transfer batched data. */
+  private void transferBatchedEventsIfNecessary()
+      throws IOException, WriteProcessException, InterruptedException {
+    if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
+      return;
+    }
+
+    for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
+        tabletBatchBuilder.getAllNonEmptyBatches()) {
+      transferInBatchWithoutCheck(endPointAndBatch);
+    }
   }
 
   //////////////////////////// Leader cache update ////////////////////////////
@@ -425,14 +450,19 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
    * @see PipeConnector#transfer(TabletInsertionEvent) for more details.
    * @see PipeConnector#transfer(TsFileInsertionEvent) for more details.
    */
-  private void transferQueuedEventsIfNecessary() throws Exception {
-    if (retryEventQueue.isEmpty()) {
-      // Trigger cron heartbeat event in retry connector to send batch in time
-      retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
+  private void transferQueuedEventsIfNecessary(final boolean forced) throws 
Exception {
+    if (retryEventQueue.isEmpty()
+        || (!forced
+            && retryEventQueueEventCounter.getTabletInsertionEventCount()
+                < forcedRetryTabletEventQueueSizeThreshold
+            && retryEventQueueEventCounter.getTsFileInsertionEventCount()
+                < forcedRetryTsFileEventQueueSizeThreshold
+            && retryEventQueue.size() < 
forcedRetryTotalEventQueueSizeThreshold)) {
       return;
     }
 
     final long retryStartTime = System.currentTimeMillis();
+    final int remainingEvents = retryEventQueue.size();
     while (!retryEventQueue.isEmpty()) {
       synchronized (this) {
         if (isClosed.get()) {
@@ -445,23 +475,19 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
         final Event peekedEvent = retryEventQueue.peek();
 
         if (peekedEvent instanceof PipeInsertNodeTabletInsertionEvent) {
-          retryConnector.transfer((PipeInsertNodeTabletInsertionEvent) 
peekedEvent);
+          retryTransfer((PipeInsertNodeTabletInsertionEvent) peekedEvent);
         } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
-          retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
+          retryTransfer((PipeRawTabletInsertionEvent) peekedEvent);
         } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
-          retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
+          retryTransfer((PipeTsFileInsertionEvent) peekedEvent);
         } else {
           LOGGER.warn(
               "IoTDBThriftAsyncConnector does not support transfer generic 
event: {}.",
               peekedEvent);
         }
 
-        if (peekedEvent instanceof EnrichedEvent) {
-          ((EnrichedEvent) peekedEvent)
-              
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), true);
-        }
-
         final Event polledEvent = retryEventQueue.poll();
+        retryEventQueueEventCounter.decreaseEventCount(polledEvent);
         if (polledEvent != peekedEvent) {
           LOGGER.error(
               "The event polled from the queue is not the same as the event 
peeked from the queue. "
@@ -476,24 +502,66 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
 
       // Stop retrying if the execution time exceeds the threshold for better 
realtime performance
       if (System.currentTimeMillis() - retryStartTime > 
maxRetryExecutionTimeMsPerCall) {
-        break;
+        if (retryEventQueueEventCounter.getTabletInsertionEventCount()
+                < forcedRetryTabletEventQueueSizeThreshold
+            && retryEventQueueEventCounter.getTsFileInsertionEventCount()
+                < forcedRetryTsFileEventQueueSizeThreshold
+            && retryEventQueue.size() < 
forcedRetryTotalEventQueueSizeThreshold) {
+          return;
+        }
+
+        if (remainingEvents <= retryEventQueue.size()) {
+          throw new PipeException("Failed to transfer events in retry queue.");
+        }
       }
     }
-
-    // Trigger cron heartbeat event in retry connector to send batch in time
-    retryConnector.transfer(PipeConnectorSubtask.CRON_HEARTBEAT_EVENT);
   }
 
-  /** Try its best to commit data in order. Flush can also be a trigger to 
transfer batched data. */
-  private void transferBatchedEventsIfNecessary()
-      throws IOException, WriteProcessException, InterruptedException {
-    if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
+  private void retryTransfer(final TabletInsertionEvent tabletInsertionEvent) {
+    if (isTabletBatchModeEnabled) {
+      try {
+        
transferInBatchWithoutCheck(tabletBatchBuilder.onEvent(tabletInsertionEvent));
+        if (tabletInsertionEvent instanceof EnrichedEvent) {
+          ((EnrichedEvent) tabletInsertionEvent)
+              
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
+        }
+      } catch (final Exception e) {
+        addFailureEventToRetryQueue(tabletInsertionEvent);
+      }
       return;
     }
 
-    for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
-        tabletBatchBuilder.getAllNonEmptyBatches()) {
-      transferInBatchWithoutCheck(endPointAndBatch);
+    // Tablet batch mode is not enabled, so we need to transfer the event 
directly.
+    try {
+      if (transferInEventWithoutCheck(tabletInsertionEvent)) {
+        if (tabletInsertionEvent instanceof EnrichedEvent) {
+          ((EnrichedEvent) tabletInsertionEvent)
+              
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
+        }
+      } else {
+        addFailureEventToRetryQueue(tabletInsertionEvent);
+      }
+    } catch (final Exception e) {
+      if (tabletInsertionEvent instanceof EnrichedEvent) {
+        ((EnrichedEvent) tabletInsertionEvent)
+            
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
+      }
+      addFailureEventToRetryQueue(tabletInsertionEvent);
+    }
+  }
+
+  private void retryTransfer(final PipeTsFileInsertionEvent 
tsFileInsertionEvent) {
+    try {
+      if (transferWithoutCheck(tsFileInsertionEvent)) {
+        tsFileInsertionEvent.decreaseReferenceCount(
+            IoTDBDataRegionAsyncConnector.class.getName(), false);
+      } else {
+        addFailureEventToRetryQueue(tsFileInsertionEvent);
+      }
+    } catch (final Exception e) {
+      tsFileInsertionEvent.decreaseReferenceCount(
+          IoTDBDataRegionAsyncConnector.class.getName(), false);
+      addFailureEventToRetryQueue(tsFileInsertionEvent);
     }
   }
 
@@ -516,6 +584,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     }
 
     retryEventQueue.offer(event);
+    retryEventQueueEventCounter.increaseEventCount(event);
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Added event {} to retry queue.", event);
     }
@@ -536,15 +605,6 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     events.forEach(this::addFailureEventToRetryQueue);
   }
 
-  public synchronized void clearRetryEventsReferenceCount() {
-    while (!retryEventQueue.isEmpty()) {
-      final Event event = retryEventQueue.poll();
-      if (event instanceof EnrichedEvent) {
-        ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
-      }
-    }
-  }
-
   //////////////////////////// Operations for close 
////////////////////////////
 
   @Override
@@ -559,6 +619,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
               && regionId == ((EnrichedEvent) event).getRegionId()) {
             ((EnrichedEvent) event)
                 
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+            retryEventQueueEventCounter.decreaseEventCount(event);
             return true;
           }
           return false;
@@ -570,7 +631,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   public synchronized void close() {
     isClosed.set(true);
 
-    retryConnector.close();
+    syncConnector.close();
 
     if (tabletBatchBuilder != null) {
       tabletBatchBuilder.close();
@@ -600,6 +661,16 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
     super.close();
   }
 
+  public synchronized void clearRetryEventsReferenceCount() {
+    while (!retryEventQueue.isEmpty()) {
+      final Event event = retryEventQueue.poll();
+      retryEventQueueEventCounter.decreaseEventCount(event);
+      if (event instanceof EnrichedEvent) {
+        ((EnrichedEvent) 
event).clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+      }
+    }
+  }
+
   //////////////////////// APIs provided for metric framework 
////////////////////////
 
   public int getRetryEventQueueSize() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 99e041cdfb4..dd15c21a48c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -235,6 +235,9 @@ public class CommonConfig {
   private long pipeConnectorRetryIntervalMs = 1000L;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
+  private int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold = 5;
+  private int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold = 20;
+  private int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold = 30;
   private long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall = 500;
   private int pipeAsyncConnectorSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
@@ -261,7 +264,7 @@ public class CommonConfig {
 
   private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 5;
-  private int pipeMaxAllowedPinnedMemTableCount = 5; // per data region
+  private int pipeMaxAllowedPinnedMemTableCount = 10; // per data region
   private long pipeMaxAllowedLinkedTsFileCount = 300;
   private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
   private long pipeStuckRestartIntervalSeconds = 120;
@@ -862,6 +865,36 @@ public class CommonConfig {
     return pipeConnectorRPCThriftCompressionEnabled;
   }
 
+  public void setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+      int pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold) {
+    this.pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold =
+        pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+  }
+
+  public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
+    return pipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold;
+  }
+
+  public void setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+      int pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold) {
+    this.pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold =
+        pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+  }
+
+  public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
+    return pipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold;
+  }
+
+  public void setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+      int pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold) {
+    this.pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold =
+        pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+  }
+
+  public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
+    return pipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold;
+  }
+
   public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
       long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
     this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c6b7013b23e..d7d8ba068f4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -430,6 +430,36 @@ public class CommonDescriptor {
                         
"pipe_async_connector_max_retry_execution_time_ms_per_call",
                         String.valueOf(
                             
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
+    config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
+        Integer.parseInt(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
+                .orElse(
+                    properties.getProperty(
+                        
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
+                        String.valueOf(
+                            config
+                                
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
+    config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
+        Integer.parseInt(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
+                .orElse(
+                    properties.getProperty(
+                        
"pipe_async_connector_forced_retry_tablet_event_queue_size",
+                        String.valueOf(
+                            config
+                                
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
+    config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
+        Integer.parseInt(
+            Optional.ofNullable(
+                    
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
+                .orElse(
+                    properties.getProperty(
+                        
"pipe_async_connector_forced_retry_total_event_queue_size",
+                        String.valueOf(
+                            config
+                                
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
     int pipeAsyncConnectorSelectorNumber =
         Integer.parseInt(
             
Optional.ofNullable(properties.getProperty("pipe_sink_selector_number"))
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 0f558782f00..24d49da94c1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -152,6 +152,18 @@ public class PipeConfig {
     return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
   }
 
+  public int getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold() {
+    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold();
+  }
+
+  public int getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold() {
+    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold();
+  }
+
+  public int getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold() {
+    return 
COMMON_CONFIG.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold();
+  }
+
   public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
     return COMMON_CONFIG.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall();
   }
@@ -438,6 +450,16 @@ public class PipeConfig {
         "PipeRemainingTimeCommitRateAverageTime: {}", 
getPipeRemainingTimeCommitRateAverageTime());
     LOGGER.info("PipeTsFileScanParsingThreshold(): {}", 
getPipeTsFileScanParsingThreshold());
 
+    LOGGER.info(
+        "PipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold: {}",
+        getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold());
+    LOGGER.info(
+        "PipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold: {}",
+        getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold());
+    LOGGER.info(
+        "PipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold: {}",
+        getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold());
+
     LOGGER.info(
         "PipeAsyncConnectorMaxRetryExecutionTimeMsPerCall: {}",
         getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());

Reply via email to