This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bc7b9032f1 Pipe: Fix async sink handshake backoff (#18054)
3bc7b9032f1 is described below

commit 3bc7b9032f1e949a68e69233958e21df1cde45d8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Jun 30 10:56:13 2026 +0800

    Pipe: Fix async sink handshake backoff (#18054)
    
    * Throttle async sink handshakes after failures
    
    * Schedule pipe sink backoff without blocking workers
    
    * update
---
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  3 ++
 .../apache/iotdb/db/i18n/DataNodePipeMessages.java |  2 +
 .../subtask/processor/PipeProcessorSubtask.java    |  2 +
 .../agent/task/subtask/sink/PipeSinkSubtask.java   | 32 +++++++++++++++
 .../thrift/async/IoTDBDataRegionAsyncSink.java     | 45 +++++++++++++++++++++-
 .../agent/task/execution/PipeSubtaskExecutor.java  | 12 +++++-
 .../task/subtask/PipeAbstractSinkSubtask.java      | 44 ++++++++++++++++++++-
 .../pipe/agent/task/subtask/PipeSubtask.java       | 11 ++++++
 .../sink/protocol/PipeSinkWithSchedulingDelay.java | 27 +++++++++++++
 9 files changed, 175 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 050de467f26..13369fc5ee3 100644
--- 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -222,6 +222,9 @@ public final class DataNodePipeMessages {
   public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
       "Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} 
and "
           + "callbackExecutor {}.";
+  public static final String 
PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES =
+      "Pipe sink subtask {} is delayed for {} ms before polling events to 
avoid frequent "
+          + "handshakes after client borrow failures.";
   public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
       "Pipe skipping temporary TsFile which shouldn't be transferred: {}";
   public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
diff --git 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 147d977d572..49ee222c3e9 100644
--- 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -213,6 +213,8 @@ public final class DataNodePipeMessages {
   public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
       "Pipe sink subtasks with attributes {} is bounded with sinkExecutor {} 
and "
           + "callbackExecutor {}.";
+  public static final String 
PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES =
+      "Pipe sink 子任务 {} 在拉取事件前延迟 {} ms,以避免客户端借用失败后频繁握手。";
   public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
       "Pipe 跳过不应传输的临时 TsFile:{}";
   public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index c37aab5af2c..295f30da3f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -49,6 +49,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +105,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
   @Override
   public void bindExecutors(
       final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+      final ListeningScheduledExecutorService ignoredScheduledExecutor,
       final ExecutorService ignored,
       final PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 95884b17789..f79c2bb1238 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
 import 
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
+import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay;
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
@@ -162,6 +163,37 @@ public class PipeSinkSubtask extends 
PipeAbstractSinkSubtask {
     return true;
   }
 
+  @Override
+  protected long peekSchedulingDelayInMs() {
+    if (!(outputPipeSink instanceof PipeSinkWithSchedulingDelay)) {
+      return 0;
+    }
+
+    return ((PipeSinkWithSchedulingDelay) 
outputPipeSink).peekSchedulingDelayMs();
+  }
+
+  @Override
+  protected long consumeSchedulingDelayInMs() {
+    if (!(outputPipeSink instanceof PipeSinkWithSchedulingDelay)) {
+      return 0;
+    }
+
+    final long remainingSchedulingDelayMs =
+        ((PipeSinkWithSchedulingDelay) 
outputPipeSink).consumeSchedulingDelayMs();
+    if (remainingSchedulingDelayMs <= 0) {
+      return 0;
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          
DataNodePipeMessages.PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES,
+          getDisplayTaskID(),
+          remainingSchedulingDelayMs);
+    }
+
+    return remainingSchedulingDelayMs;
+  }
+
   private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
     // DO NOT call heartbeat or transfer after closed, or will cause 
connection leak
     if (isClosed.get()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 7c85268e6d3..f02f1be3671 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
 import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
+import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -102,7 +103,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
 
 @TreeModel
 @TableModel
-public class IoTDBDataRegionAsyncSink extends IoTDBSink {
+public class IoTDBDataRegionAsyncSink extends IoTDBSink implements 
PipeSinkWithSchedulingDelay {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataRegionAsyncSink.class);
 
@@ -130,6 +131,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
 
   // use these variables to prevent reference count leaks under some corner 
cases when closing
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
+  private int consecutiveHandshakeFailureCount = 0;
+  private final AtomicLong schedulingDelayMs = new AtomicLong(0);
   private final Map<PipeTransferTrackableHandler, 
PipeTransferTrackableHandler> pendingHandlers =
       new ConcurrentHashMap<>();
 
@@ -352,8 +355,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     AsyncPipeDataTransferServiceClient client = null;
     try {
       client = clientManager.borrowClient(endPoint);
+      markHandshakeSucceeded();
       pipeTransferTabletBatchEventHandler.transfer(client);
     } catch (final Exception ex) {
+      markSchedulingDelayIfHandshakeFailed(client);
       logOnClientException(client, ex);
       pipeTransferTabletBatchEventHandler.onError(ex);
     }
@@ -365,8 +370,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     AsyncPipeDataTransferServiceClient client = null;
     try {
       client = clientManager.borrowClient(deviceId);
+      markHandshakeSucceeded();
       pipeTransferInsertNodeReqHandler.transfer(client);
     } catch (final Exception ex) {
+      markSchedulingDelayIfHandshakeFailed(client);
       logOnClientException(client, ex);
       pipeTransferInsertNodeReqHandler.onError(ex);
     }
@@ -377,8 +384,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     AsyncPipeDataTransferServiceClient client = null;
     try {
       client = clientManager.borrowClient(deviceId);
+      markHandshakeSucceeded();
       pipeTransferTabletReqHandler.transfer(client);
     } catch (final Exception ex) {
+      markSchedulingDelayIfHandshakeFailed(client);
       logOnClientException(client, ex);
       pipeTransferTabletReqHandler.onError(ex);
     }
@@ -454,8 +463,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
               AsyncPipeDataTransferServiceClient client = null;
               try {
                 client = transferTsFileClientManager.borrowClient();
+                markHandshakeSucceeded();
                 
pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
               } catch (final Exception ex) {
+                markSchedulingDelayIfHandshakeFailed(client);
                 logOnClientException(client, ex);
                 pipeTransferTsFileHandler.onError(ex);
               } finally {
@@ -555,6 +566,38 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
     }
   }
 
+  private void markHandshakeSucceeded() {
+    consecutiveHandshakeFailureCount = 0;
+  }
+
+  private void markSchedulingDelayIfHandshakeFailed(
+      final AsyncPipeDataTransferServiceClient client) {
+    if (client != null) {
+      return;
+    }
+
+    if (++consecutiveHandshakeFailureCount < 
getSchedulingDelayFailureThreshold()) {
+      return;
+    }
+
+    schedulingDelayMs.accumulateAndGet(
+        PipeConfig.getInstance().getPipeSinkRetryIntervalMs(), Math::max);
+  }
+
+  private int getSchedulingDelayFailureThreshold() {
+    return Math.max(1, nodeUrls.size() << 1);
+  }
+
+  @Override
+  public long peekSchedulingDelayMs() {
+    return schedulingDelayMs.get();
+  }
+
+  @Override
+  public long consumeSchedulingDelayMs() {
+    return schedulingDelayMs.getAndSet(0);
+  }
+
   /**
    * Transfer queued {@link Event}s which are waiting for retry.
    *
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index 4782988868a..7fe169f7a38 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
 import org.apache.iotdb.commons.utils.TestOnly;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor {
 
   protected final WrappedThreadPoolExecutor underlyingThreadPool;
   protected final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
+  protected final ListeningScheduledExecutorService 
subtaskWorkerScheduledExecutor;
 
   private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
 
@@ -90,6 +93,9 @@ public abstract class PipeSubtaskExecutor {
       underlyingThreadPool.disableErrorLog();
     }
     subtaskWorkerThreadPoolExecutor = 
MoreExecutors.listeningDecorator(underlyingThreadPool);
+    final ScheduledExecutorService underlyingScheduledExecutor =
+        
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(workingThreadName + 
"-Scheduler");
+    subtaskWorkerScheduledExecutor = 
MoreExecutors.listeningDecorator(underlyingScheduledExecutor);
     subtaskCallbackListeningExecutor =
         Objects.nonNull(callbackThreadName)
             ? IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -112,7 +118,10 @@ public abstract class PipeSubtaskExecutor {
 
     registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask);
     subtask.bindExecutors(
-        subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor, 
schedulerSupplier(this));
+        subtaskWorkerThreadPoolExecutor,
+        subtaskWorkerScheduledExecutor,
+        subtaskCallbackListeningExecutor,
+        schedulerSupplier(this));
   }
 
   private static String getSafeSubtaskStr(final String subtaskID) {
@@ -191,6 +200,7 @@ public abstract class PipeSubtaskExecutor {
     }
 
     subtaskWorkerThreadPoolExecutor.shutdown();
+    subtaskWorkerScheduledExecutor.shutdown();
     if (subtaskCallbackListeningExecutor != 
globalSubtaskCallbackListeningExecutor) {
       subtaskCallbackListeningExecutor.shutdown();
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 9dbf5af2d0c..5078d14f5a5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -36,11 +36,13 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
 
@@ -72,9 +74,11 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
   @Override
   public void bindExecutors(
       final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+      final ListeningScheduledExecutorService subtaskWorkerScheduledExecutor,
       final ExecutorService subtaskCallbackListeningExecutor,
       final PipeSubtaskScheduler subtaskScheduler) {
     this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
+    this.subtaskWorkerScheduledExecutor = subtaskWorkerScheduledExecutor;
     this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
     this.subtaskScheduler = subtaskScheduler;
   }
@@ -230,9 +234,47 @@ public abstract class PipeAbstractSinkSubtask extends 
PipeReportableSubtask {
       return;
     }
 
+    final long schedulingDelayInMs = getNextSchedulingDelayInMs();
+    if (schedulingDelayInMs > 0) {
+      isSubmitted = true;
+      subtaskWorkerScheduledExecutor.schedule(
+          // Keep the isSubmitted placeholder set before the delayed 
submission to avoid duplicate
+          // schedules, so the delayed task should not mark it again.
+          () -> submitSelfToWorker(false), schedulingDelayInMs, 
TimeUnit.MILLISECONDS);
+      return;
+    }
+
+    submitSelfToWorker(true);
+  }
+
+  @Override
+  protected boolean shouldStopSubmittingSelfInCurrentCall() {
+    return peekSchedulingDelayInMs() > 0;
+  }
+
+  private synchronized void submitSelfToWorker(final boolean 
shouldMarkSubmitted) {
+    if (shouldStopSubmittingSelf.get()) {
+      isSubmitted = false;
+      return;
+    }
+
     final ListenableFuture<Boolean> nextFuture = 
subtaskWorkerThreadPoolExecutor.submit(this);
     registerCallbackHookAfterSubmit(nextFuture);
-    isSubmitted = true;
+    if (shouldMarkSubmitted) {
+      isSubmitted = true;
+    }
+  }
+
+  private long getNextSchedulingDelayInMs() {
+    return consumeSchedulingDelayInMs();
+  }
+
+  protected long peekSchedulingDelayInMs() {
+    return 0;
+  }
+
+  protected long consumeSchedulingDelayInMs() {
+    return 0;
   }
 
   protected void registerCallbackHookAfterSubmit(final 
ListenableFuture<Boolean> future) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 29f651a1ac9..a8c78490a79 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.event.Event;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +47,7 @@ public abstract class PipeSubtask
 
   // For thread pool to execute subtasks
   protected ListeningExecutorService subtaskWorkerThreadPoolExecutor;
+  protected ListeningScheduledExecutorService subtaskWorkerScheduledExecutor;
 
   // For controlling the subtask execution
   protected final AtomicBoolean shouldStopSubmittingSelf = new 
AtomicBoolean(true);
@@ -65,6 +67,7 @@ public abstract class PipeSubtask
 
   public abstract void bindExecutors(
       ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+      ListeningScheduledExecutorService subtaskWorkerScheduledExecutor,
       ExecutorService subtaskCallbackListeningExecutor,
       PipeSubtaskScheduler subtaskScheduler);
 
@@ -81,6 +84,10 @@ public abstract class PipeSubtask
           break;
         }
         hasAtLeastOneEventProcessed = true;
+        // Stop the current call early if the subtask asks to delay its next 
submission.
+        if (shouldStopSubmittingSelfInCurrentCall()) {
+          break;
+        }
       }
     } finally {
       // Reset the scheduler to make sure that the scheduler can schedule again
@@ -105,6 +112,10 @@ public abstract class PipeSubtask
   @SuppressWarnings("squid:S112") // Allow to throw Exception
   protected abstract boolean executeOnce() throws Exception;
 
+  protected boolean shouldStopSubmittingSelfInCurrentCall() {
+    return false;
+  }
+
   @Override
   public synchronized void onSuccess(final Boolean 
hasAtLeastOneEventProcessed) {
     final int totalRetryCount = retryCount.getAndSet(0);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java
new file mode 100644
index 00000000000..94392279828
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.protocol;
+
+public interface PipeSinkWithSchedulingDelay {
+
+  long peekSchedulingDelayMs();
+
+  long consumeSchedulingDelayMs();
+}

Reply via email to