This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 73cf0670b63 Pipe: Fixed the bug that air gap receiver may not respond
in temporary timeout exception & Optimized the directory check in receiver &
Fixed the bug that the "skipIfNoPrivileges" may be wrongly reused at receiver &
Optimized the configNode pipe logic (#17556)
73cf0670b63 is described below
commit 73cf0670b637dd584c2ed806101485b8934ec0b6
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 29 11:20:57 2026 +0800
Pipe: Fixed the bug that air gap receiver may not respond in temporary
timeout exception & Optimized the directory check in receiver & Fixed the bug
that the "skipIfNoPrivileges" may be wrongly reused at receiver & Optimized the
configNode pipe logic (#17556)
* re
* sink
* fix
* rollback
---
.../iotdb/confignode/manager/ProcedureManager.java | 4 +-
.../runtime/heartbeat/PipeHeartbeatParser.java | 38 +++--
.../runtime/PipeHandleLeaderChangeProcedure.java | 6 +-
.../runtime/heartbeat/PipeHeartbeatParserTest.java | 182 +++++++++++++++++++++
.../PipeHandleLeaderChangeProcedureTest.java | 46 ++++++
.../protocol/airgap/IoTDBAirGapReceiver.java | 5 +
.../iotconsensusv2/IoTConsensusV2Receiver.java | 68 ++++++--
.../client/IoTDBDataNodeAsyncClientManager.java | 5 +-
.../protocol/airgap/IoTDBAirGapReceiverTest.java | 103 ++++++++++++
.../sink/IoTDBDataNodeAsyncClientManagerTest.java | 88 ++++++++++
.../commons/pipe/receiver/IoTDBFileReceiver.java | 75 ++++++---
.../pipe/receiver/PipeReceiverFilePathUtils.java | 42 +++++
.../pipe/receiver/IoTDBFileReceiverTest.java | 46 ++++++
13 files changed, 651 insertions(+), 57 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index b80376e014a..a9457f23964 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1666,7 +1666,7 @@ public class ProcedureManager {
}
}
- public void pipeHandleMetaChange(
+ public boolean pipeHandleMetaChange(
boolean needWriteConsensusOnConfigNodes, boolean
needPushPipeMetaToDataNodes) {
try {
final long procedureId =
@@ -1674,8 +1674,10 @@ public class ProcedureManager {
new PipeHandleMetaChangeProcedure(
needWriteConsensusOnConfigNodes,
needPushPipeMetaToDataNodes));
LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId:
{}.", procedureId);
+ return true;
} catch (Exception e) {
LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
+ return false;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index ace07f5e2d3..6dc11ddd3f3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -59,7 +59,7 @@ public class PipeHeartbeatParser {
this.configManager = configManager;
heartbeatCounter = 0;
- registeredNodeNumber = 1;
+ registeredNodeNumber = getExpectedHeartbeatNodeCount();
needWriteConsensusOnConfigNodes = new AtomicBoolean(false);
needPushPipeMetaToDataNodes = new AtomicBoolean(false);
@@ -73,17 +73,8 @@ public class PipeHeartbeatParser {
if (heartbeatCount % registeredNodeNumber == 0) {
canSubmitHandleMetaChangeProcedure.set(true);
- // registeredNodeNumber may be changed, update it here when we can
submit procedure
- registeredNodeNumber =
configManager.getNodeManager().getRegisteredNodeCount();
- if (registeredNodeNumber <= 0) {
- LOGGER.warn(
- "registeredNodeNumber is {} when parseHeartbeat from node
(id={}).",
- registeredNodeNumber,
- nodeId);
- // registeredNodeNumber can not be set to 0 in this class, otherwise
may cause
- // DivideByZeroException
- registeredNodeNumber = 1;
- }
+ // The expected reporter set may be changed, update it at the end of the
current round.
+ registeredNodeNumber = getExpectedHeartbeatNodeCount();
}
if (pipeHeartbeat.isEmpty()
@@ -114,14 +105,14 @@ public class PipeHeartbeatParser {
if (canSubmitHandleMetaChangeProcedure.get()
&& (needWriteConsensusOnConfigNodes.get()
|| needPushPipeMetaToDataNodes.get())) {
- configManager
+ if (configManager
.getProcedureManager()
.pipeHandleMetaChange(
- needWriteConsensusOnConfigNodes.get(),
needPushPipeMetaToDataNodes.get());
-
- // Reset flags after procedure is submitted
- needWriteConsensusOnConfigNodes.set(false);
- needPushPipeMetaToDataNodes.set(false);
+ needWriteConsensusOnConfigNodes.get(),
+ needPushPipeMetaToDataNodes.get())) {
+ needWriteConsensusOnConfigNodes.set(false);
+ needPushPipeMetaToDataNodes.set(false);
+ }
}
} finally {
configManager.getPipeManager().getPipeTaskCoordinator().unlock();
@@ -129,6 +120,17 @@ public class PipeHeartbeatParser {
});
}
+ private int getExpectedHeartbeatNodeCount() {
+ final int expectedNodeCount =
+ configManager.getNodeManager().getRegisteredDataNodeCount()
+ + (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1
: 0);
+ if (expectedNodeCount <= 0) {
+ LOGGER.warn("Expected pipe heartbeat node count is {}, fallback to 1.",
expectedNodeCount);
+ return 1;
+ }
+ return expectedNodeCount;
+ }
+
private void parseHeartbeatAndSaveMetaChangeLocally(
final AtomicReference<PipeTaskInfo> pipeTaskInfo,
final int nodeId,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index f18737e8b7a..61f6f3cae2a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -164,7 +164,11 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
final int newDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
regionGroupToOldAndNewLeaderPairMap.put(
- new TConsensusGroupId(TConsensusGroupType.DataRegion,
dataRegionGroupId),
+ new TConsensusGroupId(
+ dataRegionGroupId == Integer.MIN_VALUE
+ ? TConsensusGroupType.ConfigRegion
+ : TConsensusGroupType.DataRegion,
+ dataRegionGroupId),
new Pair<>(oldDataRegionLeaderId, newDataRegionLeaderId));
}
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
new file mode 100644
index 00000000000..d5a46d42c84
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.ProcedureManager;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PipeHeartbeatParserTest {
+
+ private boolean originalSeparatedPipeHeartbeatEnabled;
+
+ @Before
+ public void setUp() {
+ originalSeparatedPipeHeartbeatEnabled =
+
CommonDescriptor.getInstance().getConfig().isSeperatedPipeHeartbeatEnabled();
+ }
+
+ @After
+ public void tearDown() {
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setSeperatedPipeHeartbeatEnabled(originalSeparatedPipeHeartbeatEnabled);
+ }
+
+ @Test
+ public void
testParseHeartbeatCountsOnlyDataNodesWhenSeparatedHeartbeatDisabled()
+ throws Exception {
+
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+ final ParserTestContext context = createParserTestContext(2);
+ setMetaChangeFlags(context.parser, true, false);
+
+ context.parser.parseHeartbeat(1, emptyHeartbeat());
+ verify(context.procedureManager,
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+ context.parser.parseHeartbeat(2, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+ }
+
+ @Test
+ public void
testParseHeartbeatCountsLocalConfigNodeWhenSeparatedHeartbeatEnabled()
+ throws Exception {
+
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(true);
+
+ final ParserTestContext context = createParserTestContext(2);
+ setMetaChangeFlags(context.parser, true, false);
+
+ context.parser.parseHeartbeat(1, emptyHeartbeat());
+ context.parser.parseHeartbeat(2, emptyHeartbeat());
+ verify(context.procedureManager,
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+ context.parser.parseHeartbeat(3, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+ }
+
+ @Test
+ public void
testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() throws
Exception {
+
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+ final ParserTestContext context = createParserTestContext(2);
+ when(context.procedureManager.pipeHandleMetaChange(anyBoolean(),
anyBoolean()))
+ .thenReturn(false, true);
+ setMetaChangeFlags(context.parser, true, false);
+
+ context.parser.parseHeartbeat(1, emptyHeartbeat());
+ verify(context.procedureManager,
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+ context.parser.parseHeartbeat(2, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+
+ context.parser.parseHeartbeat(3, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+
+ context.parser.parseHeartbeat(4, emptyHeartbeat());
+ verify(context.procedureManager, times(2)).pipeHandleMetaChange(true,
false);
+ }
+
+ private ParserTestContext createParserTestContext(final int
registeredDataNodeCount) {
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final NodeManager nodeManager = Mockito.mock(NodeManager.class);
+ final ProcedureManager procedureManager =
Mockito.mock(ProcedureManager.class);
+ final PipeManager pipeManager = Mockito.mock(PipeManager.class);
+ final PipeRuntimeCoordinator pipeRuntimeCoordinator =
+ Mockito.mock(PipeRuntimeCoordinator.class);
+ final PipeTaskCoordinator pipeTaskCoordinator =
Mockito.mock(PipeTaskCoordinator.class);
+ final ExecutorService procedureSubmitter =
Mockito.mock(ExecutorService.class);
+
+ when(configManager.getNodeManager()).thenReturn(nodeManager);
+ when(configManager.getProcedureManager()).thenReturn(procedureManager);
+ when(configManager.getPipeManager()).thenReturn(pipeManager);
+
when(nodeManager.getRegisteredDataNodeCount()).thenReturn(registeredDataNodeCount);
+
when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator);
+ when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
+
when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter);
+ when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new
PipeTaskInfo()));
+ when(procedureManager.pipeHandleMetaChange(anyBoolean(),
anyBoolean())).thenReturn(true);
+ Mockito.doAnswer(
+ invocation -> {
+ ((Runnable) invocation.getArgument(0)).run();
+ return CompletableFuture.completedFuture(null);
+ })
+ .when(procedureSubmitter)
+ .submit(any(Runnable.class));
+
+ return new ParserTestContext(new PipeHeartbeatParser(configManager),
procedureManager);
+ }
+
+ private void setMetaChangeFlags(
+ final PipeHeartbeatParser parser,
+ final boolean needWriteConsensusOnConfigNodes,
+ final boolean needPushPipeMetaToDataNodes)
+ throws Exception {
+ setAtomicBooleanField(
+ parser, "needWriteConsensusOnConfigNodes",
needWriteConsensusOnConfigNodes);
+ setAtomicBooleanField(parser, "needPushPipeMetaToDataNodes",
needPushPipeMetaToDataNodes);
+ }
+
+ private void setAtomicBooleanField(
+ final PipeHeartbeatParser parser, final String fieldName, final boolean
value)
+ throws Exception {
+ final Field field = PipeHeartbeatParser.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ ((AtomicBoolean) field.get(parser)).set(value);
+ }
+
+ private PipeHeartbeat emptyHeartbeat() {
+ return new PipeHeartbeat(Collections.emptyList(), null, null, null);
+ }
+
+ private static class ParserTestContext {
+ private final PipeHeartbeatParser parser;
+ private final ProcedureManager procedureManager;
+
+ private ParserTestContext(
+ final PipeHeartbeatParser parser, final ProcedureManager
procedureManager) {
+ this.parser = parser;
+ this.procedureManager = procedureManager;
+ }
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index b76e291b0c3..75c0963a27f 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -21,7 +21,10 @@ package
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
@@ -45,6 +48,9 @@ public class PipeHandleLeaderChangeProcedureTest {
leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
new Pair<>(1, 2));
leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
new Pair<>(2, 3));
leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
new Pair<>(4, 5));
+ leaderMap.put(
+ new TConsensusGroupId(TConsensusGroupType.ConfigRegion,
Integer.MIN_VALUE),
+ new Pair<>(6, 7));
PipeHandleLeaderChangeProcedure proc = new
PipeHandleLeaderChangeProcedure(leaderMap);
@@ -60,4 +66,44 @@ public class PipeHandleLeaderChangeProcedureTest {
fail();
}
}
+
+ @Test
+ public void deserializeOldFormatConfigRegionTest() {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+
+ Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+ leaderMap.put(
+ new TConsensusGroupId(TConsensusGroupType.ConfigRegion,
Integer.MIN_VALUE),
+ new Pair<>(6, 7));
+
+ try {
+
outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeInt(ProcedureState.INITIALIZING.ordinal());
+ outputStream.writeLong(0L);
+ outputStream.writeLong(0L);
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeLong(Procedure.NO_TIMEOUT);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(0);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(leaderMap.size());
+ outputStream.writeInt(Integer.MIN_VALUE);
+ outputStream.writeInt(6);
+ outputStream.writeInt(7);
+
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ PipeHandleLeaderChangeProcedure proc =
+ (PipeHandleLeaderChangeProcedure)
ProcedureFactory.getInstance().create(buffer);
+
+ assertEquals(new PipeHandleLeaderChangeProcedure(leaderMap), proc);
+ } catch (Exception e) {
+ fail();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 8658d12b6a8..278c1ccaaef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -178,6 +178,11 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
if (System.currentTimeMillis() - startTime
< PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
handleReq(req, startTime);
+ } else {
+ LOGGER.warn(
+ "Pipe air gap receiver {}: Temporary unavailable retry timed out,
returning FAIL to sender.",
+ receiverId);
+ fail();
}
} else {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
index e0ef8e4072c..27ce077252e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.commons.pipe.receiver.PipeReceiverFilePathUtils;
import
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestType;
import
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2RequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.iotconsensusv2.request.IoTConsensusV2TransferFilePieceReq;
@@ -78,6 +79,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -517,16 +519,23 @@ public class IoTConsensusV2Receiver {
long startPreCheckNanos = System.nanoTime();
iotConsensusV2ReceiverMetrics.recordBorrowTsFileWriterTimer(
startPreCheckNanos - startBorrowTsFileWriterNanos);
- File writingFile = tsFileWriter.getWritingFile();
- RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
-
- File currentWritingDirPath = tsFileWriter.getLocalWritingDir();
+ final File writingFile = tsFileWriter.getWritingFile();
+ final RandomAccessFile writingFileWriter =
tsFileWriter.getWritingFileWriter();
- final List<File> files =
- req.getFileNames().stream()
- .map(fileName -> new File(currentWritingDirPath, fileName))
- .collect(Collectors.toList());
+ final File currentWritingDirPath = tsFileWriter.getLocalWritingDir();
try {
+ final List<File> files =
+ req.getFileNames().stream()
+ .map(
+ fileName -> {
+ try {
+ return resolveWritingFilePath(tsFileWriter,
fileName).toFile();
+ } catch (final IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
if (isWritingFileNonAvailable(tsFileWriter)) {
final TSStatus status =
RpcUtils.getStatus(
@@ -601,16 +610,20 @@ public class IoTConsensusV2Receiver {
}
return new TIoTConsensusV2TransferResp(status);
} catch (Exception e) {
+ final Throwable rootCause = e instanceof IllegalArgumentException ?
e.getCause() : e;
LOGGER.warn(
"IoTConsensusV2-PipeName-{}: Failed to seal file {} from req {}.",
consensusPipeName,
- files,
+ req.getFileNames(),
req,
- e);
+ rootCause);
return new TIoTConsensusV2TransferResp(
RpcUtils.getStatus(
TSStatusCode.IOT_CONSENSUS_V2_TRANSFER_FILE_ERROR,
- String.format("Failed to seal file %s because %s", writingFile,
e.getMessage())));
+ String.format(
+ "Failed to seal file %s because %s",
+ req.getFileNames(),
+ rootCause == null ? e.getMessage() :
rootCause.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the
@@ -809,7 +822,22 @@ public class IoTConsensusV2Receiver {
private boolean isFileExistedAndNameCorrect(
IoTConsensusV2TsFileWriter tsFileWriter, String fileName) {
final File writingFile = tsFileWriter.getWritingFile();
- return writingFile != null && writingFile.getName().equals(fileName);
+ try {
+ return writingFile != null
+ && writingFile.exists()
+ && writingFile
+ .toPath()
+ .toAbsolutePath()
+ .normalize()
+ .equals(resolveWritingFilePath(tsFileWriter, fileName));
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "IoTConsensusV2-PipeName-{}: Illegal file name {} when checking
writing file.",
+ consensusPipeName,
+ fileName,
+ e);
+ return false;
+ }
}
private boolean isWritingFileOffsetNonCorrect(
@@ -874,7 +902,7 @@ public class IoTConsensusV2Receiver {
}
// Every tsFileWriter has its own writing path.
// 1 Thread --> 1 connection --> 1 tsFileWriter --> 1 path
- tsFileWriter.setWritingFile(new File(tsFileWriter.getLocalWritingDir(),
fileName));
+ tsFileWriter.setWritingFile(resolveWritingFilePath(tsFileWriter,
fileName).toFile());
tsFileWriter.setWritingFileWriter(new
RandomAccessFile(tsFileWriter.getWritingFile(), "rw"));
LOGGER.info(
"IoTConsensusV2-PipeName-{}: Writing file {} was created. Ready to
write file pieces.",
@@ -882,6 +910,20 @@ public class IoTConsensusV2Receiver {
tsFileWriter.getWritingFile().getPath());
}
+ private Path resolveWritingFilePath(
+ final IoTConsensusV2TsFileWriter tsFileWriter, final String fileName)
throws IOException {
+ try {
+ return PipeReceiverFilePathUtils.resolveFilePath(
+ tsFileWriter.getLocalWritingDir().toPath(), fileName);
+ } catch (final IOException e) {
+ LOGGER.error(
+ "IoTConsensusV2-PipeName-{}: Path traversal attempt detected!
Filename: {}",
+ consensusPipeName,
+ fileName);
+ throw e;
+ }
+ }
+
private void initiateTsFileBufferFolder(List<String> receiverBaseDirsName)
throws IOException {
// initiate receiverFileDirs
for (String receiverFileBaseDir : receiverBaseDirsName) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 88a79146295..09580fec279 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -119,14 +119,15 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
receiverAttributes =
String.format(
- "%s-%s-%s-%s-%s-%s",
+ "%s-%s-%s-%s-%s-%s-%s",
Base64.getEncoder()
.encodeToString((userEntity.getUsername() + ":" +
password).getBytes()),
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
validateTsFile,
shouldMarkAsPipeRequest,
- isTSFileUsed);
+ isTSFileUsed,
+ skipIfNoPrivileges);
synchronized (IoTDBDataNodeAsyncClientManager.class) {
if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
{
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
index 19dea8140a1..e23db1f1ca8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
@@ -19,18 +19,32 @@
package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
+import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse;
+import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.BytesUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.Socket;
+import java.nio.ByteBuffer;
public class IoTDBAirGapReceiverTest {
@@ -69,4 +83,93 @@ public class IoTDBAirGapReceiverTest {
Assert.assertThrows(IOException.class, () ->
receiver.readData(inputStream));
Assert.assertTrue(exception.getMessage().contains("nested E-Language
prefix"));
}
+
+ @Test
+ public void testTemporaryUnavailableRetryTimeoutReturnsFail() throws
Exception {
+ final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+ final long originalRetryLocalIntervalMs =
commonConfig.getPipeAirGapRetryLocalIntervalMs();
+ final long originalRetryMaxMs = commonConfig.getPipeAirGapRetryMaxMs();
+
+ try {
+ commonConfig.setPipeAirGapRetryLocalIntervalMs(0);
+ commonConfig.setPipeAirGapRetryMaxMs(1);
+
+ final RecordingSocket socket = new RecordingSocket();
+ final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 3L);
+ final StubIoTDBDataNodeReceiverAgent stubAgent = new
StubIoTDBDataNodeReceiverAgent();
+ stubAgent.setStubReceiver(
+ new StubReceiver(
+ new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())));
+ setField(receiver, "agent", stubAgent);
+
+ final AirGapPseudoTPipeTransferRequest req = new
AirGapPseudoTPipeTransferRequest();
+ req.setVersion(IoTDBSinkRequestVersion.VERSION_1.getVersion());
+ req.setType((short) 0);
+ req.setBody(ByteBuffer.allocate(0));
+
+ final Method handleReq =
+ IoTDBAirGapReceiver.class.getDeclaredMethod(
+ "handleReq", AirGapPseudoTPipeTransferRequest.class, long.class);
+ handleReq.setAccessible(true);
+ handleReq.invoke(receiver, req, System.currentTimeMillis() - 10_000L);
+
+ Assert.assertArrayEquals(AirGapOneByteResponse.FAIL,
socket.getWrittenBytes());
+ } finally {
+
commonConfig.setPipeAirGapRetryLocalIntervalMs(originalRetryLocalIntervalMs);
+ commonConfig.setPipeAirGapRetryMaxMs(originalRetryMaxMs);
+ }
+ }
+
+ private static void setField(final Object target, final String fieldName,
final Object value)
+ throws Exception {
+ final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private static class RecordingSocket extends Socket {
+
+ private final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
+
+ @Override
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ byte[] getWrittenBytes() {
+ return outputStream.toByteArray();
+ }
+ }
+
+ private static class StubIoTDBDataNodeReceiverAgent extends
IoTDBDataNodeReceiverAgent {
+
+ void setStubReceiver(final IoTDBReceiver receiver) {
+ setReceiverWithSpecifiedClient(null, receiver);
+ }
+ }
+
+ private static class StubReceiver implements IoTDBReceiver {
+
+ private final TPipeTransferResp response;
+
+ private StubReceiver(final TSStatus status) {
+ response = new TPipeTransferResp(status);
+ }
+
+ @Override
+ public TPipeTransferResp receive(final TPipeTransferReq req) {
+ return response;
+ }
+
+ @Override
+ public void handleExit() {
+ // noop for unit test
+ }
+
+ @Override
+ public IoTDBSinkRequestVersion getVersion() {
+ return IoTDBSinkRequestVersion.VERSION_1;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
new file mode 100644
index 00000000000..fb13e438dec
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.audit.UserEntity;
+import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeAsyncClientManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+
+public class IoTDBDataNodeAsyncClientManagerTest {
+
+ @Test
+ public void testReceiverAttributesShouldDifferentiateSkipIfNoPrivileges()
throws Exception {
+ final IoTDBDataNodeAsyncClientManager managerWithSkipIf =
+ new IoTDBDataNodeAsyncClientManager(
+ Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+ false,
+ "round-robin",
+ new UserEntity(1L, "user", "cli-host"),
+ "password",
+ true,
+ "sync",
+ true,
+ true,
+ false,
+ true);
+ final IoTDBDataNodeAsyncClientManager managerWithoutSkipIf =
+ new IoTDBDataNodeAsyncClientManager(
+ Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+ false,
+ "round-robin",
+ new UserEntity(1L, "user", "cli-host"),
+ "password",
+ true,
+ "sync",
+ true,
+ true,
+ false,
+ false);
+
+ try {
+ Assert.assertNotEquals(
+ getReceiverAttributes(managerWithSkipIf),
getReceiverAttributes(managerWithoutSkipIf));
+ Assert.assertNotSame(
+ getEndPoint2Client(managerWithSkipIf),
getEndPoint2Client(managerWithoutSkipIf));
+ } finally {
+ managerWithSkipIf.close();
+ managerWithoutSkipIf.close();
+ }
+ }
+
+ private static String getReceiverAttributes(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
+
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("receiverAttributes");
+ field.setAccessible(true);
+ return (String) field.get(manager);
+ }
+
+ private static Object getEndPoint2Client(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("endPoint2Client");
+ field.setAccessible(true);
+ return field.get(manager);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index e2484576a77..6879f88ced8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -497,16 +497,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
receiverFileDirWithIdSuffix.get().getPath());
}
}
- Path baseDir =
receiverFileDirWithIdSuffix.get().toPath().toAbsolutePath().normalize();
- Path targetPath = baseDir.resolve(fileName).toAbsolutePath().normalize();
-
- if (!targetPath.startsWith(baseDir)) {
- LOGGER.error(
- "Receiver id = {}: Path traversal attempt detected! Filename: {}",
- receiverId.get(),
- fileName);
- throw new IOException("Illegal fileName: " + fileName + " (Path
traversal detected)");
- }
+ final Path targetPath = resolveReceiverFilePath(fileName);
writingFile = targetPath.toFile();
writingFileWriter = new RandomAccessFile(writingFile, "rw");
@@ -517,7 +508,37 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
private boolean isFileExistedAndNameCorrect(final String fileName) {
- return writingFile != null && writingFile.exists() &&
writingFile.getName().equals(fileName);
+ try {
+ return writingFile != null
+ && writingFile.exists()
+ && receiverFileDirWithIdSuffix.get() != null
+ && writingFile
+ .toPath()
+ .toAbsolutePath()
+ .normalize()
+ .equals(resolveReceiverFilePath(fileName));
+ } catch (final IOException e) {
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Receiver id = %s: Illegal file name %s when checking writing file.",
+ receiverId.get(),
+ fileName);
+ return false;
+ }
+ }
+
+ private Path resolveReceiverFilePath(final String fileName) throws
IOException {
+ try {
+ return PipeReceiverFilePathUtils.resolveFilePath(
+ receiverFileDirWithIdSuffix.get().toPath(), fileName);
+ } catch (final IOException e) {
+ LOGGER.error(
+ "Receiver id = {}: Path traversal attempt detected! Filename: {}",
+ receiverId.get(),
+ fileName);
+ throw e;
+ }
}
private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) {
@@ -680,15 +701,22 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
// Support null in fileName list, which means that this file is optional and
is currently absent
protected final TPipeTransferResp handleTransferFileSealV2(final
PipeTransferFileSealReqV2 req) {
final List<String> fileNames = req.getFileNames();
- final List<File> files =
- fileNames.stream()
- .map(
- fileName ->
- Objects.nonNull(fileName)
- ? new File(receiverFileDirWithIdSuffix.get(), fileName)
- : null)
- .collect(Collectors.toList());
try {
+ final List<File> files =
+ fileNames.stream()
+ .map(
+ fileName -> {
+ if (Objects.isNull(fileName)) {
+ return null;
+ }
+ try {
+ return resolveReceiverFilePath(fileName).toFile();
+ } catch (final IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
if (!isWritingFileAvailable()) {
final TSStatus status =
RpcUtils.getStatus(
@@ -754,17 +782,20 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
return new TPipeTransferResp(status);
} catch (final Exception e) {
+ final Throwable rootCause = e instanceof IllegalArgumentException ?
e.getCause() : e;
PipeLogger.log(
LOGGER::warn,
- e,
+ rootCause,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
- files,
+ fileNames,
req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
- String.format("Failed to seal file %s because %s", files,
e.getMessage())));
+ String.format(
+ "Failed to seal file %s because %s",
+ fileNames, rootCause == null ? e.getMessage() :
rootCause.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
new file mode 100644
index 00000000000..bc7275d4ebe
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public final class PipeReceiverFilePathUtils {
+
+ private PipeReceiverFilePathUtils() {
+ // Utility class
+ }
+
+ public static Path resolveFilePath(final Path baseDir, final String
fileName) throws IOException {
+ final Path normalizedBaseDir = baseDir.toAbsolutePath().normalize();
+ final Path normalizedTargetPath =
+ normalizedBaseDir.resolve(fileName).toAbsolutePath().normalize();
+
+ if (!normalizedTargetPath.startsWith(normalizedBaseDir)) {
+ throw new IOException("Illegal fileName: " + fileName + " (Path
traversal detected)");
+ }
+
+ return normalizedTargetPath;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
index a372326433d..8d2db54d5b6 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -33,6 +35,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
public class IoTDBFileReceiverTest {
@@ -63,6 +67,25 @@ public class IoTDBFileReceiverTest {
}
}
+ @Test
+ public void testRejectPathTraversalFileNameInSealRequest() throws Exception {
+ final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+ final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+ try {
+ receiver.createWritingFile("normal.tsfile", false);
+
+ final TPipeTransferResp response =
+ receiver.sealFiles(
+ Arrays.asList("../outside.mod", "normal.tsfile"),
Arrays.asList(0L, 0L));
+
+ Assert.assertEquals(
+ TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(),
response.getStatus().getCode());
+ Assert.assertTrue(response.getStatus().getMessage().contains("Illegal
fileName"));
+ } finally {
+ receiver.handleExit();
+ }
+ }
+
private static class DummyFileReceiver extends IoTDBFileReceiver {
DummyFileReceiver(final File baseDir) {
@@ -73,6 +96,12 @@ public class IoTDBFileReceiverTest {
updateWritingFileIfNeeded(fileName, isSingleFile);
}
+ TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long>
fileLengths)
+ throws IOException {
+ return handleTransferFileSealV2(
+ DummyFileSealReqV2.toTPipeTransferReq(fileNames, fileLengths,
Collections.emptyMap()));
+ }
+
File getWritingFileInBaseDir(final String fileName) {
return
receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile();
}
@@ -130,4 +159,21 @@ public class IoTDBFileReceiverTest {
return null;
}
}
+
+ private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 {
+
+ static DummyFileSealReqV2 toTPipeTransferReq(
+ final List<String> fileNames,
+ final List<Long> fileLengths,
+ final java.util.Map<String, String> parameters)
+ throws IOException {
+ return (DummyFileSealReqV2)
+ new DummyFileSealReqV2().convertToTPipeTransferReq(fileNames,
fileLengths, parameters);
+ }
+
+ @Override
+ protected PipeRequestType getPlanType() {
+ return PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL;
+ }
+ }
}