This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0d382d85ddf Pipe: Implemented slice logic for async sink (#17668)
0d382d85ddf is described below
commit 0d382d85ddf03a5911e391a6a1dff12bb8340443
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 23:16:42 2026 +0800
Pipe: Implemented slice logic for async sink (#17668)
* sl
* chew
---
.../PipeTransferTabletBatchEventHandler.java | 2 +-
.../PipeTransferTabletInsertNodeEventHandler.java | 2 +-
.../handler/PipeTransferTabletRawEventHandler.java | 2 +-
.../handler/PipeTransferTrackableHandler.java | 127 ++++++++++++
.../async/handler/PipeTransferTsFileHandler.java | 2 +-
.../handler/PipeTransferTrackableHandlerTest.java | 221 +++++++++++++++++++++
.../async/AsyncPipeDataTransferServiceClient.java | 4 +
.../commons/pipe/sink/client/IoTDBSyncClient.java | 31 +--
.../thrift/common/PipeTransferSliceReqBuilder.java | 73 +++++++
.../common/PipeTransferSliceReqBuilderTest.java | 106 ++++++++++
10 files changed, 542 insertions(+), 28 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 52c52b1038e..e6899dee3c5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -129,7 +129,7 @@ public class PipeTransferTabletBatchEventHandler extends PipeTransferTrackableHa
protected void doTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
- client.pipeTransfer(req, this);
+ transferWithOptionalRequestSlicing(client, req);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 912a1e724f7..56d1ce41b02 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -41,7 +41,7 @@ public class PipeTransferTabletInsertNodeEventHandler
protected void doTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
- client.pipeTransfer(req, this);
+ transferWithOptionalRequestSlicing(client, req);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index b64e446827a..eb4677de358 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -40,7 +40,7 @@ public class PipeTransferTabletRawEventHandler extends PipeTransferTabletInserti
protected void doTransfer(
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
- client.pipeTransfer(req, this);
+ transferWithOptionalRequestSlicing(client, req);
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
index a8b4a3b7a79..a0e6ad73fe7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -21,7 +21,11 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -126,8 +130,131 @@ public abstract class PipeTransferTrackableHandler
final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException;
+ protected final void transferWithOptionalRequestSlicing(
+ final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
+ throws TException {
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+ if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
+ client.pipeTransfer(req, this);
+ return;
+ }
+
+ LOGGER.warn(
+ "The body size of the request is too large. The request will be
sliced. Origin req: {}-{}. "
+ + "Request body size: {}, threshold: {}",
+ req.getVersion(),
+ req.getType(),
+ req.body.limit(),
+ bodySizeLimit);
+
+ final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req,
bodySizeLimit);
+ final boolean shouldReturnSelf = client.shouldReturnSelf();
+ try {
+ transferSlicedRequest(
+ client,
+ req,
+ shouldReturnSelf,
+ PipeTransferSliceReqBuilder.nextSliceOrderId(),
+ 0,
+ sliceCount,
+ bodySizeLimit);
+ } catch (final Exception e) {
+ fallbackToWholeRequest(client, req, shouldReturnSelf, e);
+ }
+ }
+
public abstract void clearEventsReferenceCount();
+ private void transferSlicedRequest(
+ final AsyncPipeDataTransferServiceClient client,
+ final TPipeTransferReq originalReq,
+ final boolean shouldReturnSelf,
+ final int sliceOrderId,
+ final int sliceIndex,
+ final int sliceCount,
+ final int bodySizeLimit)
+ throws Exception {
+ client.setShouldReturnSelf(shouldReturnSelf && sliceIndex == sliceCount -
1);
+ client.pipeTransfer(
+ PipeTransferSliceReqBuilder.buildSliceReq(
+ originalReq, sliceOrderId, sliceIndex, sliceCount, bodySizeLimit),
+ new AsyncMethodCallback<TPipeTransferResp>() {
+ @Override
+ public void onComplete(final TPipeTransferResp response) {
+ if (sink.isClosed() || sliceIndex == sliceCount - 1) {
+ PipeTransferTrackableHandler.this.onComplete(response);
+ return;
+ }
+
+ if (response == null) {
+ fallbackToWholeRequest(
+ client,
+ originalReq,
+ shouldReturnSelf,
+ new PipeException("TPipeTransferResp is null when
transferring slice."));
+ return;
+ }
+
+ if (response.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ fallbackToWholeRequest(
+ client,
+ originalReq,
+ shouldReturnSelf,
+ new PipeConnectionException(
+ String.format(
+ "Failed to transfer slice. Origin req: %s-%s, slice
index: %d, slice count: %d. Reason: %s",
+ originalReq.getVersion(),
+ originalReq.getType(),
+ sliceIndex,
+ sliceCount,
+ response.getStatus())));
+ return;
+ }
+
+ try {
+ transferSlicedRequest(
+ client,
+ originalReq,
+ shouldReturnSelf,
+ sliceOrderId,
+ sliceIndex + 1,
+ sliceCount,
+ bodySizeLimit);
+ } catch (final Exception e) {
+ fallbackToWholeRequest(client, originalReq, shouldReturnSelf, e);
+ }
+ }
+
+ @Override
+ public void onError(final Exception exception) {
+ if (sink.isClosed() || sliceIndex == sliceCount - 1) {
+ PipeTransferTrackableHandler.this.onError(exception);
+ return;
+ }
+ fallbackToWholeRequest(client, originalReq, shouldReturnSelf,
exception);
+ }
+ });
+ }
+
+ private void fallbackToWholeRequest(
+ final AsyncPipeDataTransferServiceClient client,
+ final TPipeTransferReq originalReq,
+ final boolean shouldReturnSelf,
+ final Exception exception) {
+ LOGGER.warn(
+ "Failed to transfer slice. Origin req: {}-{}. Retry the whole
transfer.",
+ originalReq.getVersion(),
+ originalReq.getType(),
+ exception);
+
+ try {
+ client.setShouldReturnSelf(shouldReturnSelf);
+ client.pipeTransfer(originalReq, this);
+ } catch (final Exception e) {
+ PipeTransferTrackableHandler.this.onError(e);
+ }
+ }
+
public void closeClient() {
if (Objects.isNull(client)) {
return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 35a28d1413a..b6d3785de7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -449,7 +449,7 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
return;
}
- client.pipeTransfer(req, this);
+ transferWithOptionalRequestSlicing(client, req);
}
@Override
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
new file mode 100644
index 00000000000..60b69235085
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandlerTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.thrift.async.handler;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import
org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeTransferTrackableHandlerTest {
+
+ private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+
+ private int originalRequestSliceThresholdBytes;
+
+ @Before
+ public void setUp() {
+ originalRequestSliceThresholdBytes =
commonConfig.getPipeSinkRequestSliceThresholdBytes();
+ commonConfig.setPipeSinkRequestSliceThresholdBytes(4);
+ }
+
+ @After
+ public void tearDown() {
+
commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
+ }
+
+ @Test
+ public void testLargeRequestWillBeSlicedForAsyncTransfer() throws Exception {
+ final IoTDBDataRegionAsyncSink sink =
Mockito.mock(IoTDBDataRegionAsyncSink.class);
+ final AsyncPipeDataTransferServiceClient client =
+ Mockito.mock(AsyncPipeDataTransferServiceClient.class);
+ Mockito.when(client.shouldReturnSelf()).thenReturn(true);
+
+ final List<TPipeTransferReq> transferredRequests = new ArrayList<>();
+ Mockito.doAnswer(
+ invocation -> {
+ final TPipeTransferReq req = invocation.getArgument(0);
+ final AsyncMethodCallback<TPipeTransferResp> callback =
invocation.getArgument(1);
+ transferredRequests.add(req);
+ callback.onComplete(successResp());
+ return null;
+ })
+ .when(client)
+ .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
+
+ final TestPipeTransferTrackableHandler handler = new
TestPipeTransferTrackableHandler(sink);
+ final TPipeTransferReq originalReq = createReq(10);
+
+ handler.transfer(client, originalReq);
+
+ Assert.assertEquals(3, transferredRequests.size());
+ Assert.assertEquals(1, handler.completeCount);
+ Assert.assertEquals(0, handler.errorCount);
+
+ final PipeTransferSliceReq firstSlice =
+ PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(0));
+ final PipeTransferSliceReq secondSlice =
+ PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(1));
+ final PipeTransferSliceReq thirdSlice =
+ PipeTransferSliceReq.fromTPipeTransferReq(transferredRequests.get(2));
+
+ Assert.assertEquals(
+ PipeRequestType.TRANSFER_SLICE.getType(),
transferredRequests.get(0).getType());
+ Assert.assertEquals(firstSlice.getOrderId(), secondSlice.getOrderId());
+ Assert.assertEquals(firstSlice.getOrderId(), thirdSlice.getOrderId());
+ Assert.assertEquals(originalReq.getType(), firstSlice.getOriginReqType());
+ Assert.assertEquals(10, firstSlice.getOriginBodySize());
+ Assert.assertEquals(3, firstSlice.getSliceCount());
+ Assert.assertEquals(0, firstSlice.getSliceIndex());
+ Assert.assertEquals(1, secondSlice.getSliceIndex());
+ Assert.assertEquals(2, thirdSlice.getSliceIndex());
+ Assert.assertEquals(4, firstSlice.getSliceBody().length);
+ Assert.assertEquals(4, secondSlice.getSliceBody().length);
+ Assert.assertEquals(2, thirdSlice.getSliceBody().length);
+
+ final ArgumentCaptor<Boolean> shouldReturnSelfCaptor =
ArgumentCaptor.forClass(Boolean.class);
+ Mockito.verify(client,
Mockito.times(3)).setShouldReturnSelf(shouldReturnSelfCaptor.capture());
+ Assert.assertEquals(Arrays.asList(false, false, true),
shouldReturnSelfCaptor.getAllValues());
+ }
+
+ @Test
+ public void testLargeRequestFallsBackToWholeRequestWhenSliceTransferFails()
throws Exception {
+ final IoTDBDataRegionAsyncSink sink =
Mockito.mock(IoTDBDataRegionAsyncSink.class);
+ final AsyncPipeDataTransferServiceClient client =
+ Mockito.mock(AsyncPipeDataTransferServiceClient.class);
+ Mockito.when(client.shouldReturnSelf()).thenReturn(true);
+
+ final List<TPipeTransferReq> transferredRequests = new ArrayList<>();
+ Mockito.doAnswer(
+ invocation -> {
+ final TPipeTransferReq req = invocation.getArgument(0);
+ final AsyncMethodCallback<TPipeTransferResp> callback =
invocation.getArgument(1);
+ transferredRequests.add(req);
+ if (req.getType() == PipeRequestType.TRANSFER_SLICE.getType()) {
+ callback.onComplete(failedResp());
+ } else {
+ callback.onComplete(successResp());
+ }
+ return null;
+ })
+ .when(client)
+ .pipeTransfer(Mockito.any(TPipeTransferReq.class), Mockito.any());
+
+ final TestPipeTransferTrackableHandler handler = new
TestPipeTransferTrackableHandler(sink);
+ final TPipeTransferReq originalReq = createReq(10);
+
+ handler.transfer(client, originalReq);
+
+ Assert.assertEquals(2, transferredRequests.size());
+ Assert.assertEquals(
+ PipeRequestType.TRANSFER_SLICE.getType(),
transferredRequests.get(0).getType());
+ Assert.assertEquals(originalReq.getType(),
transferredRequests.get(1).getType());
+ Assert.assertEquals(originalReq.getVersion(),
transferredRequests.get(1).getVersion());
+ Assert.assertArrayEquals(originalReq.getBody(),
transferredRequests.get(1).getBody());
+ Assert.assertEquals(1, handler.completeCount);
+ Assert.assertEquals(0, handler.errorCount);
+ }
+
+ private static TPipeTransferReq createReq(final int bodySize) {
+ final byte[] body = new byte[bodySize];
+ for (int i = 0; i < body.length; ++i) {
+ body[i] = (byte) i;
+ }
+
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
+ req.type = (short) 123;
+ req.body = ByteBuffer.wrap(body);
+ return req;
+ }
+
+ private static TPipeTransferResp successResp() {
+ final TPipeTransferResp resp = new TPipeTransferResp();
+ resp.setStatus(new
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ return resp;
+ }
+
+ private static TPipeTransferResp failedResp() {
+ final TPipeTransferResp resp = new TPipeTransferResp();
+ resp.setStatus(
+ new
TSStatus().setCode(TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER.getStatusCode()));
+ return resp;
+ }
+
+ private static class TestPipeTransferTrackableHandler extends
PipeTransferTrackableHandler {
+
+ private int completeCount;
+ private int errorCount;
+
+ private TestPipeTransferTrackableHandler(final IoTDBDataRegionAsyncSink
sink) {
+ super(sink);
+ }
+
+ private void transfer(
+ final AsyncPipeDataTransferServiceClient client, final
TPipeTransferReq req)
+ throws TException {
+ tryTransfer(client, req);
+ }
+
+ @Override
+ protected boolean onCompleteInternal(final TPipeTransferResp response) {
+ completeCount++;
+ return true;
+ }
+
+ @Override
+ protected void onErrorInternal(final Exception exception) {
+ errorCount++;
+ }
+
+ @Override
+ protected void doTransfer(
+ final AsyncPipeDataTransferServiceClient client, final
TPipeTransferReq req)
+ throws TException {
+ transferWithOptionalRequestSlicing(client, req);
+ }
+
+ @Override
+ public void clearEventsReferenceCount() {
+ // Do nothing
+ }
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 36295ec8500..b7edc0c1088 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -135,6 +135,10 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
this.shouldReturnSelf.set(shouldReturnSelf);
}
+ public boolean shouldReturnSelf() {
+ return shouldReturnSelf.get();
+ }
+
public void setTimeoutDynamically(final int timeout) {
try {
((TNonblockingSocket) ___transport).setTimeout(timeout);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
index b7f42295e6c..1ad5d0a855f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java
@@ -22,9 +22,7 @@ package org.apache.iotdb.commons.pipe.sink.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
-import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqBuilder;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -39,15 +37,11 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class IoTDBSyncClient extends IClientRPCService.Client
implements ThriftClient, AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSyncClient.class);
- private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new
AtomicInteger(0);
-
private final String ipAddress;
private final int port;
private final TEndPoint endPoint;
@@ -100,9 +94,8 @@ public class IoTDBSyncClient extends IClientRPCService.Client
@Override
public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) throws
TException {
- final int bodySizeLimit =
PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
- if (req.getVersion() != IoTDBSinkRequestVersion.VERSION_1.getVersion()
- || req.body.limit() < bodySizeLimit) {
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+ if (!PipeTransferSliceReqBuilder.shouldSlice(req, bodySizeLimit)) {
return super.pipeTransfer(req);
}
@@ -115,23 +108,13 @@ public class IoTDBSyncClient extends IClientRPCService.Client
bodySizeLimit);
try {
- final int sliceOrderId = SLICE_ORDER_ID_GENERATOR.getAndIncrement();
- // Slice the buffer to avoid the buffer being too large
- final int sliceCount =
- req.body.limit() / bodySizeLimit + (req.body.limit() % bodySizeLimit
== 0 ? 0 : 1);
+ final int sliceOrderId = PipeTransferSliceReqBuilder.nextSliceOrderId();
+ final int sliceCount = PipeTransferSliceReqBuilder.getSliceCount(req,
bodySizeLimit);
for (int i = 0; i < sliceCount; ++i) {
- final int startIndexInBody = i * bodySizeLimit;
- final int endIndexInBody = Math.min((i + 1) * bodySizeLimit,
req.body.limit());
final TPipeTransferResp sliceResp =
super.pipeTransfer(
- PipeTransferSliceReq.toTPipeTransferReq(
- sliceOrderId,
- req.getType(),
- i,
- sliceCount,
- req.body.duplicate(),
- startIndexInBody,
- endIndexInBody));
+ PipeTransferSliceReqBuilder.buildSliceReq(
+ req, sliceOrderId, i, sliceCount, bodySizeLimit));
if (i == sliceCount - 1) {
return sliceResp;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
new file mode 100644
index 00000000000..b108d6f1d3a
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class PipeTransferSliceReqBuilder {
+
+ private static final AtomicInteger SLICE_ORDER_ID_GENERATOR = new
AtomicInteger(0);
+
+ private PipeTransferSliceReqBuilder() {
+ // Utility class
+ }
+
+ public static int getBodySizeLimit() {
+ return PipeConfig.getInstance().getPipeSinkRequestSliceThresholdBytes();
+ }
+
+ public static boolean shouldSlice(final TPipeTransferReq req, final int
bodySizeLimit) {
+ return req.getVersion() == IoTDBSinkRequestVersion.VERSION_1.getVersion()
+ && req.body.limit() >= bodySizeLimit;
+ }
+
+ public static int nextSliceOrderId() {
+ return SLICE_ORDER_ID_GENERATOR.getAndIncrement();
+ }
+
+ public static int getSliceCount(final TPipeTransferReq req, final int
bodySizeLimit) {
+ return req.body.limit() / bodySizeLimit + (req.body.limit() %
bodySizeLimit == 0 ? 0 : 1);
+ }
+
+ public static PipeTransferSliceReq buildSliceReq(
+ final TPipeTransferReq originalReq,
+ final int sliceOrderId,
+ final int sliceIndex,
+ final int sliceCount,
+ final int bodySizeLimit)
+ throws IOException {
+ final int startIndexInBody = sliceIndex * bodySizeLimit;
+ final int endIndexInBody = Math.min((sliceIndex + 1) * bodySizeLimit,
originalReq.body.limit());
+ return PipeTransferSliceReq.toTPipeTransferReq(
+ sliceOrderId,
+ originalReq.getType(),
+ sliceIndex,
+ sliceCount,
+ originalReq.body.duplicate(),
+ startIndexInBody,
+ endIndexInBody);
+ }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
new file mode 100644
index 00000000000..290ce397980
--- /dev/null
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.sink.payload.thrift.common;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class PipeTransferSliceReqBuilderTest {
+
+ private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+
+ private int originalRequestSliceThresholdBytes;
+
+ @Before
+ public void setUp() {
+ originalRequestSliceThresholdBytes =
commonConfig.getPipeSinkRequestSliceThresholdBytes();
+ commonConfig.setPipeSinkRequestSliceThresholdBytes(4);
+ }
+
+ @After
+ public void tearDown() {
+
commonConfig.setPipeSinkRequestSliceThresholdBytes(originalRequestSliceThresholdBytes);
+ }
+
+ @Test
+ public void testBuildSliceReq() throws Exception {
+ final TPipeTransferReq req =
createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 10);
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+
+ Assert.assertTrue(PipeTransferSliceReqBuilder.shouldSlice(req,
bodySizeLimit));
+ Assert.assertEquals(3, PipeTransferSliceReqBuilder.getSliceCount(req,
bodySizeLimit));
+
+ final PipeTransferSliceReq firstSlice =
+ PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 0, 3,
bodySizeLimit);
+ final PipeTransferSliceReq secondSlice =
+ PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 1, 3,
bodySizeLimit);
+ final PipeTransferSliceReq thirdSlice =
+ PipeTransferSliceReqBuilder.buildSliceReq(req, 123, 2, 3,
bodySizeLimit);
+
+ Assert.assertArrayEquals(new byte[] {0, 1, 2, 3},
firstSlice.getSliceBody());
+ Assert.assertArrayEquals(new byte[] {4, 5, 6, 7},
secondSlice.getSliceBody());
+ Assert.assertArrayEquals(new byte[] {8, 9}, thirdSlice.getSliceBody());
+ Assert.assertEquals(0, firstSlice.getSliceIndex());
+ Assert.assertEquals(1, secondSlice.getSliceIndex());
+ Assert.assertEquals(2, thirdSlice.getSliceIndex());
+ Assert.assertEquals(3, firstSlice.getSliceCount());
+ Assert.assertEquals(req.getType(), firstSlice.getOriginReqType());
+ Assert.assertEquals(10, firstSlice.getOriginBodySize());
+ }
+
+ @Test
+ public void testShouldSliceOnlyForVersion1RequestsAboveThreshold() {
+ final int bodySizeLimit = PipeTransferSliceReqBuilder.getBodySizeLimit();
+
+ Assert.assertFalse(
+ PipeTransferSliceReqBuilder.shouldSlice(
+ createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 3),
bodySizeLimit));
+ Assert.assertFalse(
+ PipeTransferSliceReqBuilder.shouldSlice(
+ createReq((byte) (IoTDBSinkRequestVersion.VERSION_1.getVersion() +
1), 10),
+ bodySizeLimit));
+ Assert.assertTrue(
+ PipeTransferSliceReqBuilder.shouldSlice(
+ createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4),
bodySizeLimit));
+ }
+
+ private static TPipeTransferReq createReq(final byte version, final int
bodySize) {
+ final byte[] body = new byte[bodySize];
+ for (int i = 0; i < body.length; ++i) {
+ body[i] = (byte) i;
+ }
+
+ final TPipeTransferReq req = new TPipeTransferReq();
+ req.version = version;
+ req.type = (short) 123;
+ req.body = ByteBuffer.wrap(body);
+ return req;
+ }
+}