This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 33c3ef7196b Pipe: Harden legacy pipe file transfer validation and access checks (#17741)
33c3ef7196b is described below
commit 33c3ef7196b10bbd269f4d45bfb48f7fd39aa374
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 29 12:18:52 2026 +0800
Pipe: Harden legacy pipe file transfer validation and access checks (#17741)
* Fix
* fix
---
.../single/IoTDBLegacyPipeReceiverSecurityIT.java | 112 +++++++++++++++++++
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 40 ++++++-
.../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 26 +++++
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 53 +++++++--
.../legacy/IoTDBLegacyPipeReceiverAgentTest.java | 122 +++++++++++++++++++++
5 files changed, 339 insertions(+), 14 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
new file mode 100644
index 00000000000..51e1e211535
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.single;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBLegacyPipeReceiverSecurityIT {
+
+ @BeforeClass
+ public static void setUp() {
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testRejectPathTraversalFileNameInLegacyTransportFile() throws Exception {
+ final DataNodeWrapper dataNode = EnvFactory.getEnv().getDataNodeWrapper(0);
+
+ try (final IoTDBSyncClient client =
+ new IoTDBSyncClient(
+ new ThriftClientProperty.Builder().build(),
+ dataNode.getIp(),
+ dataNode.getPort(),
+ false,
+ null,
+ null)) {
+ final TSOpenSessionResp openSessionResp = client.openSession(createOpenSessionReq());
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), openSessionResp.getStatus().getCode());
+
+ try {
+ final TSStatus handshakeStatus =
+ client.handshake(
+ new TSyncIdentityInfo(
+ "pathTraversalPipe", System.currentTimeMillis(), "UNKNOWN", ""));
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), handshakeStatus.getCode());
+
+ final String maliciousFileName =
+ ".." + File.separator + ".." + File.separator + "pwned.tsfile";
+ final TSStatus status =
+ client.sendFile(
+ new TSyncTransportMetaInfo(maliciousFileName, 0),
+ ByteBuffer.wrap("pwned".getBytes(StandardCharsets.UTF_8)));
+
+ Assert.assertEquals(TSStatusCode.SYNC_FILE_ERROR.getStatusCode(), status.getCode());
+ Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+ } finally {
+ client.closeSession(new TSCloseSessionReq(openSessionResp.getSessionId()));
+ }
+ }
+ }
+
+ private TSOpenSessionReq createOpenSessionReq() {
+ final TSOpenSessionReq req = new TSOpenSessionReq();
+ req.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ req.setUsername("root");
+ req.setPassword("root");
+ req.setZoneId(ZoneId.systemDefault().toString());
+ req.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString());
+ req.putToConfiguration("sql_dialect", "tree");
+ return req;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index e5769a5c6f6..c4c3986259f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -24,7 +24,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.i18n.PipeMessages;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
import org.apache.iotdb.commons.queryengine.common.SessionInfo;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -54,6 +56,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.file.Paths;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;
@@ -267,9 +270,11 @@ public class IoTDBLegacyPipeReceiverAgent {
* @param tsFilePipeData pipeData
* @param fileDir path of file data dir
*/
- private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir) {
+ private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final String fileDir)
+ throws IOException {
final String tsFileName = tsFilePipeData.getTsFileName();
- final File dir = new File(fileDir);
+ final File tsFile = resolveFileInFileDataDir(fileDir, tsFileName);
+ final File dir = tsFile.getParentFile();
final File[] targetFiles =
dir.listFiles((dir1, name) -> name.startsWith(tsFileName) && name.endsWith(PATCH_SUFFIX));
if (targetFiles != null) {
@@ -311,10 +316,18 @@ public class IoTDBLegacyPipeReceiverAgent {
final String fileDir = getFileDataDir(identityInfo);
final String fileName = metaInfo.fileName;
final long startIndex = metaInfo.startIndex;
- final File file = new File(fileDir, fileName + PATCH_SUFFIX);
+ final File file;
+ final File fileWithoutPatch;
+ try {
+ fileWithoutPatch = resolveFileInFileDataDir(fileDir, fileName);
+ file = resolveFileInFileDataDir(fileDir, fileName + PATCH_SUFFIX);
+ } catch (final IOException e) {
+ LOGGER.warn(e.getMessage());
+ return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
+ }
// step2. check startIndex
- final IndexCheckResult result = checkStartIndexValid(new File(fileDir, fileName), startIndex);
+ final IndexCheckResult result = checkStartIndexValid(fileWithoutPatch, startIndex);
if (!result.isResult()) {
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, result.getIndex());
}
@@ -326,7 +339,7 @@ public class IoTDBLegacyPipeReceiverAgent {
final byte[] byteArray = new byte[length];
buff.get(byteArray);
randomAccessFile.write(byteArray);
- recordStartIndex(new File(fileDir, fileName), startIndex + length);
+ recordStartIndex(fileWithoutPatch, startIndex + length);
LOGGER.debug(
DataNodePipeMessages.SYNC_START_AT_TO_IS_DONE, fileName, startIndex, startIndex + length);
} catch (final IOException e) {
@@ -337,6 +350,23 @@ public class IoTDBLegacyPipeReceiverAgent {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
+ private static File resolveFileInFileDataDir(final String fileDir, final String fileName)
+ throws IOException {
+ if (StringUtils.isEmpty(fileName)) {
+ throw new IOException(String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName));
+ }
+
+ final String illegalError = FileUtils.getIllegalError4Directory(fileName);
+ if (Objects.nonNull(illegalError)) {
+ throw new IOException(
+ String.format(PipeMessages.ILLEGAL_FILENAME_PATH_TRAVERSAL, fileName)
+ + ", "
+ + illegalError);
+ }
+
+ return PipeReceiverFilePathUtils.resolveFilePath(Paths.get(fileDir), fileName).toFile();
+ }
+
private IndexCheckResult checkStartIndexValid(final File file, final long startIndex) {
// get local index from memory map
long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index 829c9aed6b9..5ae7942d201 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -52,6 +53,9 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.session.pool.SessionPool;
@@ -66,6 +70,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -229,6 +234,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
useSSL,
trustStore,
trustStorePwd);
+ openClientSession();
final TSyncIdentityInfo identityInfo =
new TSyncIdentityInfo(
pipeName, System.currentTimeMillis(), syncConnectorVersion, databaseName);
@@ -259,6 +265,26 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
.build();
}
+ private void openClientSession() throws TException {
+ final TSOpenSessionReq openSessionReq = new TSOpenSessionReq();
+ openSessionReq.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3);
+ openSessionReq.setUsername(user);
+ openSessionReq.setPassword(password);
+ openSessionReq.setZoneId(ZoneId.systemDefault().toString());
+ openSessionReq.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString());
+ openSessionReq.putToConfiguration("sql_dialect", "tree");
+
+ final TSOpenSessionResp openSessionResp = client.openSession(openSessionReq);
+ if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ final String errorMsg =
+ String.format(
+ "Failed to login to receiver %s:%s for legacy pipe transfer because %s",
+ ipAddress, port, openSessionResp.getStatus().getMessage());
+ LOGGER.warn(errorMsg);
+ throw new PipeRuntimeCriticalException(errorMsg);
+ }
+ }
+
@Override
public void heartbeat() throws Exception {
// do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index bc253c05ac8..bd360bebd61 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.common.rpc.thrift.TShowConfigurationTemplateResp;
import org.apache.iotdb.commons.audit.AuditEventType;
import org.apache.iotdb.commons.audit.AuditLogFields;
import org.apache.iotdb.commons.audit.AuditLogOperation;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -3399,24 +3400,58 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
@Override
public TSStatus handshake(final TSyncIdentityInfo info) throws TException {
- return PipeDataNodeAgent.receiver()
- .legacy()
- .handshake(
- info,
- SESSION_MANAGER.getCurrSession().getClientAddress(),
- partitionFetcher,
- schemaFetcher);
+ try {
+ final TSStatus status = checkLegacyPipeReceiverPermission();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ return PipeDataNodeAgent.receiver()
+ .legacy()
+ .handshake(
+ info,
+ SESSION_MANAGER.getCurrSession().getClientAddress(),
+ partitionFetcher,
+ schemaFetcher);
+ } finally {
+ SESSION_MANAGER.updateIdleTime();
+ }
}
@Override
public TSStatus sendPipeData(final ByteBuffer buff) throws TException {
- return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
+ try {
+ final TSStatus status = checkLegacyPipeReceiverPermission();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ return PipeDataNodeAgent.receiver().legacy().transportPipeData(buff);
+ } finally {
+ SESSION_MANAGER.updateIdleTime();
+ }
}
@Override
public TSStatus sendFile(final TSyncTransportMetaInfo metaInfo, final ByteBuffer buff)
throws TException {
- return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff);
+ try {
+ final TSStatus status = checkLegacyPipeReceiverPermission();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ return PipeDataNodeAgent.receiver().legacy().transportFile(metaInfo, buff);
+ } finally {
+ SESSION_MANAGER.updateIdleTime();
+ }
+ }
+
+ private TSStatus checkLegacyPipeReceiverPermission() {
+ final IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
+ if (!SESSION_MANAGER.checkLogin(clientSession)) {
+ return getNotLoggedInStatus();
+ }
+ return AuthorityChecker.getTSStatus(
+ AuthorityChecker.checkSystemPermission(clientSession.getUsername(), PrivilegeType.USE_PIPE),
+ PrivilegeType.USE_PIPE);
}
@Override
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java
new file mode 100644
index 00000000000..5ce4df74f7f
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgentTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.receiver.protocol.legacy;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
+import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class IoTDBLegacyPipeReceiverAgentTest {
+
+ private static final String PIPE_NAME = "poc";
+ private static final long CREATE_TIME = 1700000000000L;
+ private static final String REMOTE_ADDRESS = "127.0.0.1";
+
+ private String originalSyncDir;
+ private Path syncDir;
+ private IoTDBLegacyPipeReceiverAgent agent;
+
+ @Before
+ public void setUp() throws Exception {
+ originalSyncDir = CommonDescriptor.getInstance().getConfig().getSyncDir();
+ syncDir = Files.createTempDirectory("legacy-pipe-receiver");
+ CommonDescriptor.getInstance().getConfig().setSyncDir(syncDir.toString());
+
+ agent = new IoTDBLegacyPipeReceiverAgent();
+ final TSStatus status =
+ agent.handshake(
+ new TSyncIdentityInfo(PIPE_NAME, CREATE_TIME, "UNKNOWN", ""),
+ REMOTE_ADDRESS,
+ null,
+ null);
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (agent != null) {
+ agent.handleClientExit();
+ }
+ CommonDescriptor.getInstance().getConfig().setSyncDir(originalSyncDir);
+ if (syncDir != null) {
+ org.apache.tsfile.external.commons.io.FileUtils.deleteDirectory(syncDir.toFile());
+ }
+ }
+
+ @Test
+ public void testTransportFileRejectsPathTraversal() throws Exception {
+ final String traversal =
+ ".." + File.separator + ".." + File.separator + ".." + File.separator + "pwned";
+
+ final TSStatus status =
+ agent.transportFile(
+ new TSyncTransportMetaInfo(traversal, 0),
+ ByteBuffer.wrap("pwned".getBytes(StandardCharsets.UTF_8)));
+
+ Assert.assertEquals(TSStatusCode.SYNC_FILE_ERROR.getStatusCode(), status.getCode());
+ Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+ Assert.assertFalse(Files.exists(syncDir.resolve("pwned.patch")));
+ }
+
+ @Test
+ public void testTransportFileWritesPlainFileUnderFileDataDir() throws Exception {
+ final String fileName = "1-2-3-4.tsfile";
+ final byte[] payload = "iotdb".getBytes(StandardCharsets.UTF_8);
+
+ final TSStatus status =
+ agent.transportFile(new TSyncTransportMetaInfo(fileName, 0), ByteBuffer.wrap(payload));
+
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+ final Path patchFile = getFileDataDir().resolve(fileName + ".patch");
+ Assert.assertArrayEquals(payload, Files.readAllBytes(patchFile));
+ }
+
+ @Test
+ public void testTransportPipeDataRejectsPathTraversalTsFileName() throws Exception {
+ final String traversal = ".." + File.separator + "evil.tsfile";
+
+ final TSStatus status =
+ agent.transportPipeData(ByteBuffer.wrap(new TsFilePipeData("", traversal, -1).serialize()));
+
+ Assert.assertEquals(TSStatusCode.PIPESERVER_ERROR.getStatusCode(), status.getCode());
+ Assert.assertTrue(status.getMessage().contains("Illegal fileName"));
+ }
+
+ private Path getFileDataDir() {
+ return syncDir
+ .resolve("receiver")
+ .resolve(String.format("%s-%d-%s", PIPE_NAME, CREATE_TIME, REMOTE_ADDRESS))
+ .resolve("file-data");
+ }
+}