This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a15c7330d91 Fix appearance of dispatching FI failed because of thrift
frame is oversize (#17536)
a15c7330d91 is described below
commit a15c7330d912bbc924533a08dd5bcd2fbae70a59
Author: Weihao Li <[email protected]>
AuthorDate: Sun Apr 26 08:44:43 2026 +0800
Fix appearance of dispatching FI failed because of thrift frame is oversize
(#17536)
---
.../rpc/TCompressedElasticFramedTransport.java | 3 ++-
.../apache/iotdb/rpc/TElasticFramedTransport.java | 21 +++++++++++++++++++--
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../iotdb/db/protocol/thrift/OperationType.java | 3 ++-
.../scheduler/FragmentInstanceDispatcherImpl.java | 11 +++++++++++
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 14 +++++++++++++-
6 files changed, 48 insertions(+), 5 deletions(-)
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 5b9c81ec58b..62abc28e470 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -44,7 +44,7 @@ public abstract class TCompressedElasticFramedTransport
extends TElasticFramedTr
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
- checkFrameSize(size);
+ validateFrame(size);
readBuffer.fill(underlying, size);
RpcStat.readCompressedBytes.addAndGet(size);
try {
@@ -69,6 +69,7 @@ public abstract class TCompressedElasticFramedTransport
extends TElasticFramedTr
writeCompressBuffer.resizeIfNecessary(maxCompressedLength);
int compressedLength =
compress(writeBuffer.getBuffer(), 0, length,
writeCompressBuffer.getBuffer(), 0);
+ checkWriteFrameSize(compressedLength);
RpcStat.writeCompressedBytes.addAndGet(compressedLength);
TFramedTransport.encodeFrameSize(compressedLength, i32buf);
underlying.write(i32buf, 0, 4);
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 31e0f0b6960..955fdd01a9e 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -174,11 +174,11 @@ public class TElasticFramedTransport extends TTransport {
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
- checkFrameSize(size);
+ validateFrame(size);
readBuffer.fill(underlying, size);
}
- protected void checkFrameSize(int size) throws TTransportException {
+ protected void validateFrame(int size) throws TTransportException {
final int HTTP_GET_SIGNATURE = 0x47455420; // "GET "
final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST"
final int TLS_MIN_VERSION = 0x160300;
@@ -241,9 +241,26 @@ public class TElasticFramedTransport extends TTransport {
}
}
+ /**
+  * Rejects an outgoing frame whose size exceeds {@code thriftMaxFrameSize}.
+  *
+  * <p>Frames within the limit pass through untouched. For an oversized frame the error text is
+  * built from {@code FrameError.FRAME_SIZE_EXCEEDED.messageFormat} (frame size, limit and, when
+  * the underlying transport is a {@code TSocket} with a known peer, " to &lt;remote address&gt;"),
+  * this transport is closed, and the failure is raised.
+  *
+  * @param size length in bytes of the frame about to be written
+  * @throws TTransportException of type {@code CORRUPTED_DATA} when {@code size} is larger than
+  *     {@code thriftMaxFrameSize}
+  */
+ protected void checkWriteFrameSize(int size) throws TTransportException {
+   if (size > thriftMaxFrameSize) {
+     // Name the peer in the message only when it can actually be resolved.
+     String remoteInfo = "";
+     if (underlying instanceof TSocket) {
+       SocketAddress peer = ((TSocket) underlying).getSocket().getRemoteSocketAddress();
+       if (peer != null) {
+         remoteInfo = " to " + peer;
+       }
+     }
+     String message =
+         String.format(
+             FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo);
+     close();
+     throw new TTransportException(TTransportException.CORRUPTED_DATA, message);
+   }
+ }
+
@Override
public void flush() throws TTransportException {
int length = writeBuffer.getPos();
+ checkWriteFrameSize(length);
TFramedTransport.encodeFrameSize(length, i32buf);
underlying.write(i32buf, 0, 4);
underlying.write(writeBuffer.getBuffer(), 0, length);
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f9e750f4012..9fb0bb2f464 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -51,6 +51,7 @@ public enum TSStatusCode {
INTERNAL_SERVER_ERROR(305),
DISPATCH_ERROR(306),
LICENSE_ERROR(307),
+ THRIFT_FRAME_OVERSIZE(308),
// Client,
REDIRECTION_RECOMMEND(400),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
index 881e823ef2d..94b5dcadecb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java
@@ -58,7 +58,8 @@ public enum OperationType {
DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"),
GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"),
GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"),
- CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus");
+ CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"),
+ DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance");
private final String name;
OperationType(String name) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index b8c47425db8..105e568991f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.Preconditions;
@@ -78,6 +79,7 @@ import java.util.stream.Collectors;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static
org.apache.iotdb.calc.metric.QueryExecutionMetricSet.DISPATCH_READ;
+import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onThriftFrameOversizeException;
public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher
{
@@ -549,6 +551,15 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format("unknown read type [%s]",
instance.getType())));
}
+ } catch (TException e) {
+ Throwable rootCause = ExceptionUtils.getRootCause(e);
+ if (rootCause instanceof TTransportException
+ && ((TTransportException) rootCause).getType() ==
TTransportException.CORRUPTED_DATA) {
+ // Don't set DISPATCH_ERROR status, to avoid a retry when dispatch failed because the
+ // thrift frame is oversize
+ throw new
FragmentInstanceDispatchException(onThriftFrameOversizeException(rootCause));
+ }
+ throw e;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 369dafa3b06..533fa8ab640 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -47,6 +47,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+import static
org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE;
public class ErrorHandlingUtils {
@@ -61,10 +62,11 @@ public class ErrorHandlingUtils {
"The read statement is not allowed in batch: ";
private static final String ERROR_OPERATION_LOG = "Status code: {},
operation: {} failed";
+ private static final String EXCEPTION_PATTERN = "[%s] Exception occurred: %s
failed. ";
public static TSStatus onNpeOrUnexpectedException(
Exception e, String operation, TSStatusCode statusCode) {
- String message = String.format("[%s] Exception occurred: %s failed. ",
statusCode, operation);
+ String message = String.format(EXCEPTION_PATTERN, statusCode, operation);
if (e instanceof IOException || e instanceof NullPointerException) {
LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e);
} else {
@@ -88,6 +90,16 @@ public class ErrorHandlingUtils {
return onNpeOrUnexpectedException(e, operation.getName(), statusCode);
}
+ /**
+  * Builds the non-retryable {@code THRIFT_FRAME_OVERSIZE} status for a fragment instance
+  * dispatch that failed because of an oversized thrift frame.
+  *
+  * <p>The message is {@code EXCEPTION_PATTERN} filled with the status code and the dispatch
+  * operation name, followed by the root cause's message. It is logged at WARN level and set on
+  * the returned status.
+  *
+  * @param t the failure whose root-cause message is appended to the status message
+  * @return a status with code {@code THRIFT_FRAME_OVERSIZE}, {@code needRetry} set to false, and
+  *     the formatted message
+  */
+ public static TSStatus onThriftFrameOversizeException(Throwable t) {
+   TSStatusCode statusCode = TSStatusCode.THRIFT_FRAME_OVERSIZE;
+   // Fill the pattern with the status code and the operation name, exactly as
+   // onNpeOrUnexpectedException does; formatting the TSStatus object itself would put its
+   // whole toString() into the "[%s]" slot.
+   String message =
+       String.format(EXCEPTION_PATTERN, statusCode, DISPATCH_FRAGMENT_INSTANCE.getName())
+           + ErrorHandlingCommonUtils.getRootCause(t).getMessage();
+   LOGGER.warn(message);
+   return new TSStatus(statusCode.getStatusCode()).setNeedRetry(false).setMessage(message);
+ }
+
public static TSStatus onQueryException(Exception e, String operation,
TSStatusCode statusCode) {
TSStatus status = tryCatchQueryException(e);
if (status != null) {