This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c1d16a40a64 Improve state type validation in CombineRequest 
deserialization. (#17449)
c1d16a40a64 is described below

commit c1d16a40a64b9c94db2fe0f3cdb653b40f26beb6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Apr 9 15:30:08 2026 +0800

    Improve state type validation in CombineRequest deserialization. (#17449)
    
    * Improve state type validation in CombineRequest deserialization.
    
    Tighten request deserialization by validating state type before 
instantiation to reduce unexpected type usage risk, and add targeted tests for 
accepted and rejected state class names.
    
    Made-with: Cursor
    
    * spotless
---
 .../twostage/exchange/payload/CombineRequest.java  | 10 +++-
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 58 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
index 99c8bb67a1a..ae6f7f9a5fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
 
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
 import org.apache.iotdb.db.pipe.processor.twostage.state.State;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -109,7 +110,7 @@ public class CombineRequest extends TPipeTransferReq {
     combineId = ReadWriteIOUtils.readString(transferReq.body);
 
     final String stateClassName = 
ReadWriteIOUtils.readString(transferReq.body);
-    state = (State) Class.forName(stateClassName).newInstance();
+    state = instantiateState(stateClassName);
     state.deserialize(transferReq.body);
 
     version = transferReq.version;
@@ -118,6 +119,13 @@ public class CombineRequest extends TPipeTransferReq {
     return this;
   }
 
+  private State instantiateState(final String stateClassName) throws Exception 
{
+    if (CountState.class.getName().equals(stateClassName)) {
+      return new CountState();
+    }
+    throw new IllegalArgumentException("Unexpected state class: " + 
stateClassName);
+  }
+
   @Override
   public String toString() {
     return "CombineRequest{"
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index d4af2135c95..1263e83ae08 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.pipe.sink;
 import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
@@ -43,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
@@ -69,6 +72,61 @@ public class PipeDataNodeThriftRequestTest {
 
   private static final String TIME_PRECISION = "ms";
 
+  @Test
+  public void testCombineRequest() throws Exception {
+    final CombineRequest req =
+        CombineRequest.toTPipeTransferReq("pipe", 1L, 2, "combine", new 
CountState(123L));
+    final CombineRequest deserializeReq = 
CombineRequest.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals("pipe", deserializeReq.getPipeName());
+    Assert.assertEquals(1L, deserializeReq.getCreationTime());
+    Assert.assertEquals(2, deserializeReq.getRegionId());
+    Assert.assertEquals("combine", deserializeReq.getCombineId());
+    Assert.assertTrue(deserializeReq.getState() instanceof CountState);
+    Assert.assertEquals(123L, ((CountState) 
deserializeReq.getState()).getCount());
+  }
+
+  @Test
+  public void testCombineRequestWithUnexpectedStateClassName() throws 
Exception {
+    final CombineRequest req =
+        CombineRequest.toTPipeTransferReq("pipe", 1L, 2, "combine", new 
CountState(123L));
+
+    final ByteBuffer bodyBuffer = req.body.duplicate();
+    final String pipeName = ReadWriteIOUtils.readString(bodyBuffer);
+    final long creationTime = ReadWriteIOUtils.readLong(bodyBuffer);
+    final int regionId = ReadWriteIOUtils.readInt(bodyBuffer);
+    final String combineId = ReadWriteIOUtils.readString(bodyBuffer);
+    ReadWriteIOUtils.readString(bodyBuffer);
+    final long count = ReadWriteIOUtils.readLong(bodyBuffer);
+
+    final ByteBuffer tamperedBody;
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(pipeName, outputStream);
+      ReadWriteIOUtils.write(creationTime, outputStream);
+      ReadWriteIOUtils.write(regionId, outputStream);
+      ReadWriteIOUtils.write(combineId, outputStream);
+      ReadWriteIOUtils.write("java.lang.String", outputStream);
+      ReadWriteIOUtils.write(count, outputStream);
+      tamperedBody =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    final TPipeTransferReq tamperedReq = new TPipeTransferReq();
+    tamperedReq.version = req.version;
+    tamperedReq.type = req.type;
+    tamperedReq.body = tamperedBody;
+
+    try {
+      CombineRequest.fromTPipeTransferReq(tamperedReq);
+      Assert.fail("Expected IllegalArgumentException");
+    } catch (final IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("Unexpected state class"));
+    }
+  }
+
   @Test
   public void testPipeTransferDataNodeHandshakeReq() throws IOException {
     final PipeTransferDataNodeHandshakeV1Req req =

Reply via email to