This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 8bf164037e7 [To dev/1.3] Improve state type validation in
CombineRequest deserialization. (#17449) (#17502)
8bf164037e7 is described below
commit 8bf164037e7100be32bc927cb3beea6619885568
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Apr 17 14:55:49 2026 +0800
[To dev/1.3] Improve state type validation in CombineRequest
deserialization. (#17449) (#17502)
* Improve state type validation in CombineRequest deserialization. (#17449)
* Improve state type validation in CombineRequest deserialization.
Tighten request deserialization by validating state type before
instantiation to reduce unexpected type usage risk, and add targeted tests for
accepted and rejected state class names.
Made-with: Cursor
* spotless
(cherry picked from commit c1d16a40a64b9c94db2fe0f3cdb653b40f26beb6)
* spotless
---
.../twostage/exchange/payload/CombineRequest.java | 10 +++-
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 58 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
index 99c8bb67a1a..ae6f7f9a5fb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
import org.apache.iotdb.db.pipe.processor.twostage.state.State;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -109,7 +110,7 @@ public class CombineRequest extends TPipeTransferReq {
combineId = ReadWriteIOUtils.readString(transferReq.body);
final String stateClassName =
ReadWriteIOUtils.readString(transferReq.body);
- state = (State) Class.forName(stateClassName).newInstance();
+ state = instantiateState(stateClassName);
state.deserialize(transferReq.body);
version = transferReq.version;
@@ -118,6 +119,13 @@ public class CombineRequest extends TPipeTransferReq {
return this;
}
+ private State instantiateState(final String stateClassName) throws Exception
{
+ if (CountState.class.getName().equals(stateClassName)) {
+ return new CountState();
+ }
+ throw new IllegalArgumentException("Unexpected state class: " +
stateClassName);
+ }
+
@Override
public String toString() {
return "CombineRequest{"
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index c69b4f53a46..ee9b7218dab 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.sink;
import org.apache.iotdb.commons.path.PartialPath;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
+import
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
@@ -37,6 +39,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Cre
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
@@ -62,6 +65,61 @@ public class PipeDataNodeThriftRequestTest {
private static final String TIME_PRECISION = "ms";
+ @Test
+ public void testCombineRequest() throws Exception {
+ final CombineRequest req =
+ CombineRequest.toTPipeTransferReq("pipe", 1L, 2, "combine", new
CountState(123L));
+ final CombineRequest deserializeReq =
CombineRequest.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+ Assert.assertEquals(req.getType(), deserializeReq.getType());
+ Assert.assertEquals("pipe", deserializeReq.getPipeName());
+ Assert.assertEquals(1L, deserializeReq.getCreationTime());
+ Assert.assertEquals(2, deserializeReq.getRegionId());
+ Assert.assertEquals("combine", deserializeReq.getCombineId());
+ Assert.assertTrue(deserializeReq.getState() instanceof CountState);
+ Assert.assertEquals(123L, ((CountState)
deserializeReq.getState()).getCount());
+ }
+
+ @Test
+ public void testCombineRequestWithUnexpectedStateClassName() throws
Exception {
+ final CombineRequest req =
+ CombineRequest.toTPipeTransferReq("pipe", 1L, 2, "combine", new
CountState(123L));
+
+ final ByteBuffer bodyBuffer = req.body.duplicate();
+ final String pipeName = ReadWriteIOUtils.readString(bodyBuffer);
+ final long creationTime = ReadWriteIOUtils.readLong(bodyBuffer);
+ final int regionId = ReadWriteIOUtils.readInt(bodyBuffer);
+ final String combineId = ReadWriteIOUtils.readString(bodyBuffer);
+ ReadWriteIOUtils.readString(bodyBuffer);
+ final long count = ReadWriteIOUtils.readLong(bodyBuffer);
+
+ final ByteBuffer tamperedBody;
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(pipeName, outputStream);
+ ReadWriteIOUtils.write(creationTime, outputStream);
+ ReadWriteIOUtils.write(regionId, outputStream);
+ ReadWriteIOUtils.write(combineId, outputStream);
+ ReadWriteIOUtils.write("java.lang.String", outputStream);
+ ReadWriteIOUtils.write(count, outputStream);
+ tamperedBody =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+
+ final TPipeTransferReq tamperedReq = new TPipeTransferReq();
+ tamperedReq.version = req.version;
+ tamperedReq.type = req.type;
+ tamperedReq.body = tamperedBody;
+
+ try {
+ CombineRequest.fromTPipeTransferReq(tamperedReq);
+ Assert.fail("Expected IllegalArgumentException");
+ } catch (final IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains("Unexpected state class"));
+ }
+ }
+
@Test
public void testPipeTransferDataNodeHandshakeReq() throws IOException {
final PipeTransferDataNodeHandshakeV1Req req =