This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch receiver-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/receiver-fix by this push:
new 09e43a4ceec re
09e43a4ceec is described below
commit 09e43a4ceec69863383364fcc7b9a29e6bec5803
Author: Caideyipi <[email protected]>
AuthorDate: Mon Apr 27 14:05:23 2026 +0800
re
---
.../protocol/airgap/IoTDBAirGapReceiver.java | 5 +
.../iotconsensusv2/IoTConsensusV2Receiver.java | 63 ++++++++++---
.../protocol/airgap/IoTDBAirGapReceiverTest.java | 103 +++++++++++++++++++++
.../commons/pipe/receiver/IoTDBFileReceiver.java | 75 ++++++++++-----
.../pipe/receiver/PipeReceiverFilePathUtils.java | 42 +++++++++
.../pipe/receiver/IoTDBFileReceiverTest.java | 46 +++++++++
6 files changed, 301 insertions(+), 33 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 8658d12b6a8..278c1ccaaef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -178,6 +178,11 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
if (System.currentTimeMillis() - startTime
< PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
handleReq(req, startTime);
+ } else {
+ LOGGER.warn(
+ "Pipe air gap receiver {}: Temporary unavailable retry timed out,
returning FAIL to sender.",
+ receiverId);
+ fail();
}
} else {
LOGGER.warn(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index e0ef8e4072c..31c7db57c33 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
import
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestType;
import
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2TransferFilePieceReq;
@@ -78,6 +79,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -520,13 +522,19 @@ public class IoTConsensusV2Receiver {
File writingFile = tsFileWriter.getWritingFile();
RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
- File currentWritingDirPath = tsFileWriter.getLocalWritingDir();
-
- final List<File> files =
- req.getFileNames().stream()
- .map(fileName -> new File(currentWritingDirPath, fileName))
- .collect(Collectors.toList());
try {
+ final List<File> files =
+ req.getFileNames().stream()
+ .map(
+ fileName -> {
+ try {
+ return resolveWritingFilePath(tsFileWriter,
fileName).toFile();
+ } catch (final IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
if (isWritingFileNonAvailable(tsFileWriter)) {
final TSStatus status =
RpcUtils.getStatus(
@@ -601,16 +609,20 @@ public class IoTConsensusV2Receiver {
}
return new TIoTConsensusV2TransferResp(status);
} catch (Exception e) {
+ final Throwable rootCause = e instanceof IllegalArgumentException ?
e.getCause() : e;
LOGGER.warn(
"IoTConsensusV2-PipeName-{}: Failed to seal file {} from req {}.",
consensusPipeName,
- files,
+ req.getFileNames(),
req,
- e);
+ rootCause);
return new TIoTConsensusV2TransferResp(
RpcUtils.getStatus(
TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
- String.format("Failed to seal file %s because %s", writingFile,
e.getMessage())));
+ String.format(
+ "Failed to seal file %s because %s",
+ req.getFileNames(),
+ rootCause == null ? e.getMessage() :
rootCause.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the
@@ -809,7 +821,22 @@ public class IoTConsensusV2Receiver {
private boolean isFileExistedAndNameCorrect(
IoTConsensusV2TsFileWriter tsFileWriter, String fileName) {
final File writingFile = tsFileWriter.getWritingFile();
- return writingFile != null && writingFile.getName().equals(fileName);
+ try {
+ return writingFile != null
+ && writingFile.exists()
+ && writingFile
+ .toPath()
+ .toAbsolutePath()
+ .normalize()
+ .equals(resolveWritingFilePath(tsFileWriter, fileName));
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "IoTConsensusV2-PipeName-{}: Illegal file name {} when checking
writing file.",
+ consensusPipeName,
+ fileName,
+ e);
+ return false;
+ }
}
private boolean isWritingFileOffsetNonCorrect(
@@ -874,7 +901,7 @@ public class IoTConsensusV2Receiver {
}
// Every tsFileWriter has its own writing path.
// 1 Thread --> 1 connection --> 1 tsFileWriter --> 1 path
- tsFileWriter.setWritingFile(new File(tsFileWriter.getLocalWritingDir(),
fileName));
+ tsFileWriter.setWritingFile(resolveWritingFilePath(tsFileWriter,
fileName).toFile());
tsFileWriter.setWritingFileWriter(new
RandomAccessFile(tsFileWriter.getWritingFile(), "rw"));
LOGGER.info(
"IoTConsensusV2-PipeName-{}: Writing file {} was created. Ready to
write file pieces.",
@@ -882,6 +909,20 @@ public class IoTConsensusV2Receiver {
tsFileWriter.getWritingFile().getPath());
}
+ private Path resolveWritingFilePath(
+ final IoTConsensusV2TsFileWriter tsFileWriter, final String fileName)
throws IOException {
+ try {
+ return PipeReceiverFilePathUtils.resolveFilePath(
+ tsFileWriter.getLocalWritingDir().toPath(), fileName);
+ } catch (final IOException e) {
+ LOGGER.error(
+ "IoTConsensusV2-PipeName-{}: Path traversal attempt detected!
Filename: {}",
+ consensusPipeName,
+ fileName);
+ throw e;
+ }
+ }
+
private void initiateTsFileBufferFolder(List<String> receiverBaseDirsName)
throws IOException {
// initiate receiverFileDirs
for (String receiverFileBaseDir : receiverBaseDirsName) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
index 19dea8140a1..e23db1f1ca8 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
@@ -19,18 +19,32 @@
package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
+import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse;
+import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.BytesUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.Socket;
+import java.nio.ByteBuffer;
public class IoTDBAirGapReceiverTest {
@@ -69,4 +83,93 @@ public class IoTDBAirGapReceiverTest {
Assert.assertThrows(IOException.class, () ->
receiver.readData(inputStream));
Assert.assertTrue(exception.getMessage().contains("nested E-Language
prefix"));
}
+
+ @Test
+ public void testTemporaryUnavailableRetryTimeoutReturnsFail() throws
Exception {
+ final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+ final long originalRetryLocalIntervalMs =
commonConfig.getPipeAirGapRetryLocalIntervalMs();
+ final long originalRetryMaxMs = commonConfig.getPipeAirGapRetryMaxMs();
+
+ try {
+ commonConfig.setPipeAirGapRetryLocalIntervalMs(0);
+ commonConfig.setPipeAirGapRetryMaxMs(1);
+
+ final RecordingSocket socket = new RecordingSocket();
+ final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 3L);
+ final StubIoTDBDataNodeReceiverAgent stubAgent = new
StubIoTDBDataNodeReceiverAgent();
+ stubAgent.setStubReceiver(
+ new StubReceiver(
+ new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())));
+ setField(receiver, "agent", stubAgent);
+
+ final AirGapPseudoTPipeTransferRequest req = new
AirGapPseudoTPipeTransferRequest();
+ req.setVersion(IoTDBSinkRequestVersion.VERSION_1.getVersion());
+ req.setType((short) 0);
+ req.setBody(ByteBuffer.allocate(0));
+
+ final Method handleReq =
+ IoTDBAirGapReceiver.class.getDeclaredMethod(
+ "handleReq", AirGapPseudoTPipeTransferRequest.class, long.class);
+ handleReq.setAccessible(true);
+ handleReq.invoke(receiver, req, System.currentTimeMillis() - 10_000L);
+
+ Assert.assertArrayEquals(AirGapOneByteResponse.FAIL,
socket.getWrittenBytes());
+ } finally {
+
commonConfig.setPipeAirGapRetryLocalIntervalMs(originalRetryLocalIntervalMs);
+ commonConfig.setPipeAirGapRetryMaxMs(originalRetryMaxMs);
+ }
+ }
+
+ private static void setField(final Object target, final String fieldName,
final Object value)
+ throws Exception {
+ final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private static class RecordingSocket extends Socket {
+
+ private final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
+
+ @Override
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ byte[] getWrittenBytes() {
+ return outputStream.toByteArray();
+ }
+ }
+
+ private static class StubIoTDBDataNodeReceiverAgent extends
IoTDBDataNodeReceiverAgent {
+
+ void setStubReceiver(final IoTDBReceiver receiver) {
+ setReceiverWithSpecifiedClient(null, receiver);
+ }
+ }
+
+ private static class StubReceiver implements IoTDBReceiver {
+
+ private final TPipeTransferResp response;
+
+ private StubReceiver(final TSStatus status) {
+ response = new TPipeTransferResp(status);
+ }
+
+ @Override
+ public TPipeTransferResp receive(final TPipeTransferReq req) {
+ return response;
+ }
+
+ @Override
+ public void handleExit() {
+ // noop for unit test
+ }
+
+ @Override
+ public IoTDBSinkRequestVersion getVersion() {
+ return IoTDBSinkRequestVersion.VERSION_1;
+ }
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index e2484576a77..6879f88ced8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -497,16 +497,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
receiverFileDirWithIdSuffix.get().getPath());
}
}
- Path baseDir =
receiverFileDirWithIdSuffix.get().toPath().toAbsolutePath().normalize();
- Path targetPath = baseDir.resolve(fileName).toAbsolutePath().normalize();
-
- if (!targetPath.startsWith(baseDir)) {
- LOGGER.error(
- "Receiver id = {}: Path traversal attempt detected! Filename: {}",
- receiverId.get(),
- fileName);
- throw new IOException("Illegal fileName: " + fileName + " (Path
traversal detected)");
- }
+ final Path targetPath = resolveReceiverFilePath(fileName);
writingFile = targetPath.toFile();
writingFileWriter = new RandomAccessFile(writingFile, "rw");
@@ -517,7 +508,37 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
}
private boolean isFileExistedAndNameCorrect(final String fileName) {
- return writingFile != null && writingFile.exists() &&
writingFile.getName().equals(fileName);
+ try {
+ return writingFile != null
+ && writingFile.exists()
+ && receiverFileDirWithIdSuffix.get() != null
+ && writingFile
+ .toPath()
+ .toAbsolutePath()
+ .normalize()
+ .equals(resolveReceiverFilePath(fileName));
+ } catch (final IOException e) {
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Receiver id = %s: Illegal file name %s when checking writing file.",
+ receiverId.get(),
+ fileName);
+ return false;
+ }
+ }
+
+ private Path resolveReceiverFilePath(final String fileName) throws
IOException {
+ try {
+ return PipeReceiverFilePathUtils.resolveFilePath(
+ receiverFileDirWithIdSuffix.get().toPath(), fileName);
+ } catch (final IOException e) {
+ LOGGER.error(
+ "Receiver id = {}: Path traversal attempt detected! Filename: {}",
+ receiverId.get(),
+ fileName);
+ throw e;
+ }
}
private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) {
@@ -680,15 +701,22 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
// Support null in fileName list, which means that this file is optional and
is currently absent
protected final TPipeTransferResp handleTransferFileSealV2(final
PipeTransferFileSealReqV2 req) {
final List<String> fileNames = req.getFileNames();
- final List<File> files =
- fileNames.stream()
- .map(
- fileName ->
- Objects.nonNull(fileName)
- ? new File(receiverFileDirWithIdSuffix.get(), fileName)
- : null)
- .collect(Collectors.toList());
try {
+ final List<File> files =
+ fileNames.stream()
+ .map(
+ fileName -> {
+ if (Objects.isNull(fileName)) {
+ return null;
+ }
+ try {
+ return resolveReceiverFilePath(fileName).toFile();
+ } catch (final IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
if (!isWritingFileAvailable()) {
final TSStatus status =
RpcUtils.getStatus(
@@ -754,17 +782,20 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver {
}
return new TPipeTransferResp(status);
} catch (final Exception e) {
+ final Throwable rootCause = e instanceof IllegalArgumentException ?
e.getCause() : e;
PipeLogger.log(
LOGGER::warn,
- e,
+ rootCause,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
- files,
+ fileNames,
req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
- String.format("Failed to seal file %s because %s", files,
e.getMessage())));
+ String.format(
+ "Failed to seal file %s because %s",
+ fileNames, rootCause == null ? e.getMessage() :
rootCause.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
new file mode 100644
index 00000000000..bc7275d4ebe
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public final class PipeReceiverFilePathUtils {
+
+ private PipeReceiverFilePathUtils() {
+ // Utility class
+ }
+
+ public static Path resolveFilePath(final Path baseDir, final String
fileName) throws IOException {
+ final Path normalizedBaseDir = baseDir.toAbsolutePath().normalize();
+ final Path normalizedTargetPath =
+ normalizedBaseDir.resolve(fileName).toAbsolutePath().normalize();
+
+ if (!normalizedTargetPath.startsWith(normalizedBaseDir)) {
+ throw new IOException("Illegal fileName: " + fileName + " (Path
traversal detected)");
+ }
+
+ return normalizedTargetPath;
+ }
+}
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
index a372326433d..8d2db54d5b6 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -33,6 +35,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
public class IoTDBFileReceiverTest {
@@ -63,6 +67,25 @@ public class IoTDBFileReceiverTest {
}
}
+ @Test
+ public void testRejectPathTraversalFileNameInSealRequest() throws Exception {
+ final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+ final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+ try {
+ receiver.createWritingFile("normal.tsfile", false);
+
+ final TPipeTransferResp response =
+ receiver.sealFiles(
+ Arrays.asList("../outside.mod", "normal.tsfile"),
Arrays.asList(0L, 0L));
+
+ Assert.assertEquals(
+ TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(),
response.getStatus().getCode());
+ Assert.assertTrue(response.getStatus().getMessage().contains("Illegal
fileName"));
+ } finally {
+ receiver.handleExit();
+ }
+ }
+
private static class DummyFileReceiver extends IoTDBFileReceiver {
DummyFileReceiver(final File baseDir) {
@@ -73,6 +96,12 @@ public class IoTDBFileReceiverTest {
updateWritingFileIfNeeded(fileName, isSingleFile);
}
+ TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long>
fileLengths)
+ throws IOException {
+ return handleTransferFileSealV2(
+ DummyFileSealReqV2.toTPipeTransferReq(fileNames, fileLengths,
Collections.emptyMap()));
+ }
+
File getWritingFileInBaseDir(final String fileName) {
return
receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile();
}
@@ -130,4 +159,21 @@ public class IoTDBFileReceiverTest {
return null;
}
}
+
+ private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 {
+
+ static DummyFileSealReqV2 toTPipeTransferReq(
+ final List<String> fileNames,
+ final List<Long> fileLengths,
+ final java.util.Map<String, String> parameters)
+ throws IOException {
+ return (DummyFileSealReqV2)
+ new DummyFileSealReqV2().convertToTPipeTransferReq(fileNames,
fileLengths, parameters);
+ }
+
+ @Override
+ protected PipeRequestType getPlanType() {
+ return PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL;
+ }
+ }
}