This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 291f01c6041 [To dev/1.3] Pipe: Fixed the bug that air gap receiver may
not respond in temporary timeout exception & Optimized the directory check in
receiver & Optimized the configNode pipe logic (#17556) & Optimized the clear
logic of Schema Region (#17553) (#17576)
291f01c6041 is described below
commit 291f01c60410901700a96b1cd3dc110d0500b61f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Apr 30 09:38:31 2026 +0800
[To dev/1.3] Pipe: Fixed the bug that air gap receiver may not respond in
temporary timeout exception & Optimized the directory check in receiver &
Optimized the configNode pipe logic (#17556) & Optimized the clear logic of
Schema Region (#17553) (#17576)
* Optimized the clear logic of Schema Region && Pipe: Fixed the bug that
the historical pipe does not work for deletion-only sync (#17553)
* Pipe: Fixed the bug that air gap receiver may not respond in temporary
timeout exception & Optimized the directory check in receiver & Optimized the
configNode pipe logic (#17556)
---
.../iotdb/confignode/manager/ProcedureManager.java | 4 +-
.../runtime/heartbeat/PipeHeartbeatParser.java | 38 +++--
.../runtime/PipeHandleLeaderChangeProcedure.java | 6 +-
.../runtime/heartbeat/PipeHeartbeatParserTest.java | 182 +++++++++++++++++++++
.../PipeHandleLeaderChangeProcedureTest.java | 46 ++++++
.../protocol/airgap/IoTDBAirGapReceiver.java | 5 +
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 4 +-
.../protocol/airgap/IoTDBAirGapReceiverTest.java | 103 ++++++++++++
.../commons/pipe/receiver/IoTDBFileReceiver.java | 72 ++++++--
.../pipe/receiver/PipeReceiverFilePathUtils.java | 42 +++++
.../pipe/receiver/IoTDBFileReceiverTest.java | 46 ++++++
11 files changed, 509 insertions(+), 39 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index f0f1b9e6bfb..4912f2dad5c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -1443,7 +1443,7 @@ public class ProcedureManager {
}
}
- public void pipeHandleMetaChange(
+ public boolean pipeHandleMetaChange(
boolean needWriteConsensusOnConfigNodes, boolean
needPushPipeMetaToDataNodes) {
try {
final long procedureId =
@@ -1451,8 +1451,10 @@ public class ProcedureManager {
new PipeHandleMetaChangeProcedure(
needWriteConsensusOnConfigNodes,
needPushPipeMetaToDataNodes));
LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId:
{}.", procedureId);
+ return true;
} catch (Exception e) {
LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
+ return false;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index ace07f5e2d3..6dc11ddd3f3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -59,7 +59,7 @@ public class PipeHeartbeatParser {
this.configManager = configManager;
heartbeatCounter = 0;
- registeredNodeNumber = 1;
+ registeredNodeNumber = getExpectedHeartbeatNodeCount();
needWriteConsensusOnConfigNodes = new AtomicBoolean(false);
needPushPipeMetaToDataNodes = new AtomicBoolean(false);
@@ -73,17 +73,8 @@ public class PipeHeartbeatParser {
if (heartbeatCount % registeredNodeNumber == 0) {
canSubmitHandleMetaChangeProcedure.set(true);
- // registeredNodeNumber may be changed, update it here when we can
submit procedure
- registeredNodeNumber =
configManager.getNodeManager().getRegisteredNodeCount();
- if (registeredNodeNumber <= 0) {
- LOGGER.warn(
- "registeredNodeNumber is {} when parseHeartbeat from node
(id={}).",
- registeredNodeNumber,
- nodeId);
- // registeredNodeNumber can not be set to 0 in this class, otherwise
may cause
- // DivideByZeroException
- registeredNodeNumber = 1;
- }
+ // The expected reporter set may be changed, update it at the end of the
current round.
+ registeredNodeNumber = getExpectedHeartbeatNodeCount();
}
if (pipeHeartbeat.isEmpty()
@@ -114,14 +105,14 @@ public class PipeHeartbeatParser {
if (canSubmitHandleMetaChangeProcedure.get()
&& (needWriteConsensusOnConfigNodes.get()
|| needPushPipeMetaToDataNodes.get())) {
- configManager
+ if (configManager
.getProcedureManager()
.pipeHandleMetaChange(
- needWriteConsensusOnConfigNodes.get(),
needPushPipeMetaToDataNodes.get());
-
- // Reset flags after procedure is submitted
- needWriteConsensusOnConfigNodes.set(false);
- needPushPipeMetaToDataNodes.set(false);
+ needWriteConsensusOnConfigNodes.get(),
+ needPushPipeMetaToDataNodes.get())) {
+ needWriteConsensusOnConfigNodes.set(false);
+ needPushPipeMetaToDataNodes.set(false);
+ }
}
} finally {
configManager.getPipeManager().getPipeTaskCoordinator().unlock();
@@ -129,6 +120,17 @@ public class PipeHeartbeatParser {
});
}
+ private int getExpectedHeartbeatNodeCount() {
+ final int expectedNodeCount =
+ configManager.getNodeManager().getRegisteredDataNodeCount()
+ + (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1
: 0);
+ if (expectedNodeCount <= 0) {
+ LOGGER.warn("Expected pipe heartbeat node count is {}, fallback to 1.",
expectedNodeCount);
+ return 1;
+ }
+ return expectedNodeCount;
+ }
+
private void parseHeartbeatAndSaveMetaChangeLocally(
final AtomicReference<PipeTaskInfo> pipeTaskInfo,
final int nodeId,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index f18737e8b7a..61f6f3cae2a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -164,7 +164,11 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
final int oldDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
final int newDataRegionLeaderId = ReadWriteIOUtils.readInt(byteBuffer);
regionGroupToOldAndNewLeaderPairMap.put(
- new TConsensusGroupId(TConsensusGroupType.DataRegion,
dataRegionGroupId),
+ new TConsensusGroupId(
+ dataRegionGroupId == Integer.MIN_VALUE
+ ? TConsensusGroupType.ConfigRegion
+ : TConsensusGroupType.DataRegion,
+ dataRegionGroupId),
new Pair<>(oldDataRegionLeaderId, newDataRegionLeaderId));
}
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
new file mode 100644
index 00000000000..d5a46d42c84
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.ProcedureManager;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator;
+import
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class PipeHeartbeatParserTest {
+
+ private boolean originalSeparatedPipeHeartbeatEnabled;
+
+ @Before
+ public void setUp() {
+ originalSeparatedPipeHeartbeatEnabled =
+
CommonDescriptor.getInstance().getConfig().isSeperatedPipeHeartbeatEnabled();
+ }
+
+ @After
+ public void tearDown() {
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setSeperatedPipeHeartbeatEnabled(originalSeparatedPipeHeartbeatEnabled);
+ }
+
+ @Test
+ public void
testParseHeartbeatCountsOnlyDataNodesWhenSeparatedHeartbeatDisabled()
+ throws Exception {
+
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+ final ParserTestContext context = createParserTestContext(2);
+ setMetaChangeFlags(context.parser, true, false);
+
+ context.parser.parseHeartbeat(1, emptyHeartbeat());
+ verify(context.procedureManager,
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+ context.parser.parseHeartbeat(2, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+ }
+
+ @Test
+ public void
testParseHeartbeatCountsLocalConfigNodeWhenSeparatedHeartbeatEnabled()
+ throws Exception {
+
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(true);
+
+ final ParserTestContext context = createParserTestContext(2);
+ setMetaChangeFlags(context.parser, true, false);
+
+ context.parser.parseHeartbeat(1, emptyHeartbeat());
+ context.parser.parseHeartbeat(2, emptyHeartbeat());
+ verify(context.procedureManager,
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+ context.parser.parseHeartbeat(3, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+ }
+
+ @Test
+ public void
testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() throws
Exception {
+
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
+
+ final ParserTestContext context = createParserTestContext(2);
+ when(context.procedureManager.pipeHandleMetaChange(anyBoolean(),
anyBoolean()))
+ .thenReturn(false, true);
+ setMetaChangeFlags(context.parser, true, false);
+
+ context.parser.parseHeartbeat(1, emptyHeartbeat());
+ verify(context.procedureManager,
never()).pipeHandleMetaChange(anyBoolean(), anyBoolean());
+
+ context.parser.parseHeartbeat(2, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+
+ context.parser.parseHeartbeat(3, emptyHeartbeat());
+ verify(context.procedureManager, times(1)).pipeHandleMetaChange(true,
false);
+
+ context.parser.parseHeartbeat(4, emptyHeartbeat());
+ verify(context.procedureManager, times(2)).pipeHandleMetaChange(true,
false);
+ }
+
+ private ParserTestContext createParserTestContext(final int
registeredDataNodeCount) {
+ final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+ final NodeManager nodeManager = Mockito.mock(NodeManager.class);
+ final ProcedureManager procedureManager =
Mockito.mock(ProcedureManager.class);
+ final PipeManager pipeManager = Mockito.mock(PipeManager.class);
+ final PipeRuntimeCoordinator pipeRuntimeCoordinator =
+ Mockito.mock(PipeRuntimeCoordinator.class);
+ final PipeTaskCoordinator pipeTaskCoordinator =
Mockito.mock(PipeTaskCoordinator.class);
+ final ExecutorService procedureSubmitter =
Mockito.mock(ExecutorService.class);
+
+ when(configManager.getNodeManager()).thenReturn(nodeManager);
+ when(configManager.getProcedureManager()).thenReturn(procedureManager);
+ when(configManager.getPipeManager()).thenReturn(pipeManager);
+
when(nodeManager.getRegisteredDataNodeCount()).thenReturn(registeredDataNodeCount);
+
when(pipeManager.getPipeRuntimeCoordinator()).thenReturn(pipeRuntimeCoordinator);
+ when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
+
when(pipeRuntimeCoordinator.getProcedureSubmitter()).thenReturn(procedureSubmitter);
+ when(pipeTaskCoordinator.tryLock()).thenReturn(new AtomicReference<>(new
PipeTaskInfo()));
+ when(procedureManager.pipeHandleMetaChange(anyBoolean(),
anyBoolean())).thenReturn(true);
+ Mockito.doAnswer(
+ invocation -> {
+ ((Runnable) invocation.getArgument(0)).run();
+ return CompletableFuture.completedFuture(null);
+ })
+ .when(procedureSubmitter)
+ .submit(any(Runnable.class));
+
+ return new ParserTestContext(new PipeHeartbeatParser(configManager),
procedureManager);
+ }
+
+ private void setMetaChangeFlags(
+ final PipeHeartbeatParser parser,
+ final boolean needWriteConsensusOnConfigNodes,
+ final boolean needPushPipeMetaToDataNodes)
+ throws Exception {
+ setAtomicBooleanField(
+ parser, "needWriteConsensusOnConfigNodes",
needWriteConsensusOnConfigNodes);
+ setAtomicBooleanField(parser, "needPushPipeMetaToDataNodes",
needPushPipeMetaToDataNodes);
+ }
+
+ private void setAtomicBooleanField(
+ final PipeHeartbeatParser parser, final String fieldName, final boolean
value)
+ throws Exception {
+ final Field field = PipeHeartbeatParser.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ ((AtomicBoolean) field.get(parser)).set(value);
+ }
+
+ private PipeHeartbeat emptyHeartbeat() {
+ return new PipeHeartbeat(Collections.emptyList(), null, null, null);
+ }
+
+ private static class ParserTestContext {
+ private final PipeHeartbeatParser parser;
+ private final ProcedureManager procedureManager;
+
+ private ParserTestContext(
+ final PipeHeartbeatParser parser, final ProcedureManager
procedureManager) {
+ this.parser = parser;
+ this.procedureManager = procedureManager;
+ }
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
index b76e291b0c3..75c0963a27f 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java
@@ -21,7 +21,10 @@ package
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
@@ -45,6 +48,9 @@ public class PipeHandleLeaderChangeProcedureTest {
leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
new Pair<>(1, 2));
leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
new Pair<>(2, 3));
leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
new Pair<>(4, 5));
+ leaderMap.put(
+ new TConsensusGroupId(TConsensusGroupType.ConfigRegion,
Integer.MIN_VALUE),
+ new Pair<>(6, 7));
PipeHandleLeaderChangeProcedure proc = new
PipeHandleLeaderChangeProcedure(leaderMap);
@@ -60,4 +66,44 @@ public class PipeHandleLeaderChangeProcedureTest {
fail();
}
}
+
+ @Test
+ public void deserializeOldFormatConfigRegionTest() {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+
+ Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap = new HashMap<>();
+ leaderMap.put(
+ new TConsensusGroupId(TConsensusGroupType.ConfigRegion,
Integer.MIN_VALUE),
+ new Pair<>(6, 7));
+
+ try {
+
outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode());
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeInt(ProcedureState.INITIALIZING.ordinal());
+ outputStream.writeLong(0L);
+ outputStream.writeLong(0L);
+ outputStream.writeLong(Procedure.NO_PROC_ID);
+ outputStream.writeLong(Procedure.NO_TIMEOUT);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(-1);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(0);
+ outputStream.write((byte) 0);
+ outputStream.writeInt(leaderMap.size());
+ outputStream.writeInt(Integer.MIN_VALUE);
+ outputStream.writeInt(6);
+ outputStream.writeInt(7);
+
+ ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ PipeHandleLeaderChangeProcedure proc =
+ (PipeHandleLeaderChangeProcedure)
ProcedureFactory.getInstance().create(buffer);
+
+ assertEquals(new PipeHandleLeaderChangeProcedure(leaderMap), proc);
+ } catch (Exception e) {
+ fail();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 8658d12b6a8..278c1ccaaef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -178,6 +178,11 @@ public class IoTDBAirGapReceiver extends WrappedRunnable {
if (System.currentTimeMillis() - startTime
< PipeConfig.getInstance().getPipeAirGapRetryMaxMs()) {
handleReq(req, startTime);
+ } else {
+ LOGGER.warn(
+ "Pipe air gap receiver {}: Temporary unavailable retry timed out,
returning FAIL to sender.",
+ receiverId);
+ fail();
}
} else {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index a55de193914..0f6490eceb4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -393,7 +393,9 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
logWriter.close();
logWriter = null;
}
- tagManager.clear();
+ if (tagManager != null) {
+ tagManager.clear();
+ }
isRecovering = true;
initialized = false;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
index 19dea8140a1..e23db1f1ca8 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverTest.java
@@ -19,18 +19,32 @@
package org.apache.iotdb.db.pipe.receiver.protocol.airgap;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant;
+import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse;
+import
org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import
org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiverAgent;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.BytesUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.net.Socket;
+import java.nio.ByteBuffer;
public class IoTDBAirGapReceiverTest {
@@ -69,4 +83,93 @@ public class IoTDBAirGapReceiverTest {
Assert.assertThrows(IOException.class, () ->
receiver.readData(inputStream));
Assert.assertTrue(exception.getMessage().contains("nested E-Language
prefix"));
}
+
+ @Test
+ public void testTemporaryUnavailableRetryTimeoutReturnsFail() throws
Exception {
+ final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+ final long originalRetryLocalIntervalMs =
commonConfig.getPipeAirGapRetryLocalIntervalMs();
+ final long originalRetryMaxMs = commonConfig.getPipeAirGapRetryMaxMs();
+
+ try {
+ commonConfig.setPipeAirGapRetryLocalIntervalMs(0);
+ commonConfig.setPipeAirGapRetryMaxMs(1);
+
+ final RecordingSocket socket = new RecordingSocket();
+ final IoTDBAirGapReceiver receiver = new IoTDBAirGapReceiver(socket, 3L);
+ final StubIoTDBDataNodeReceiverAgent stubAgent = new
StubIoTDBDataNodeReceiverAgent();
+ stubAgent.setStubReceiver(
+ new StubReceiver(
+ new TSStatus(
+
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())));
+ setField(receiver, "agent", stubAgent);
+
+ final AirGapPseudoTPipeTransferRequest req = new
AirGapPseudoTPipeTransferRequest();
+ req.setVersion(IoTDBSinkRequestVersion.VERSION_1.getVersion());
+ req.setType((short) 0);
+ req.setBody(ByteBuffer.allocate(0));
+
+ final Method handleReq =
+ IoTDBAirGapReceiver.class.getDeclaredMethod(
+ "handleReq", AirGapPseudoTPipeTransferRequest.class, long.class);
+ handleReq.setAccessible(true);
+ handleReq.invoke(receiver, req, System.currentTimeMillis() - 10_000L);
+
+ Assert.assertArrayEquals(AirGapOneByteResponse.FAIL,
socket.getWrittenBytes());
+ } finally {
+
commonConfig.setPipeAirGapRetryLocalIntervalMs(originalRetryLocalIntervalMs);
+ commonConfig.setPipeAirGapRetryMaxMs(originalRetryMaxMs);
+ }
+ }
+
+ private static void setField(final Object target, final String fieldName,
final Object value)
+ throws Exception {
+ final Field field = IoTDBAirGapReceiver.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+
+ private static class RecordingSocket extends Socket {
+
+ private final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
+
+ @Override
+ public OutputStream getOutputStream() {
+ return outputStream;
+ }
+
+ byte[] getWrittenBytes() {
+ return outputStream.toByteArray();
+ }
+ }
+
+ private static class StubIoTDBDataNodeReceiverAgent extends
IoTDBDataNodeReceiverAgent {
+
+ void setStubReceiver(final IoTDBReceiver receiver) {
+ setReceiverWithSpecifiedClient(null, receiver);
+ }
+ }
+
+ private static class StubReceiver implements IoTDBReceiver {
+
+ private final TPipeTransferResp response;
+
+ private StubReceiver(final TSStatus status) {
+ response = new TPipeTransferResp(status);
+ }
+
+ @Override
+ public TPipeTransferResp receive(final TPipeTransferReq req) {
+ return response;
+ }
+
+ @Override
+ public void handleExit() {
+ // noop for unit test
+ }
+
+ @Override
+ public IoTDBSinkRequestVersion getVersion() {
+ return IoTDBSinkRequestVersion.VERSION_1;
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 9153acf9b2e..61d9155f996 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -461,16 +461,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
receiverFileDirWithIdSuffix.get().getPath());
}
}
- Path baseDir =
receiverFileDirWithIdSuffix.get().toPath().toAbsolutePath().normalize();
- Path targetPath = baseDir.resolve(fileName).toAbsolutePath().normalize();
-
- if (!targetPath.startsWith(baseDir)) {
- LOGGER.error(
- "Receiver id = {}: Path traversal attempt detected! Filename: {}",
- receiverId.get(),
- fileName);
- throw new IOException("Illegal fileName: " + fileName + " (Path
traversal detected)");
- }
+ final Path targetPath = resolveReceiverFilePath(fileName);
writingFile = targetPath.toFile();
writingFileWriter = new RandomAccessFile(writingFile, "rw");
@@ -481,7 +472,37 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
private boolean isFileExistedAndNameCorrect(final String fileName) {
- return writingFile != null && writingFile.exists() &&
writingFile.getName().equals(fileName);
+ try {
+ return writingFile != null
+ && writingFile.exists()
+ && receiverFileDirWithIdSuffix.get() != null
+ && writingFile
+ .toPath()
+ .toAbsolutePath()
+ .normalize()
+ .equals(resolveReceiverFilePath(fileName));
+ } catch (final IOException e) {
+ PipeLogger.log(
+ LOGGER::warn,
+ e,
+ "Receiver id = %s: Illegal file name %s when checking writing file.",
+ receiverId.get(),
+ fileName);
+ return false;
+ }
+ }
+
+ private Path resolveReceiverFilePath(final String fileName) throws
IOException {
+ try {
+ return PipeReceiverFilePathUtils.resolveFilePath(
+ receiverFileDirWithIdSuffix.get().toPath(), fileName);
+ } catch (final IOException e) {
+ LOGGER.error(
+ "Receiver id = {}: Path traversal attempt detected! Filename: {}",
+ receiverId.get(),
+ fileName);
+ throw e;
+ }
}
private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
@@ -641,11 +662,23 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
protected final TPipeTransferResp handleTransferFileSealV2(final
PipeTransferFileSealReqV2 req) {
- final List<File> files =
- req.getFileNames().stream()
- .map(fileName -> new File(receiverFileDirWithIdSuffix.get(),
fileName))
- .collect(Collectors.toList());
+ final List<String> fileNames = req.getFileNames();
try {
+ final List<File> files =
+ fileNames.stream()
+ .map(
+ fileName -> {
+ if (Objects.isNull(fileName)) {
+ return null;
+ }
+ try {
+ return resolveReceiverFilePath(fileName).toFile();
+ } catch (final IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ })
+ .collect(Collectors.toList());
+
if (!isWritingFileAvailable()) {
final TSStatus status =
RpcUtils.getStatus(
@@ -707,17 +740,20 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
}
return new TPipeTransferResp(status);
} catch (final Exception e) {
+ final Throwable rootCause = e instanceof IllegalArgumentException ?
e.getCause() : e;
PipeLogger.log(
LOGGER::warn,
- e,
+ rootCause,
"Receiver id = %s: Failed to seal file %s from req %s.",
receiverId.get(),
- files,
+ fileNames,
req);
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
- String.format("Failed to seal file %s because %s", writingFile,
e.getMessage())));
+ String.format(
+ "Failed to seal file %s because %s",
+ fileNames, rootCause == null ? e.getMessage() :
rootCause.getMessage())));
} finally {
// If the writing file is not sealed successfully, the writing file will
be deleted.
// All pieces of the writing file and its mod(if exists) should be
retransmitted by the
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
new file mode 100644
index 00000000000..bc7275d4ebe
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/PipeReceiverFilePathUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.receiver;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public final class PipeReceiverFilePathUtils {
+
+ private PipeReceiverFilePathUtils() {
+ // Utility class
+ }
+
+ public static Path resolveFilePath(final Path baseDir, final String
fileName) throws IOException {
+ final Path normalizedBaseDir = baseDir.toAbsolutePath().normalize();
+ final Path normalizedTargetPath =
+ normalizedBaseDir.resolve(fileName).toAbsolutePath().normalize();
+
+ if (!normalizedTargetPath.startsWith(normalizedBaseDir)) {
+ throw new IOException("Illegal fileName: " + fileName + " (Path
traversal detected)");
+ }
+
+ return normalizedTargetPath;
+ }
+}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
index 6658927b9bb..337cece8a48 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.commons.pipe.receiver;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
@@ -33,6 +35,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
public class IoTDBFileReceiverTest {
@@ -63,6 +67,25 @@ public class IoTDBFileReceiverTest {
}
}
+ @Test
+ public void testRejectPathTraversalFileNameInSealRequest() throws Exception {
+ final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test");
+ final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile());
+ try {
+ receiver.createWritingFile("normal.tsfile", false);
+
+ final TPipeTransferResp response =
+ receiver.sealFiles(
+ Arrays.asList("../outside.mod", "normal.tsfile"),
Arrays.asList(0L, 0L));
+
+ Assert.assertEquals(
+ TSStatusCode.PIPE_TRANSFER_FILE_ERROR.getStatusCode(),
response.getStatus().getCode());
+ Assert.assertTrue(response.getStatus().getMessage().contains("Illegal
fileName"));
+ } finally {
+ receiver.handleExit();
+ }
+ }
+
private static class DummyFileReceiver extends IoTDBFileReceiver {
DummyFileReceiver(final File baseDir) {
@@ -73,6 +96,12 @@ public class IoTDBFileReceiverTest {
updateWritingFileIfNeeded(fileName, isSingleFile);
}
+ TPipeTransferResp sealFiles(final List<String> fileNames, final List<Long>
fileLengths)
+ throws IOException {
+ return handleTransferFileSealV2(
+ DummyFileSealReqV2.toTPipeTransferReq(fileNames, fileLengths,
Collections.emptyMap()));
+ }
+
File getWritingFileInBaseDir(final String fileName) {
return
receiverFileDirWithIdSuffix.get().toPath().resolve(fileName).toFile();
}
@@ -125,4 +154,21 @@ public class IoTDBFileReceiverTest {
return null;
}
}
+
+ private static class DummyFileSealReqV2 extends PipeTransferFileSealReqV2 {
+
+ static DummyFileSealReqV2 toTPipeTransferReq(
+ final List<String> fileNames,
+ final List<Long> fileLengths,
+ final java.util.Map<String, String> parameters)
+ throws IOException {
+ return (DummyFileSealReqV2)
+ new DummyFileSealReqV2().convertToTPipeTransferReq(fileNames,
fileLengths, parameters);
+ }
+
+ @Override
+ protected PipeRequestType getPlanType() {
+ return PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL;
+ }
+ }
}