This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3bc7b9032f1 Pipe: Fix async sink handshake backoff (#18054)
3bc7b9032f1 is described below
commit 3bc7b9032f1e949a68e69233958e21df1cde45d8
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Jun 30 10:56:13 2026 +0800
Pipe: Fix async sink handshake backoff (#18054)
* Throttle async sink handshakes after failures
* Schedule pipe sink backoff without blocking workers
* update
---
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 3 ++
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 2 +
.../subtask/processor/PipeProcessorSubtask.java | 2 +
.../agent/task/subtask/sink/PipeSinkSubtask.java | 32 +++++++++++++++
.../thrift/async/IoTDBDataRegionAsyncSink.java | 45 +++++++++++++++++++++-
.../agent/task/execution/PipeSubtaskExecutor.java | 12 +++++-
.../task/subtask/PipeAbstractSinkSubtask.java | 44 ++++++++++++++++++++-
.../pipe/agent/task/subtask/PipeSubtask.java | 11 ++++++
.../sink/protocol/PipeSinkWithSchedulingDelay.java | 27 +++++++++++++
9 files changed, 175 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 050de467f26..13369fc5ee3 100644
---
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -222,6 +222,9 @@ public final class DataNodePipeMessages {
public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
"Pipe sink subtasks with attributes {} is bounded with sinkExecutor {}
and "
+ "callbackExecutor {}.";
+ public static final String
PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES =
+ "Pipe sink subtask {} is delayed for {} ms before polling events to
avoid frequent "
+ + "handshakes after client borrow failures.";
public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
"Pipe skipping temporary TsFile which shouldn't be transferred: {}";
public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
diff --git
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 147d977d572..49ee222c3e9 100644
---
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -213,6 +213,8 @@ public final class DataNodePipeMessages {
public static final String PIPE_SINK_SUBTASKS_WITH_ATTRIBUTES_IS_BOUNDED =
"Pipe sink subtasks with attributes {} is bounded with sinkExecutor {}
and "
+ "callbackExecutor {}.";
+ public static final String
PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES =
+ "Pipe sink 子任务 {} 在拉取事件前延迟 {} ms,以避免客户端借用失败后频繁握手。";
public static final String PIPE_SKIPPING_TEMPORARY_TSFILE_WHICH_SHOULDN_T =
"Pipe 跳过不应传输的临时 TsFile:{}";
public static final String PULLED_PIPE_META_FROM_CONFIG_NODE_RECOVERING =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index c37aab5af2c..295f30da3f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,6 +105,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
@Override
public void bindExecutors(
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+ final ListeningScheduledExecutorService ignoredScheduledExecutor,
final ExecutorService ignored,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 95884b17789..f79c2bb1238 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
+import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay;
import
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
@@ -162,6 +163,37 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
return true;
}
+ @Override
+ protected long peekSchedulingDelayInMs() {
+ if (!(outputPipeSink instanceof PipeSinkWithSchedulingDelay)) {
+ return 0;
+ }
+
+ return ((PipeSinkWithSchedulingDelay)
outputPipeSink).peekSchedulingDelayMs();
+ }
+
+ @Override
+ protected long consumeSchedulingDelayInMs() {
+ if (!(outputPipeSink instanceof PipeSinkWithSchedulingDelay)) {
+ return 0;
+ }
+
+ final long remainingSchedulingDelayMs =
+ ((PipeSinkWithSchedulingDelay)
outputPipeSink).consumeSchedulingDelayMs();
+ if (remainingSchedulingDelayMs <= 0) {
+ return 0;
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+
DataNodePipeMessages.PIPE_SINK_SUBTASK_DELAYED_TO_AVOID_FREQUENT_HANDSHAKES,
+ getDisplayTaskID(),
+ remainingSchedulingDelayMs);
+ }
+
+ return remainingSchedulingDelayMs;
+ }
+
private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
// DO NOT call heartbeat or transfer after closed, or will cause
connection leak
if (isClosed.get()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 7c85268e6d3..f02f1be3671 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
+import org.apache.iotdb.commons.pipe.sink.protocol.PipeSinkWithSchedulingDelay;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -102,7 +103,7 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
@TreeModel
@TableModel
-public class IoTDBDataRegionAsyncSink extends IoTDBSink {
+public class IoTDBDataRegionAsyncSink extends IoTDBSink implements
PipeSinkWithSchedulingDelay {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionAsyncSink.class);
@@ -130,6 +131,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
// use these variables to prevent reference count leaks under some corner
cases when closing
private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private int consecutiveHandshakeFailureCount = 0;
+ private final AtomicLong schedulingDelayMs = new AtomicLong(0);
private final Map<PipeTransferTrackableHandler,
PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();
@@ -352,8 +355,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient(endPoint);
+ markHandshakeSucceeded();
pipeTransferTabletBatchEventHandler.transfer(client);
} catch (final Exception ex) {
+ markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferTabletBatchEventHandler.onError(ex);
}
@@ -365,8 +370,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient(deviceId);
+ markHandshakeSucceeded();
pipeTransferInsertNodeReqHandler.transfer(client);
} catch (final Exception ex) {
+ markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferInsertNodeReqHandler.onError(ex);
}
@@ -377,8 +384,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient(deviceId);
+ markHandshakeSucceeded();
pipeTransferTabletReqHandler.transfer(client);
} catch (final Exception ex) {
+ markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferTabletReqHandler.onError(ex);
}
@@ -454,8 +463,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
AsyncPipeDataTransferServiceClient client = null;
try {
client = transferTsFileClientManager.borrowClient();
+ markHandshakeSucceeded();
pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
} catch (final Exception ex) {
+ markSchedulingDelayIfHandshakeFailed(client);
logOnClientException(client, ex);
pipeTransferTsFileHandler.onError(ex);
} finally {
@@ -555,6 +566,38 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
}
}
+ private void markHandshakeSucceeded() {
+ consecutiveHandshakeFailureCount = 0;
+ }
+
+ private void markSchedulingDelayIfHandshakeFailed(
+ final AsyncPipeDataTransferServiceClient client) {
+ if (client != null) {
+ return;
+ }
+
+ if (++consecutiveHandshakeFailureCount <
getSchedulingDelayFailureThreshold()) {
+ return;
+ }
+
+ schedulingDelayMs.accumulateAndGet(
+ PipeConfig.getInstance().getPipeSinkRetryIntervalMs(), Math::max);
+ }
+
+ private int getSchedulingDelayFailureThreshold() {
+ return Math.max(1, nodeUrls.size() << 1);
+ }
+
+ @Override
+ public long peekSchedulingDelayMs() {
+ return schedulingDelayMs.get();
+ }
+
+ @Override
+ public long consumeSchedulingDelayMs() {
+ return schedulingDelayMs.getAndSet(0);
+ }
+
/**
* Transfer queued {@link Event}s which are waiting for retry.
*
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
index 4782988868a..7fe169f7a38 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeSubtask;
import org.apache.iotdb.commons.utils.TestOnly;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -54,6 +56,7 @@ public abstract class PipeSubtaskExecutor {
protected final WrappedThreadPoolExecutor underlyingThreadPool;
protected final ListeningExecutorService subtaskWorkerThreadPoolExecutor;
+ protected final ListeningScheduledExecutorService
subtaskWorkerScheduledExecutor;
private final Map<String, PipeSubtask> registeredIdSubtaskMapper;
@@ -90,6 +93,9 @@ public abstract class PipeSubtaskExecutor {
underlyingThreadPool.disableErrorLog();
}
subtaskWorkerThreadPoolExecutor =
MoreExecutors.listeningDecorator(underlyingThreadPool);
+ final ScheduledExecutorService underlyingScheduledExecutor =
+
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(workingThreadName +
"-Scheduler");
+ subtaskWorkerScheduledExecutor =
MoreExecutors.listeningDecorator(underlyingScheduledExecutor);
subtaskCallbackListeningExecutor =
Objects.nonNull(callbackThreadName)
? IoTDBThreadPoolFactory.newSingleThreadExecutor(
@@ -112,7 +118,10 @@ public abstract class PipeSubtaskExecutor {
registeredIdSubtaskMapper.put(subtask.getTaskID(), subtask);
subtask.bindExecutors(
- subtaskWorkerThreadPoolExecutor, subtaskCallbackListeningExecutor,
schedulerSupplier(this));
+ subtaskWorkerThreadPoolExecutor,
+ subtaskWorkerScheduledExecutor,
+ subtaskCallbackListeningExecutor,
+ schedulerSupplier(this));
}
private static String getSafeSubtaskStr(final String subtaskID) {
@@ -191,6 +200,7 @@ public abstract class PipeSubtaskExecutor {
}
subtaskWorkerThreadPoolExecutor.shutdown();
+ subtaskWorkerScheduledExecutor.shutdown();
if (subtaskCallbackListeningExecutor !=
globalSubtaskCallbackListeningExecutor) {
subtaskCallbackListeningExecutor.shutdown();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
index 9dbf5af2d0c..5078d14f5a5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java
@@ -36,11 +36,13 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
public abstract class PipeAbstractSinkSubtask extends PipeReportableSubtask {
@@ -72,9 +74,11 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
@Override
public void bindExecutors(
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+ final ListeningScheduledExecutorService subtaskWorkerScheduledExecutor,
final ExecutorService subtaskCallbackListeningExecutor,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
+ this.subtaskWorkerScheduledExecutor = subtaskWorkerScheduledExecutor;
this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
this.subtaskScheduler = subtaskScheduler;
}
@@ -230,9 +234,47 @@ public abstract class PipeAbstractSinkSubtask extends
PipeReportableSubtask {
return;
}
+ final long schedulingDelayInMs = getNextSchedulingDelayInMs();
+ if (schedulingDelayInMs > 0) {
+ isSubmitted = true;
+ subtaskWorkerScheduledExecutor.schedule(
+ // Keep the isSubmitted placeholder set before the delayed
submission to avoid duplicate
+ // schedules, so the delayed task should not mark it again.
+ () -> submitSelfToWorker(false), schedulingDelayInMs,
TimeUnit.MILLISECONDS);
+ return;
+ }
+
+ submitSelfToWorker(true);
+ }
+
+ @Override
+ protected boolean shouldStopSubmittingSelfInCurrentCall() {
+ return peekSchedulingDelayInMs() > 0;
+ }
+
+ private synchronized void submitSelfToWorker(final boolean
shouldMarkSubmitted) {
+ if (shouldStopSubmittingSelf.get()) {
+ isSubmitted = false;
+ return;
+ }
+
final ListenableFuture<Boolean> nextFuture =
subtaskWorkerThreadPoolExecutor.submit(this);
registerCallbackHookAfterSubmit(nextFuture);
- isSubmitted = true;
+ if (shouldMarkSubmitted) {
+ isSubmitted = true;
+ }
+ }
+
+ private long getNextSchedulingDelayInMs() {
+ return consumeSchedulingDelayInMs();
+ }
+
+ protected long peekSchedulingDelayInMs() {
+ return 0;
+ }
+
+ protected long consumeSchedulingDelayInMs() {
+ return 0;
}
protected void registerCallbackHookAfterSubmit(final
ListenableFuture<Boolean> future) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
index 29f651a1ac9..a8c78490a79 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.pipe.api.event.Event;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ public abstract class PipeSubtask
// For thread pool to execute subtasks
protected ListeningExecutorService subtaskWorkerThreadPoolExecutor;
+ protected ListeningScheduledExecutorService subtaskWorkerScheduledExecutor;
// For controlling the subtask execution
protected final AtomicBoolean shouldStopSubmittingSelf = new
AtomicBoolean(true);
@@ -65,6 +67,7 @@ public abstract class PipeSubtask
public abstract void bindExecutors(
ListeningExecutorService subtaskWorkerThreadPoolExecutor,
+ ListeningScheduledExecutorService subtaskWorkerScheduledExecutor,
ExecutorService subtaskCallbackListeningExecutor,
PipeSubtaskScheduler subtaskScheduler);
@@ -81,6 +84,10 @@ public abstract class PipeSubtask
break;
}
hasAtLeastOneEventProcessed = true;
+ // Stop the current call early if the subtask asks to delay its next
submission.
+ if (shouldStopSubmittingSelfInCurrentCall()) {
+ break;
+ }
}
} finally {
// Reset the scheduler to make sure that the scheduler can schedule again
@@ -105,6 +112,10 @@ public abstract class PipeSubtask
@SuppressWarnings("squid:S112") // Allow to throw Exception
protected abstract boolean executeOnce() throws Exception;
+ protected boolean shouldStopSubmittingSelfInCurrentCall() {
+ return false;
+ }
+
@Override
public synchronized void onSuccess(final Boolean
hasAtLeastOneEventProcessed) {
final int totalRetryCount = retryCount.getAndSet(0);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java
new file mode 100644
index 00000000000..94392279828
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeSinkWithSchedulingDelay.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.protocol;
+
+public interface PipeSinkWithSchedulingDelay {
+
+ long peekSchedulingDelayMs();
+
+ long consumeSchedulingDelayMs();
+}