This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch 13-cnbug in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 992acf777d52be716055505113f6ff999b0d382d Author: Caideyipi <[email protected]> AuthorDate: Wed Apr 29 15:17:10 2026 +0800 Pipe: Fixed the bug that air gap receiver may not respond in temporary timeout exception & Optimized the directory check in receiver & Optimized the configNode pipe logic (#17556) --- .../iotdb/confignode/manager/ProcedureManager.java | 4 +- .../runtime/heartbeat/PipeHeartbeatParser.java | 38 +++-- .../runtime/PipeHandleLeaderChangeProcedure.java | 6 +- .../runtime/heartbeat/PipeHeartbeatParserTest.java | 182 +++++++++++++++++++++ .../PipeHandleLeaderChangeProcedureTest.java | 46 ++++++ .../protocol/airgap/IoTDBAirGapReceiver.java | 5 + .../protocol/airgap/IoTDBAirGapReceiverTest.java | 103 ++++++++++++ .../commons/pipe/receiver/IoTDBFileReceiver.java | 72 ++++++-- .../pipe/receiver/PipeReceiverFilePathUtils.java | 42 +++++ .../pipe/receiver/IoTDBFileReceiverTest.java | 46 ++++++ 10 files changed, 506 insertions(+), 38 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index f0f1b9e6bfb..4912f2dad5c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -1443,7 +1443,7 @@ public class ProcedureManager { } } - public void pipeHandleMetaChange( + public boolean pipeHandleMetaChange( boolean needWriteConsensusOnConfigNodes, boolean needPushPipeMetaToDataNodes) { try { final long procedureId = @@ -1451,8 +1451,10 @@ public class ProcedureManager { new PipeHandleMetaChangeProcedure( needWriteConsensusOnConfigNodes, needPushPipeMetaToDataNodes)); LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: {}.", procedureId); + return true; } catch (Exception e) { LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e); + return false; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index ace07f5e2d3..6dc11ddd3f3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -59,7 +59,7 @@ public class PipeHeartbeatParser { this.configManager = configManager; heartbeatCounter = 0; - registeredNodeNumber = 1; + registeredNodeNumber = getExpectedHeartbeatNodeCount(); needWriteConsensusOnConfigNodes = new AtomicBoolean(false); needPushPipeMetaToDataNodes = new AtomicBoolean(false); @@ -73,17 +73,8 @@ public class PipeHeartbeatParser { if (heartbeatCount % registeredNodeNumber == 0) { canSubmitHandleMetaChangeProcedure.set(true); - // registeredNodeNumber may be changed, update it here when we can submit procedure - registeredNodeNumber = configManager.getNodeManager().getRegisteredNodeCount(); - if (registeredNodeNumber <= 0) { - LOGGER.warn( - "registeredNodeNumber is {} when parseHeartbeat from node (id={}).", - registeredNodeNumber, - nodeId); - // registeredNodeNumber can not be set to 0 in this class, otherwise may cause - // DivideByZeroException - registeredNodeNumber = 1; - } + // The expected reporter set may be changed, update it at the end of the current round. + registeredNodeNumber = getExpectedHeartbeatNodeCount(); } if (pipeHeartbeat.isEmpty() @@ -114,14 +105,14 @@ public class PipeHeartbeatParser { if (canSubmitHandleMetaChangeProcedure.get() && (needWriteConsensusOnConfigNodes.get() || needPushPipeMetaToDataNodes.get())) { - configManager + if (configManager .getProcedureManager() .pipeHandleMetaChange( - needWriteConsensusOnConfigNodes.get(), needPushPipeMetaToDataNodes.get()); - - // Reset flags after procedure is submitted - needWriteConsensusOnConfigNodes.set(false); - needPushPipeMetaToDataNodes.set(false); + needWriteConsensusOnConfigNodes.get(), + needPushPipeMetaToDataNodes.get())) { + needWriteConsensusOnConfigNodes.set(false); + needPushPipeMetaToDataNodes.set(false); + } } } finally { configManager.getPipeManager().getPipeTaskCoordinator().unlock(); @@ -129,6 +120,17 @@ public class PipeHeartbeatParser { }); } + private int getExpectedHeartbeatNodeCount() { + final int expectedNodeCount = + configManager.getNodeManager().getRegisteredDataNodeCount() + + (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1 : 0); + if (expectedNodeCount <= 0) { + LOGGER.warn("Expected pipe heartbeat node count is {}, fallback to 1.", expectedNodeCount); + return 1; + } + return expectedNodeCount; + } + private void parseHeartbeatAndSaveMetaChangeLocally( final AtomicReference<PipeTaskInfo> pipeTaskInfo, final int nodeId, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index f18737e8b7a..61f6f3cae2a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -164,7 +164,11 @@ public class PipeHandleLeaderChangeProcedure extends AbstractOperatePipeProcedur final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer); final int newDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer); regionGroupToOldAndNewLeaderPairMap.put( - new TConsensusGroupId(TConsensusGroupType.DataRegion, dataRegionGroupId), + new TConsensusGroupId( + dataRegionGroupId == Integer.MIN_VALUE + ? TConsensusGroupType.ConfigRegion + : TConsensusGroupType.DataRegion, + dataRegionGroupId), new Pair<>(oldDataRegionLeaderId, newDataRegionLeaderId)); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java new file mode 100644 index 00000000000..d5a46d42c84 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat; + +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.ProcedureManager; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager; +import org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator; +import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator; +import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PipeHeartbeatParserTest { + + private boolean originalSeparatedPipeHeartbeatEnabled; + + @Before + public void setUp() { + originalSeparatedPipeHeartbeatEnabled = + CommonDescriptor.getInstance().getConfig().isSeperatedPipeHeartbeatEnabled(); + } + + @After + public void tearDown() { + CommonDescriptor.getInstance() + .getConfig() + .setSeperatedPipeHeartbeatEnabled(originalSeparatedPipeHeartbeatEnabled); + } + + @Test + public void testParseHeartbeatCountsOnlyDataNodesWhenSeparatedHeartbeatDisabled() + throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final ParserTestContext context = createParserTestContext(2); + setMetaChangeFlags(context.parser, true, false); + + context.parser.parseHeartbeat(1, emptyHeartbeat()); + verify(context.procedureManager, never()).pipeHandleMetaChange(anyBoolean(), anyBoolean()); + + context.parser.parseHeartbeat(2, emptyHeartbeat()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false); + } + + @Test + public void testParseHeartbeatCountsLocalConfigNodeWhenSeparatedHeartbeatEnabled() + throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(true); + + final ParserTestContext context = createParserTestContext(2); + setMetaChangeFlags(context.parser, true, false); + + context.parser.parseHeartbeat(1, emptyHeartbeat()); + context.parser.parseHeartbeat(2, emptyHeartbeat()); + verify(context.procedureManager, never()).pipeHandleMetaChange(anyBoolean(), anyBoolean()); + + context.parser.parseHeartbeat(3, emptyHeartbeat()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false); + } + + @Test + public void testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final ParserTestContext context = createParserTestContext(2); + when(context.procedureManager.pipeHandleMetaChange(anyBoolean(), anyBoolean())) + .thenReturn(false, true); + setMetaChangeFlags(context.parser, true, false); + + context.parser.parseHeartbeat(1, emptyHeartbeat()); + verify(context.procedureManager, never()).pipeHandleMetaChange(anyBoolean(), anyBoolean()); + + context.parser.parseHeartbeat(2, emptyHeartbeat()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false); + + context.parser.parseHeartbeat(3, emptyHeartbeat()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false); + + context.parser.parseHeartbeat(4, emptyHeartbeat()); + verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false); + } + + private ParserTestContext createParserTestContext(final int registeredDataNodeCount) { + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final NodeManager nodeManager = Mockito.mock(NodeManager.class); + final ProcedureManager procedureManager = Mockito.mock(ProcedureManager.class); + final PipeManager pipeManager = Mockito.mock(PipeManager.class); + final PipeRuntimeCoordinator pipeRuntimeCoordinator = + Mockito.mock(PipeRuntimeCoordinator.class); + final PipeTaskCoordinator pipeTaskCoordinator = Mockito.mock(PipeTaskCoordinator.class); + final ExecutorService procedureSubmitter = Mockito.mock(ExecutorService.class); + + when(configManager.getNodeManager()).thenReturn(nodeManager); + when(configManager.getProcedureManager()).thenReturn(procedureManager); + when(configManager.getPipeManager()).thenReturn(pipeManager); + when(nodeManager.getRegisteredDataNodeCount()).thenReturn(registeredDataNodeCount); + when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator); + when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator); + when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter); + when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new PipeTaskInfo())); + when(procedureManager.pipeHandleMetaChange(anyBoolean(), anyBoolean())).thenReturn(true); + Mockito.doAnswer( + invocation -> { + ((Runnable) invocation.getArgument(0)).run(); + return CompletableFuture.completedFuture(null); + }) + .when(procedureSubmitter) + .submit(any(Runnable.class)); + + return new ParserTestContext(new PipeHeartbeatParser(configManager), procedureManager); + } + + private void setMetaChangeFlags( + final PipeHeartbeatParser parser, + final boolean needWriteConsensusOnConfigNodes, + final boolean needPushPipeMetaToDataNodes) + throws Exception { + setAtomicBooleanField( + parser, "needWriteConsensusOnConfigNodes", needWriteConsensusOnConfigNodes); + setAtomicBooleanField(parser, "needPushPipeMetaToDataNodes", needPushPipeMetaToDataNodes); + } + + private void setAtomicBooleanField( + final PipeHeartbeatParser parser, final String fieldName, final boolean value) + throws Exception { + final Field field = PipeHeartbeatParser.class.getDeclaredField(fieldName); + field.setAccessible(true); + ((AtomicBoolean) field.get(parser)).set(value); + } + + private PipeHeartbeat emptyHeartbeat() { + return new PipeHeartbeat(Collections.emptyList(), null, null, null); + } + + private static class ParserTestContext { + private final PipeHeartbeatParser parser; + private final ProcedureManager procedureManager; + + private ParserTestContext( + final PipeHeartbeatParser parser, final ProcedureManager procedureManager) { + this.parser = parser; + this.procedureManager = procedureManager; + } + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java index b76e291b0c3..75c0963a27f 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java @@ -21,7 +21,10 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.runtime; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.confignode.procedure.Procedure; +import org.apache.iotdb.confignode.procedure.state.ProcedureState; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; @@ -45,6 +48,9 @@ public class PipeHandleLeaderChangeProcedureTest { leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new Pair<>(1, 2)); leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2), new Pair<>(2, 3)); leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3), new Pair<>(4, 5)); + leaderMap.put( + new TConsensusGroupId(TConsensusGroupType.ConfigRegion, Integer.MIN_VALUE), + new Pair<>(6, 7)); PipeHandleLeaderChangeProcedure proc = new PipeHandleLeaderChangeProcedure(leaderMap); @@ -60,4 +66,44 @@ public class PipeHandleLeaderChangeProcedureTest { fail(); } } + + @Test + public void deserializeOldFormatConfigRegionTest() { + PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>(); + leaderMap.put( + new TConsensusGroupId(TConsensusGroupType.ConfigRegion, Integer.MIN_VALUE), + new Pair<>(6, 7)); + + try { + outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode()); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeInt(ProcedureState.INITIALIZING.ordinal()); + outputStream.writeLong(0L); + outputStream.writeLong(0L); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeLong(Procedure.NO_TIMEOUT); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(0); + outputStream.write((byte) 0); + outputStream.writeInt(leaderMap.size()); + outputStream.writeInt(Integer.MIN_VALUE); + outputStream.writeInt(6); + outputStream.writeInt(7); + + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + PipeHandleLeaderChangeProcedure proc = + (PipeHandleLeaderChangeProcedure) ProcedureFactory.getInstance().create(buffer); + + assertEquals(new PipeHandleLeaderChangeProcedure(leaderMap), proc); + } catch (Exception e) { + fail(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java index 8658d12b6a8..278c1ccaaef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java @@ -178,6 +178,11 @@ public class IoTDBAirGapReceiver extends WrappedRunnable { if (System.currentTimeMillis() - startTime < PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) { handleReq(req, startTime); + } else { + LOGGER.warn( + "Pipe air gap receiver {}: Temporary unavailable retry timed out, returning FAIL to sender.", + receiverId); + fail(); } } else { LOGGER.warn( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java index 19dea8140a1..e23db1f1ca8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java @@ -19,18 +19,32 @@ package org.apache.iotdb.db.pipe.receiver.protocol.airgap; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant; +import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse; +import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.tsfile.utils.BytesUtils; import org.junit.Assert; import org.junit.Test; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.Socket; +import java.nio.ByteBuffer; public class IoTDBAirGapReceiverTest { @@ -69,4 +83,93 @@ public class IoTDBAirGapReceiverTest { Assert.assertThrows(IOException.class, () -> receiver.readData(inputStream)); Assert.assertTrue(exception.getMessage().contains("nested E-Language prefix")); } + + @Test + public void testTemporaryUnavailableRetryTimeoutReturnsFail() throws Exception { + final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); + final long originalRetryLocalIntervalMs = commonConfig.getPipeAirGapRetryLocalIntervalMs(); + final long originalRetryMaxMs = commonConfig.getPipeAirGapRetryMaxMs(); + + try { + commonConfig.setPipeAirGapRetryLocalIntervalMs(0); + commonConfig.setPipeAirGapRetryMaxMs(1); + + final RecordingSocket socket = new RecordingSocket(); + final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 3L); + final StubIoTDBDataNodeReceiverAgent stubAgent = new StubIoTDBDataNodeReceiverAgent(); + stubAgent.setStubReceiver( + new StubReceiver( + new TSStatus( + TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()))); + setField(receiver, "agent", stubAgent); + + final AirGapPseudoTPipeTransferRequest req = new AirGapPseudoTPipeTransferRequest(); + req.setVersion(IoTDBSinkRequestVersion.VERSION_1.getVersion()); + req.setType((short) 0); + req.setBody(ByteBuffer.allocate(0)); + + final Method handleReq = + IoTDBAirGapReceiver.class.getDeclaredMethod( + "handleReq", AirGapPseudoTPipeTransferRequest.class, long.class); + handleReq.setAccessible(true); + handleReq.invoke(receiver, req, System.currentTimeMillis() - 10_000L); + + Assert.assertArrayEquals(AirGapOneByteResponse.FAIL, socket.getWrittenBytes()); + } finally { + commonConfig.setPipeAirGapRetryLocalIntervalMs(originalRetryLocalIntervalMs); + commonConfig.setPipeAirGapRetryMaxMs(originalRetryMaxMs); + } + } + + private static void setField(final Object target, final String fieldName, final Object value) + throws Exception { + final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private static class RecordingSocket extends Socket { + + private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + @Override + public OutputStream getOutputStream() { + return outputStream; + } + + byte[] getWrittenBytes() { + return outputStream.toByteArray(); + } + } + + private static class StubIoTDBDataNodeReceiverAgent extends IoTDBDataNodeReceiverAgent { + + void setStubReceiver(final IoTDBReceiver receiver) { + setReceiverWithSpecifiedClient(null, receiver); + } + } + + private static class StubReceiver implements IoTDBReceiver { + + private final TPipeTransferResp response; + + private StubReceiver(final TSStatus status) { + response = new TPipeTransferResp(status); + } + + @Override + public TPipeTransferResp receive(final TPipeTransferReq req) { + return response; + } + + @Override + public void handleExit() { + // noop for unit test + } + + @Override + public IoTDBSinkRequestVersion getVersion() { + return IoTDBSinkRequestVersion.VERSION_1; + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 9153acf9b2e..61d9155f996 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -461,16 +461,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { receiverFileDirWithIdSuffix.get().getPath()); } } - Path baseDir = receiverFileDirWithIdSuffix.get().toPath().toAbsolutePath().normalize(); - Path targetPath = baseDir.resolve(fileName).toAbsolutePath().normalize(); - - if (!targetPath.startsWith(baseDir)) { - LOGGER.error( - "Receiver id = {}: Path traversal attempt detected! Filename: {}", - receiverId.get(), - fileName); - throw new IOException("Illegal fileName: " + fileName + " (Path traversal detected)"); - } + final Path targetPath = resolveReceiverFilePath(fileName); writingFile = targetPath.toFile(); writingFileWriter = new RandomAccessFile(writingFile, "rw"); @@ -481,7 +472,37 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } private boolean isFileExistedAndNameCorrect(final String fileName) { - return writingFile != null && writingFile.exists() && writingFile.getName().equals(fileName); + try { + return writingFile != null + && writingFile.exists() + && receiverFileDirWithIdSuffix.get() != null + && writingFile + .toPath() + .toAbsolutePath() + .normalize() + .equals(resolveReceiverFilePath(fileName)); + } catch (final IOException e) { + PipeLogger.log( + LOGGER::warn, + e, + "Receiver id = %s: Illegal file name %s when checking writing file.", + receiverId.get(), + fileName); + return false; + } + } + + private Path resolveReceiverFilePath(final String fileName) throws IOException { + try { + return PipeReceiverFilePathUtils.resolveFilePath( + receiverFileDirWithIdSuffix.get().toPath(), fileName); + } catch (final IOException e) { + LOGGER.error( + "Receiver id = {}: Path traversal attempt detected! Filename: {}", + receiverId.get(), + fileName); + throw e; + } } private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) { @@ -641,11 +662,23 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFileSealReqV2 req) { - final List<File> files = - req.getFileNames().stream() - .map(fileName -> new File(receiverFileDirWithIdSuffix.get(), fileName)) - .collect(Collectors.toList()); + final List<String> fileNames = req.getFileNames(); try { + final List<File> files = + fileNames.stream() + .map( + fileName -> { + if (Objects.isNull(fileName)) { + return null; + } + try { + return resolveReceiverFilePath(fileName).toFile(); + } catch (final IOException e) { + throw new IllegalArgumentException(e); + } + }) + .collect(Collectors.toList()); + if (!isWritingFileAvailable()) { final TSStatus status = RpcUtils.getStatus( @@ -707,17 +740,20 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { } return new TPipeTransferResp(status); } catch (final Exception e) { + final Throwable rootCause = e instanceof IllegalArgumentException ? e.getCause() : e; PipeLogger.log( LOGGER::warn, - e, + rootCause, "Receiver id = %s: Failed to seal file %s from req %s.", receiverId.get(), - files, + fileNames, req); return new TPipeTransferResp( RpcUtils.getStatus( TSStatusCode.PIPE_TRANSFER_FILE_ERROR, - String.format("Failed to seal file %s because %s", writingFile, e.getMessage()))); + String.format( + "Failed to seal file %s because %s", + fileNames, rootCause == null ? e.getMessage() : rootCause.getMessage()))); } finally { // If the writing file is not sealed successfully, the writing file will be deleted. // All pieces of the writing file and its mod(if exists) should be retransmitted by the diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java new file mode 100644 index 00000000000..bc7275d4ebe --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.receiver; + +import java.io.IOException; +import java.nio.file.Path; + +public final class PipeReceiverFilePathUtils { + + private PipeReceiverFilePathUtils() { + // Utility class + } + + public static Path resolveFilePath(final Path baseDir, final String fileName) throws IOException { + final Path normalizedBaseDir = baseDir.toAbsolutePath().normalize(); + final Path normalizedTargetPath = + normalizedBaseDir.resolve(fileName).toAbsolutePath().normalize(); + + if (!normalizedTargetPath.startsWith(normalizedBaseDir)) { + throw new IOException("Illegal fileName: " + fileName + " (Path traversal detected)"); + } + + return normalizedTargetPath; + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java index 6658927b9bb..337cece8a48 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java @@ -21,8 +21,10 @@ package org.apache.iotdb.commons.pipe.receiver; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -33,6 +35,8 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; import java.util.List; public class IoTDBFileReceiverTest { @@ -63,6 +67,25 @@ public class IoTDBFileReceiverTest { } } + @Test + public void testRejectPathTraversalFileNameInSealRequest() throws Exception { + final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test"); + final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile()); + try { + receiver.createWritingFile("normal.tsfile", false); + + final TPipeTransferResp response = + receiver.sealFiles( + Arrays.asList("../outside.mod", "normal.tsfile"), Arrays.asList(0L, 0L)); + + Assert.assertEquals( + TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(), response.getStatus().getCode()); + Assert.assertTrue(response.getStatus().getMessage().contains("Illegal fileName")); + } finally { + receiver.handleExit(); + } + } + private static class DummyFileReceiver extends IoTDBFileReceiver { DummyFileReceiver(final File baseDir) { @@ -73,6 +96,12 @@ public class IoTDBFileReceiverTest { updateWritingFileIfNeeded(fileName, isSingleFile); } + TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long> fileLengths) + throws IOException { + return handleTransferFileSealV2( + DummyFileSealReqV2.toTPipeTransferReq(fileNames, fileLengths, Collections.emptyMap())); + } + File getWritingFileInBaseDir(final String fileName) { return receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile(); } @@ -125,4 +154,21 @@ public class IoTDBFileReceiverTest { return null; } } + + private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 { + + static DummyFileSealReqV2 toTPipeTransferReq( + final List<String> fileNames, + final List<Long> fileLengths, + final java.util.Map<String, String> parameters) + throws IOException { + return (DummyFileSealReqV2) + new DummyFileSealReqV2().convertToTPipeTransferReq(fileNames, fileLengths, parameters); + } + + @Override + protected PipeRequestType getPlanType() { + return PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL; + } + } }
