This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 352da912e21 Throttle async pipe sink on receiver reject (#17928) (#17966)
352da912e21 is described below
commit 352da912e213b40089fd677d583da8de732ab71f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:02:31 2026 +0800
Throttle async pipe sink on receiver reject (#17928) (#17966)
* Throttle async pipe sink on receiver reject
* Handle review
(cherry picked from commit 757f9007fdc801cb692f934ee38d557c04320191)
---
.../thrift/async/IoTDBDataRegionAsyncSink.java | 116 +++++++++++++++++++++
.../handler/PipeTransferTrackableHandler.java | 54 +++++++---
.../handler/PipeTransferTrackableHandlerTest.java | 38 ++++++-
3 files changed, 192 insertions(+), 16 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index b61cb4543c7..77d3d02864f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.thrift.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
@@ -59,6 +60,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import com.google.common.collect.ImmutableSet;
@@ -82,6 +84,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
@@ -124,6 +127,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
new ConcurrentHashMap<>();
private final Set<CommitterKey> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
+ private final Map<String, ReceiverTemporaryUnavailableBackoff>
receiverBackoffMap =
+ new ConcurrentHashMap<>();
private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;
@@ -719,6 +724,83 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
return enableSendTsFileLimit;
}
+ /**
+  * Blocks the calling thread while the receiver at {@code endPoint} is inside a recorded
+  * backoff window.
+  *
+  * <p>Returns immediately when {@code endPoint} is {@code null} or no backoff entry exists for
+  * it. Otherwise sleeps for the remaining window and re-reads it afterwards, so a window that
+  * was extended during the sleep is honoured as well. Waiting stops once the window has
+  * elapsed, once the sink is observed as closed, or when the thread is interrupted (the
+  * interrupt flag is restored in that case).
+  *
+  * @param endPoint the receiver about to be contacted; may be {@code null}
+  */
+ public void waitIfReceiverTemporarilyUnavailable(final TEndPoint endPoint) {
+   final String key = format(endPoint);
+   // A null key must never reach the concurrent map, hence the guard before the lookup.
+   final ReceiverTemporaryUnavailableBackoff throttle =
+       Objects.nonNull(key) ? receiverBackoffMap.get(key) : null;
+   if (Objects.isNull(throttle)) {
+     return;
+   }
+
+   try {
+     long remainingMs;
+     // The closed flag is checked before the remaining time, on every iteration.
+     while (!isClosed.get() && (remainingMs = throttle.getRemainingWaitTimeInMs()) > 0) {
+       Thread.sleep(remainingMs);
+     }
+   } catch (final InterruptedException e) {
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
+  * Updates the per-receiver backoff state from a transfer response status.
+  *
+  * <p>A status that reports the receiver as temporarily unavailable (see {@code
+  * isReceiverTemporarilyUnavailable}) creates the backoff entry for the endpoint if needed and
+  * marks it, which schedules the next throttle window. A success status drops the entry once
+  * its window has elapsed, so the next rejection starts from a fresh entry. Any other status
+  * leaves the state untouched.
+  *
+  * <p>Both transitions run inside per-key atomic map operations. With a plain
+  * get-then-{@code remove}, a success callback could evict an entry between a rejecting
+  * callback's lookup and its mark, and the new throttle window would be recorded on an object
+  * that is no longer reachable from the map.
+  *
+  * @param endPoint the receiver that produced {@code status}; ignored when {@code null}
+  * @param status the status returned by the receiver; ignored when {@code null}
+  */
+ public void recordReceiverStatus(final TEndPoint endPoint, final TSStatus status) {
+   final String endPointKey = format(endPoint);
+   if (Objects.isNull(endPointKey) || Objects.isNull(status)) {
+     return;
+   }
+
+   if (isReceiverTemporarilyUnavailable(status)) {
+     // Single-slot holder so the lambda can hand the applied backoff back for logging.
+     final long[] backoffTimeInMs = new long[1];
+     receiverBackoffMap.compute(
+         endPointKey,
+         (key, existing) -> {
+           final ReceiverTemporaryUnavailableBackoff backoff =
+               Objects.nonNull(existing) ? existing : new ReceiverTemporaryUnavailableBackoff();
+           backoffTimeInMs[0] = backoff.markTemporarilyUnavailable();
+           return backoff;
+         });
+     if (LOGGER.isDebugEnabled()) {
+       LOGGER.debug(
+           "Receiver {} is temporarily unavailable, throttle requests for {} ms. Status: {}",
+           endPointKey,
+           backoffTimeInMs[0],
+           status);
+     }
+   } else if (isSuccess(status)) {
+     // Returning null from the remapping function removes the entry; a live window is kept.
+     receiverBackoffMap.computeIfPresent(
+         endPointKey,
+         (key, backoff) -> backoff.getRemainingWaitTimeInMs() <= 0 ? null : backoff);
+   }
+ }
+
+ private static boolean isReceiverTemporarilyUnavailable(final TSStatus
status) {
+ if (Objects.isNull(status)) {
+ return false;
+ }
+
+ final int statusCode = status.getCode();
+ if (statusCode ==
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()
+ || statusCode == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
+ return true;
+ }
+
+ return status.isSetSubStatus()
+ && status.getSubStatus().stream()
+
.anyMatch(IoTDBDataRegionAsyncSink::isReceiverTemporarilyUnavailable);
+ }
+
+ private static boolean isSuccess(final TSStatus status) {
+ return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || status.getCode() ==
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
+ }
+
+ /**
+  * Builds the {@code ip:port} key under which an endpoint's backoff state is stored.
+  *
+  * @param endPoint the endpoint to format; may be {@code null}
+  * @return {@code "<ip>:<port>"}, or {@code null} when {@code endPoint} is {@code null}
+  */
+ private static String format(final TEndPoint endPoint) {
+   if (Objects.isNull(endPoint)) {
+     return null;
+   }
+   return endPoint.getIp() + ":" + endPoint.getPort();
+ }
+
//////////////////////////// Operations for close
////////////////////////////
@Override
@@ -793,6 +875,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
// clear reference count of events in retry queue after closing async
client
clearRetryEventsReferenceCount();
droppedPipeTaskKeys.clear();
+ receiverBackoffMap.clear();
super.close();
}
@@ -894,4 +977,37 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
tabletBatchBuilder.setEventSizeHistogram(eventSizeHistogram);
}
}
+
+ /**
+  * Backoff state for one receiver: the delay to apply on the next rejection and the wall-clock
+  * time before which requests to that receiver should be held back.
+  *
+  * <p>The delay starts at the configured sink-subtask initial sleep interval (at least 1 ms,
+  * capped by the maximum) and doubles on every rejection until it reaches the configured
+  * maximum sleep interval.
+  */
+ private static class ReceiverTemporaryUnavailableBackoff {
+
+   /** Upper bound for the delay; never negative. */
+   private final long maxBackoffTimeInMs =
+       Math.max(0, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalMaxMs());
+
+   /** Delay that the next rejection will apply. */
+   private final AtomicLong currentBackoffTimeInMs =
+       new AtomicLong(
+           Math.min(
+               Math.max(1, PipeConfig.getInstance().getPipeSinkSubtaskSleepIntervalInitMs()),
+               maxBackoffTimeInMs));
+
+   /** Epoch millis before which the receiver is considered unavailable; 0 means "no window". */
+   private final AtomicLong nextAvailableTimeInMs = new AtomicLong(0);
+
+   /**
+    * Records a rejection: pushes the availability deadline out by the current delay (never
+    * pulling an existing, later deadline back) and then advances the delay for the next
+    * rejection.
+    *
+    * @return the delay, in milliseconds, that was applied by this call
+    */
+   private long markTemporarilyUnavailable() {
+     final long appliedDelayMs = currentBackoffTimeInMs.get();
+     nextAvailableTimeInMs.updateAndGet(
+         deadline -> {
+           final long candidate = System.currentTimeMillis() + appliedDelayMs;
+           return Math.max(deadline, candidate);
+         });
+     currentBackoffTimeInMs.updateAndGet(this::getNextBackoffTimeInMs);
+     return appliedDelayMs;
+   }
+
+   /**
+    * @return milliseconds left until the availability deadline; zero or negative once it passed
+    */
+   private long getRemainingWaitTimeInMs() {
+     final long deadline = nextAvailableTimeInMs.get();
+     return deadline - System.currentTimeMillis();
+   }
+
+   /**
+    * Computes the delay that follows {@code currentBackoffTimeInMs}: twice the current value
+    * while that stays strictly below the maximum, otherwise the maximum itself.
+    *
+    * @param currentBackoffTimeInMs the delay to advance from
+    * @return the next delay, never larger than the maximum
+    */
+   private long getNextBackoffTimeInMs(final long currentBackoffTimeInMs) {
+     // Comparing against (max - current) instead of (2 * current) avoids overflow on doubling.
+     final boolean canDouble =
+         currentBackoffTimeInMs > 0
+             && currentBackoffTimeInMs < maxBackoffTimeInMs
+             && currentBackoffTimeInMs < maxBackoffTimeInMs - currentBackoffTimeInMs;
+     if (canDouble) {
+       return currentBackoffTimeInMs << 1;
+     }
+     return maxBackoffTimeInMs;
+   }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index 40b05066a93..6c72335b1d2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -50,6 +50,10 @@ public abstract class PipeTransferTrackableHandler
@Override
public void onComplete(final TPipeTransferResp response) {
+ if (Objects.nonNull(client) && Objects.nonNull(response)) {
+ sink.recordReceiverStatus(client.getEndPoint(), response.getStatus());
+ }
+
if (sink.isClosed()) {
clearEventsReferenceCount();
sink.eliminateHandler(this, true);
@@ -99,27 +103,39 @@ public abstract class PipeTransferTrackableHandler
}
// track handler before checking if connector is closed
sink.trackHandler(this);
- if (sink.isClosed()) {
- clearEventsReferenceCount();
- sink.eliminateHandler(this, true);
- client.setShouldReturnSelf(true);
- client.returnSelf(
- (e) -> {
- if (e instanceof IllegalStateException) {
- PipeLogger.log(
- LOGGER::info,
- "Illegal state when return the client to object pool, maybe
the pool is already cleared. Will ignore.");
- return true;
- }
- return false;
- });
- this.client = null;
+ if (returnFalseIfSinkIsClosed(client)) {
+ return false;
+ }
+ sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint());
+ if (returnFalseIfSinkIsClosed(client)) {
return false;
}
doTransfer(client, req);
return true;
}
+ /**
+  * Aborts the pending transfer when the sink has been closed.
+  *
+  * <p>If the sink is closed, this releases the events' reference counts, eliminates this
+  * handler from the sink, hands {@code client} back to its pool (tolerating an {@link
+  * IllegalStateException} raised while returning it) and clears the handler's client
+  * reference.
+  *
+  * <p>Note the naming: the method returns {@code true} when the sink is closed, which is the
+  * signal for the caller to bail out (the transfer path answers it by returning {@code
+  * false}).
+  *
+  * @param client the client that was about to be used for the transfer
+  * @return {@code true} if the sink is closed and the cleanup above was performed, {@code
+  *     false} if the sink is still open and nothing was done
+  */
+ private boolean returnFalseIfSinkIsClosed(final AsyncPipeDataTransferServiceClient client) {
+   if (sink.isClosed()) {
+     clearEventsReferenceCount();
+     sink.eliminateHandler(this, true);
+     client.setShouldReturnSelf(true);
+     client.returnSelf(
+         e -> {
+           if (!(e instanceof IllegalStateException)) {
+             return false;
+           }
+           PipeLogger.log(
+               LOGGER::info,
+               "Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
+           return true;
+         });
+     this.client = null;
+     return true;
+   }
+
+   return false;
+ }
+
/**
* @return {@code true} if all transmissions corresponding to the handler
have been completed,
* {@code false} otherwise
@@ -188,6 +204,10 @@ public abstract class PipeTransferTrackableHandler
return;
}
+ if (Objects.nonNull(response)) {
+ sink.recordReceiverStatus(client.getEndPoint(),
response.getStatus());
+ }
+
if (response == null) {
fallbackToWholeRequest(
client,
@@ -251,6 +271,10 @@ public abstract class PipeTransferTrackableHandler
try {
client.setShouldReturnSelf(shouldReturnSelf);
+ sink.waitIfReceiverTemporarilyUnavailable(client.getEndPoint());
+ if (returnFalseIfSinkIsClosed(client)) {
+ return;
+ }
client.pipeTransfer(originalReq, this);
} catch (final Exception e) {
PipeTransferTrackableHandler.this.onError(e);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
index 60b69235085..8e0f7802998 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonConfig;
@@ -38,6 +39,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Mockito;
import java.nio.ByteBuffer;
@@ -154,6 +156,36 @@ public class PipeTransferTrackableHandlerTest {
Assert.assertEquals(0, handler.errorCount);
}
+ /**
+  * Checks that a transfer consults the sink's receiver backoff before sending and reports the
+  * receiver's response status back to the sink.
+  *
+  * <p>Both the sink and the client are Mockito mocks, so no real waiting or network I/O
+  * happens; only the interactions are verified.
+  */
+ @Test
+ public void testTransferWaitsForReceiverBackoffAndRecordsStatus() throws
Exception {
+ final IoTDBDataRegionAsyncSink sink =
Mockito.mock(IoTDBDataRegionAsyncSink.class);
+ final AsyncPipeDataTransferServiceClient client =
+ Mockito.mock(AsyncPipeDataTransferServiceClient.class);
+ final TEndPoint endPoint = new TEndPoint("127.0.0.1", 6667);
+ // The response status the stubbed receiver answers with: "temporarily unavailable".
+ final TSStatus status =
+ new TSStatus()
+
.setCode(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode());
+
+ Mockito.when(client.getEndPoint()).thenReturn(endPoint);
+ // Stub pipeTransfer to complete synchronously by invoking the callback with that status.
+ Mockito.doAnswer(
+ invocation -> {
+ final AsyncMethodCallback<TPipeTransferResp> callback =
invocation.getArgument(1);
+ callback.onComplete(resp(status));
+ return null;
+ })
+ .when(client)
+ .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
+
+ final TestPipeTransferTrackableHandler handler = new
TestPipeTransferTrackableHandler(sink);
+
+ handler.transfer(client, createReq(1));
+
+ // The backoff wait must happen before the request is handed to the client ...
+ final InOrder inOrder = Mockito.inOrder(sink, client);
+ inOrder.verify(sink).waitIfReceiverTemporarilyUnavailable(endPoint);
+ inOrder.verify(client).pipeTransfer(Mockito.any(TPipeTransferReq.class),
Mockito.any());
+ // ... and the status delivered to onComplete must be recorded for the same endpoint.
+ Mockito.verify(sink).recordReceiverStatus(endPoint, status);
+ }
+
private static TPipeTransferReq createReq(final int bodySize) {
final byte[] body = new byte[bodySize];
for (int i = 0; i < body.length; ++i) {
@@ -168,8 +200,12 @@ public class PipeTransferTrackableHandlerTest {
}
private static TPipeTransferResp successResp() {
+ return resp(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
+ private static TPipeTransferResp resp(final TSStatus status) {
final TPipeTransferResp resp = new TPipeTransferResp();
- resp.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ resp.setStatus(status);
return resp;
}