This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 8bf164037e7 [To dev/1.3] Improve state type validation in 
CombineRequest deserialization. (#17449) (#17502)
8bf164037e7 is described below

commit 8bf164037e7100be32bc927cb3beea6619885568
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Apr 17 14:55:49 2026 +0800

    [To dev/1.3] Improve state type validation in CombineRequest 
deserialization. (#17449) (#17502)
    
    * Improve state type validation in CombineRequest deserialization. (#17449)
    
    * Improve state type validation in CombineRequest deserialization.
    
    Tighten request deserialization by validating state type before 
instantiation to reduce unexpected type usage risk, and add targeted tests for 
accepted and rejected state class names.
    
    Made-with: Cursor
    
    * spotless
    
    (cherry picked from commit c1d16a40a64b9c94db2fe0f3cdb653b40f26beb6)
    
    * spotless
---
 .../twostage/exchange/payload/CombineRequest.java  | 10 +++-
 .../pipe/sink/PipeDataNodeThriftRequestTest.java   | 58 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
index 99c8bb67a1a..ae6f7f9a5fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/payload/CombineRequest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.processor.twostage.exchange.payload;
 
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
 import org.apache.iotdb.db.pipe.processor.twostage.state.State;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
@@ -109,7 +110,7 @@ public class CombineRequest extends TPipeTransferReq {
     combineId = ReadWriteIOUtils.readString(transferReq.body);
 
     final String stateClassName = 
ReadWriteIOUtils.readString(transferReq.body);
-    state = (State) Class.forName(stateClassName).newInstance();
+    state = instantiateState(stateClassName);
     state.deserialize(transferReq.body);
 
     version = transferReq.version;
@@ -118,6 +119,13 @@ public class CombineRequest extends TPipeTransferReq {
     return this;
   }
 
+  private State instantiateState(final String stateClassName) throws Exception 
{
+    if (CountState.class.getName().equals(stateClassName)) {
+      return new CountState();
+    }
+    throw new IllegalArgumentException("Unexpected state class: " + 
stateClassName);
+  }
+
   @Override
   public String toString() {
     return "CombineRequest{"
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index c69b4f53a46..ee9b7218dab 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.sink;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
+import 
org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest;
+import org.apache.iotdb.db.pipe.processor.twostage.state.CountState;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
@@ -37,6 +39,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.Cre
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
@@ -62,6 +65,61 @@ public class PipeDataNodeThriftRequestTest {
 
   private static final String TIME_PRECISION = "ms";
 
+  @Test
+  public void testCombineRequest() throws Exception {
+    final CombineRequest req =
+        CombineRequest.toTPipeTransferReq("pipe", 1L, 2, "combine", new 
CountState(123L));
+    final CombineRequest deserializeReq = 
CombineRequest.fromTPipeTransferReq(req);
+
+    Assert.assertEquals(req.getVersion(), deserializeReq.getVersion());
+    Assert.assertEquals(req.getType(), deserializeReq.getType());
+    Assert.assertEquals("pipe", deserializeReq.getPipeName());
+    Assert.assertEquals(1L, deserializeReq.getCreationTime());
+    Assert.assertEquals(2, deserializeReq.getRegionId());
+    Assert.assertEquals("combine", deserializeReq.getCombineId());
+    Assert.assertTrue(deserializeReq.getState() instanceof CountState);
+    Assert.assertEquals(123L, ((CountState) 
deserializeReq.getState()).getCount());
+  }
+
+  @Test
+  public void testCombineRequestWithUnexpectedStateClassName() throws 
Exception {
+    final CombineRequest req =
+        CombineRequest.toTPipeTransferReq("pipe", 1L, 2, "combine", new 
CountState(123L));
+
+    final ByteBuffer bodyBuffer = req.body.duplicate();
+    final String pipeName = ReadWriteIOUtils.readString(bodyBuffer);
+    final long creationTime = ReadWriteIOUtils.readLong(bodyBuffer);
+    final int regionId = ReadWriteIOUtils.readInt(bodyBuffer);
+    final String combineId = ReadWriteIOUtils.readString(bodyBuffer);
+    ReadWriteIOUtils.readString(bodyBuffer);
+    final long count = ReadWriteIOUtils.readLong(bodyBuffer);
+
+    final ByteBuffer tamperedBody;
+    try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      ReadWriteIOUtils.write(pipeName, outputStream);
+      ReadWriteIOUtils.write(creationTime, outputStream);
+      ReadWriteIOUtils.write(regionId, outputStream);
+      ReadWriteIOUtils.write(combineId, outputStream);
+      ReadWriteIOUtils.write("java.lang.String", outputStream);
+      ReadWriteIOUtils.write(count, outputStream);
+      tamperedBody =
+          ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    }
+
+    final TPipeTransferReq tamperedReq = new TPipeTransferReq();
+    tamperedReq.version = req.version;
+    tamperedReq.type = req.type;
+    tamperedReq.body = tamperedBody;
+
+    try {
+      CombineRequest.fromTPipeTransferReq(tamperedReq);
+      Assert.fail("Expected IllegalArgumentException");
+    } catch (final IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains("Unexpected state class"));
+    }
+  }
+
   @Test
   public void testPipeTransferDataNodeHandshakeReq() throws IOException {
     final PipeTransferDataNodeHandshakeV1Req req =

Reply via email to