This is an automated email from the ASF dual-hosted git repository.

Wei-hao-Li pushed a commit to branch lwh/frame1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4dda60f2785c0b73129d112f19f5529fbe0662a0
Author: Weihao Li <[email protected]>
AuthorDate: Sun Apr 26 08:44:43 2026 +0800

    Fix failure when dispatching FI because the thrift frame is oversized (#17536)
    
    (cherry picked from commit a15c7330d912bbc924533a08dd5bcd2fbae70a59)
---
 .../rpc/TCompressedElasticFramedTransport.java     |  9 +----
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 45 ++++++++++++++++++++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../iotdb/db/protocol/thrift/OperationType.java    | 11 +++++-
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 11 ++++++
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  | 14 ++++++-
 6 files changed, 82 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index a3b4f38064a..62abc28e470 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -44,13 +44,7 @@ public abstract class TCompressedElasticFramedTransport 
extends TElasticFramedTr
   protected void readFrame() throws TTransportException {
     underlying.readAll(i32buf, 0, 4);
     int size = TFramedTransport.decodeFrameSize(i32buf);
-
-    if (size < 0) {
-      close();
-      throw new TTransportException(
-          TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + 
size + ")!");
-    }
-
+    validateFrame(size);
     readBuffer.fill(underlying, size);
     RpcStat.readCompressedBytes.addAndGet(size);
     try {
@@ -75,6 +69,7 @@ public abstract class TCompressedElasticFramedTransport 
extends TElasticFramedTr
       writeCompressBuffer.resizeIfNecessary(maxCompressedLength);
       int compressedLength =
           compress(writeBuffer.getBuffer(), 0, length, 
writeCompressBuffer.getBuffer(), 0);
+      checkWriteFrameSize(compressedLength);
       RpcStat.writeCompressedBytes.addAndGet(compressedLength);
       TFramedTransport.encodeFrameSize(compressedLength, i32buf);
       underlying.write(i32buf, 0, 4);
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 4c7602c8699..cd0256712f2 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -23,8 +23,11 @@ import org.apache.thrift.TConfiguration;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.layered.TFramedTransport;
 
+import java.net.SocketAddress;
+
 // https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md
 public class TElasticFramedTransport extends TTransport {
 
@@ -121,6 +124,16 @@ public class TElasticFramedTransport extends TTransport {
   protected void readFrame() throws TTransportException {
     underlying.readAll(i32buf, 0, 4);
     int size = TFramedTransport.decodeFrameSize(i32buf);
+    validateFrame(size);
+    readBuffer.fill(underlying, size);
+  }
+
+  protected void validateFrame(int size) throws TTransportException {
+    final int HTTP_GET_SIGNATURE = 0x47455420; // "GET "
+    final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST"
+    final int TLS_MIN_VERSION = 0x160300;
+    final int TLS_MAX_VERSION = 0x160303;
+    final int TLS_LENGTH_HIGH_MAX = 0x02;
 
     if (size < 0) {
       close();
@@ -138,6 +151,16 @@ public class TElasticFramedTransport extends TTransport {
                 + size
                 + ") detected, you may be sending HTTP GET/POST requests to 
the Thrift-RPC port, please confirm that you are using the right port");
       } else {
+        int high24 = size >>> 8;
+        if (high24 >= TLS_MIN_VERSION
+            && high24 <= TLS_MAX_VERSION
+            && (i32buf[3] & 0xFF) <= TLS_LENGTH_HIGH_MAX) {
+          throw new TTransportException(
+              TTransportException.CORRUPTED_DATA,
+              "Singular frame size ("
+                  + size
+                  + ") detected, you may be sending TLS requests to the 
Thrift-RPC port, please confirm that you are using the right port");
+        }
         throw new TTransportException(
             TTransportException.CORRUPTED_DATA,
             "Frame size (" + size + ") larger than protect max size (" + 
thriftMaxFrameSize + ")!");
@@ -146,9 +169,31 @@ public class TElasticFramedTransport extends TTransport {
     readBuffer.fill(underlying, size);
   }
 
+  protected void checkWriteFrameSize(int size) throws TTransportException {
+    if (size <= thriftMaxFrameSize) {
+      return;
+    }
+    SocketAddress remoteAddress = null;
+    if (underlying instanceof TSocket) {
+      remoteAddress = ((TSocket) 
underlying).getSocket().getRemoteSocketAddress();
+    }
+    String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress;
+    String message =
+        "Frame size ("
+            + size
+            + ") larger than protect max size ("
+            + thriftMaxFrameSize
+            + ") while writing"
+            + remoteInfo
+            + "!";
+    close();
+    throw new TTransportException(TTransportException.CORRUPTED_DATA, message);
+  }
+
   @Override
   public void flush() throws TTransportException {
     int length = writeBuffer.getPos();
+    checkWriteFrameSize(length);
     TFramedTransport.encodeFrameSize(length, i32buf);
     underlying.write(i32buf, 0, 4);
     underlying.write(writeBuffer.getBuffer(), 0, length);
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index d28510c2b7f..7afe263b990 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -47,6 +47,7 @@ public enum TSStatusCode {
   INTERNAL_SERVER_ERROR(305),
   DISPATCH_ERROR(306),
   LICENSE_ERROR(307),
+  THRIFT_FRAME_OVERSIZE(308),
 
   // Client,
   REDIRECTION_RECOMMEND(400),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
index 065a568353f..94b5dcadecb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
@@ -50,7 +50,16 @@ public enum OperationType {
   CREATE_SCHEMA_TEMPLATE("createSchemaTemplate"),
   CHECK_AUTHORITY("checkAuthority"),
   EXECUTE_NON_QUERY_PLAN("executeNonQueryPlan"),
-  QUERY_LATENCY("queryLatency");
+  SELECT_INTO("selectInto"),
+  QUERY_LATENCY("queryLatency"),
+  WRITE_AUDIT_LOG("writeAuditLog"),
+  PREPARE_STATEMENT("prepareStatement"),
+  EXECUTE_PREPARED_STATEMENT("executePreparedStatement"),
+  DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"),
+  GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"),
+  GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"),
+  CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"),
+  DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance");
   private final String name;
 
   OperationType(String name) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 162ccf22a75..e497942dd88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,6 +71,7 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
+import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onThriftFrameOversizeException;
 
 public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher 
{
 
@@ -481,6 +483,15 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
                   TSStatusCode.EXECUTE_STATEMENT_ERROR,
                   String.format("unknown read type [%s]", 
instance.getType())));
       }
+    } catch (TException e) {
+      Throwable rootCause = ExceptionUtils.getRootCause(e);
+      if (rootCause instanceof TTransportException
+          && ((TTransportException) rootCause).getType() == 
TTransportException.CORRUPTED_DATA) {
+        // Don't set DISPATCH_ERROR status to avoid retry if dispatch failed 
because of thrift frame
+        // is oversize
+        throw new 
FragmentInstanceDispatchException(onThriftFrameOversizeException(rootCause));
+      }
+      throw e;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 987eaaf1019..b777ca2e40f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -45,6 +45,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+import static 
org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE;
 
 public class ErrorHandlingUtils {
 
@@ -59,10 +60,11 @@ public class ErrorHandlingUtils {
       "The read statement is not allowed in batch: ";
 
   private static final String ERROR_OPERATION_LOG = "Status code: {}, 
operation: {} failed";
+  private static final String EXCEPTION_PATTERN = "[%s] Exception occurred: %s 
failed. ";
 
   public static TSStatus onNpeOrUnexpectedException(
       Exception e, String operation, TSStatusCode statusCode) {
-    String message = String.format("[%s] Exception occurred: %s failed. ", 
statusCode, operation);
+    String message = String.format(EXCEPTION_PATTERN, statusCode, operation);
     if (e instanceof IOException || e instanceof NullPointerException) {
       LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e);
     } else {
@@ -93,6 +95,16 @@ public class ErrorHandlingUtils {
     return e;
   }
 
+  public static TSStatus onThriftFrameOversizeException(Throwable t) {
+    TSStatus status =
+        new 
TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()).setNeedRetry(false);
+    String message =
+        String.format(EXCEPTION_PATTERN, status, DISPATCH_FRAGMENT_INSTANCE)
+            + getRootCause(t).getMessage();
+    LOGGER.warn(message);
+    return status.setMessage(message);
+  }
+
   public static TSStatus onQueryException(Exception e, String operation, 
TSStatusCode statusCode) {
     TSStatus status = tryCatchQueryException(e);
     if (status != null) {

Reply via email to