This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch lwh/frame1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4dda60f2785c0b73129d112f19f5529fbe0662a0 Author: Weihao Li <[email protected]> AuthorDate: Sun Apr 26 08:44:43 2026 +0800 Fix appearance of dispatching FI failed because of thrift frame is oversize (#17536) (cherry picked from commit a15c7330d912bbc924533a08dd5bcd2fbae70a59) --- .../rpc/TCompressedElasticFramedTransport.java | 9 +---- .../apache/iotdb/rpc/TElasticFramedTransport.java | 45 ++++++++++++++++++++++ .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 + .../iotdb/db/protocol/thrift/OperationType.java | 11 +++++- .../scheduler/FragmentInstanceDispatcherImpl.java | 11 ++++++ .../apache/iotdb/db/utils/ErrorHandlingUtils.java | 14 ++++++- 6 files changed, 82 insertions(+), 9 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index a3b4f38064a..62abc28e470 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -44,13 +44,7 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - - if (size < 0) { - close(); - throw new TTransportException( - TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); - } - + validateFrame(size); readBuffer.fill(underlying, size); RpcStat.readCompressedBytes.addAndGet(size); try { @@ -75,6 +69,7 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr writeCompressBuffer.resizeIfNecessary(maxCompressedLength); int compressedLength = compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0); + checkWriteFrameSize(compressedLength); RpcStat.writeCompressedBytes.addAndGet(compressedLength); TFramedTransport.encodeFrameSize(compressedLength, i32buf); underlying.write(i32buf, 0, 4); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index 4c7602c8699..cd0256712f2 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -23,8 +23,11 @@ import org.apache.thrift.TConfiguration; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.layered.TFramedTransport; +import java.net.SocketAddress; + // https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md public class TElasticFramedTransport extends TTransport { @@ -121,6 +124,16 @@ public class TElasticFramedTransport extends TTransport { protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); + validateFrame(size); + readBuffer.fill(underlying, size); + } + + protected void validateFrame(int size) throws TTransportException { + final int HTTP_GET_SIGNATURE = 0x47455420; // "GET " + final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST" + final int TLS_MIN_VERSION = 0x160300; + final int TLS_MAX_VERSION = 0x160303; + final int TLS_LENGTH_HIGH_MAX = 0x02; if (size < 0) { close(); @@ -138,6 +151,16 @@ public class TElasticFramedTransport extends TTransport { + size + ") detected, you may be sending HTTP GET/POST requests to the Thrift-RPC port, please confirm that you are using the right port"); } else { + int high24 = size >>> 8; + if (high24 >= TLS_MIN_VERSION + && high24 <= TLS_MAX_VERSION + && (i32buf[3] & 0xFF) <= TLS_LENGTH_HIGH_MAX) { + throw new TTransportException( + TTransportException.CORRUPTED_DATA, + "Singular frame size (" + + size + + ") detected, you may be sending TLS requests to the Thrift-RPC port, please confirm that you are using the right port"); + } throw new TTransportException( TTransportException.CORRUPTED_DATA, "Frame size (" + size + ") larger than protect max size (" + thriftMaxFrameSize + ")!"); @@ -146,9 +169,31 @@ public class TElasticFramedTransport extends TTransport { readBuffer.fill(underlying, size); } + protected void checkWriteFrameSize(int size) throws TTransportException { + if (size <= thriftMaxFrameSize) { + return; + } + SocketAddress remoteAddress = null; + if (underlying instanceof TSocket) { + remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); + } + String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress; + String message = + "Frame size (" + + size + + ") larger than protect max size (" + + thriftMaxFrameSize + + ") while writing" + + remoteInfo + + "!"; + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, message); + } + @Override public void flush() throws TTransportException { int length = writeBuffer.getPos(); + checkWriteFrameSize(length); TFramedTransport.encodeFrameSize(length, i32buf); underlying.write(i32buf, 0, 4); underlying.write(writeBuffer.getBuffer(), 0, length); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index d28510c2b7f..7afe263b990 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -47,6 +47,7 @@ public enum TSStatusCode { INTERNAL_SERVER_ERROR(305), DISPATCH_ERROR(306), LICENSE_ERROR(307), + THRIFT_FRAME_OVERSIZE(308), // Client, REDIRECTION_RECOMMEND(400), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java index 065a568353f..94b5dcadecb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java @@ -50,7 +50,16 @@ public enum OperationType { CREATE_SCHEMA_TEMPLATE("createSchemaTemplate"), CHECK_AUTHORITY("checkAuthority"), EXECUTE_NON_QUERY_PLAN("executeNonQueryPlan"), - QUERY_LATENCY("queryLatency"); + SELECT_INTO("selectInto"), + QUERY_LATENCY("queryLatency"), + WRITE_AUDIT_LOG("writeAuditLog"), + PREPARE_STATEMENT("prepareStatement"), + EXECUTE_PREPARED_STATEMENT("executePreparedStatement"), + DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"), + GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"), + GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"), + CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"), + DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance"); private final String name; OperationType(String name) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index 162ccf22a75..e497942dd88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -56,6 +56,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onThriftFrameOversizeException; public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { @@ -481,6 +483,15 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } + } catch (TException e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof TTransportException + && ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) { + // Don't set DISPATCH_ERROR status to avoid retry if dispatch failed because of thrift frame + // is oversize + throw new FragmentInstanceDispatchException(onThriftFrameOversizeException(rootCause)); + } + throw e; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 987eaaf1019..b777ca2e40f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -45,6 +45,7 @@ import java.util.Objects; import java.util.concurrent.ExecutionException; import static org.apache.iotdb.commons.utils.StatusUtils.needRetry; +import static org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE; public class ErrorHandlingUtils { @@ -59,10 +60,11 @@ public class ErrorHandlingUtils { "The read statement is not allowed in batch: "; private static final String ERROR_OPERATION_LOG = "Status code: {}, operation: {} failed"; + private static final String EXCEPTION_PATTERN = "[%s] Exception occurred: %s failed. "; public static TSStatus onNpeOrUnexpectedException( Exception e, String operation, TSStatusCode statusCode) { - String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation); + String message = String.format(EXCEPTION_PATTERN, statusCode, operation); if (e instanceof IOException || e instanceof NullPointerException) { LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e); } else { @@ -93,6 +95,16 @@ public class ErrorHandlingUtils { return e; } + public static TSStatus onThriftFrameOversizeException(Throwable t) { + TSStatus status = + new TSStatus(TSStatusCode.THRIFT_FRAME_OVERSIZE.getStatusCode()).setNeedRetry(false); + String message = + String.format(EXCEPTION_PATTERN, status, DISPATCH_FRAGMENT_INSTANCE) + + getRootCause(t).getMessage(); + LOGGER.warn(message); + return status.setMessage(message); + } + public static TSStatus onQueryException(Exception e, String operation, TSStatusCode statusCode) { TSStatus status = tryCatchQueryException(e); if (status != null) {
