This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/ThrowTimeout-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 734c662960ee768500e400210512dd93564d6f03 Author: Jackie Tien <[email protected]> AuthorDate: Tue Jun 17 12:03:25 2025 +0800 Fix timeout didn't pass to client bug (cherry picked from commit 232e0c9054f3660b888c921b3b7b256469aad516) --- .../query/QueryTimeoutRuntimeException.java | 4 +++ .../execution/exchange/SharedTsBlockQueue.java | 21 ++++++++++++++++ .../exchange/source/LocalSourceHandle.java | 29 +++++++++++++++++++--- .../fragment/FragmentInstanceManager.java | 8 +++--- 4 files changed, 55 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java index ecabba0542f..dec3081ec7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java @@ -29,4 +29,8 @@ public class QueryTimeoutRuntimeException extends RuntimeException { String.format( QUERY_TIMEOUT_EXCEPTION_MESSAGE, startTime, startTime + timeout, currentTime)); } + + public QueryTimeoutRuntimeException(String message) { + super(message); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index 154ffa9c714..dcd062052b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -37,7 +37,9 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.NotThreadSafe; import java.util.LinkedList; +import java.util.Optional; import java.util.Queue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; @@ -81,6 +83,8 @@ public class SharedTsBlockQueue { private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance(); + private volatile Throwable abortedCause = null; + // used for SharedTsBlockQueue listener private final ExecutorService executorService; @@ -177,6 +181,18 @@ public class SharedTsBlockQueue { */ public TsBlock remove() { if (closed) { + // try throw underlying exception instead of "Source handle is aborted." + if (abortedCause != null) { + throw new IllegalStateException(abortedCause); + } + try { + blocked.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + throw new IllegalStateException(e.getCause() == null ? e : e.getCause()); + } throw new IllegalStateException("queue has been destroyed"); } TsBlock tsBlock = queue.remove(); @@ -332,6 +348,7 @@ public class SharedTsBlockQueue { if (closed) { return; } + abortedCause = t; closed = true; if (!blocked.isDone()) { blocked.setException(t); @@ -354,4 +371,8 @@ public class SharedTsBlockQueue { bufferRetainedSizeInBytes = 0; } } + + public Optional<Throwable> getAbortedCause() { + return Optional.ofNullable(abortedCause); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java index c31f9d655de..0c2d20d46ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java @@ -36,6 +36,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.ExecutionException; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom; @@ -156,6 +158,7 @@ public class LocalSourceHandle implements ISourceHandle { @Override public boolean isFinished() { synchronized (queue) { + checkSharedQueueIfAborted(); return queue.hasNoMoreTsBlocks() && queue.isEmpty(); } } @@ -251,10 +254,28 @@ public class LocalSourceHandle implements ISourceHandle { } private void checkState() { - if (aborted) { - throw new IllegalStateException("Source handle is aborted."); - } else if (closed) { - throw new IllegalStateException("Source Handle is closed."); + if (aborted || closed) { + checkSharedQueueIfAborted(); + if (queue.isBlocked().isDone()) { + // try throw underlying exception instead of "Source handle is aborted." + try { + queue.isBlocked().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + throw new IllegalStateException(e.getCause() == null ? e : e.getCause()); + } + } + throw new IllegalStateException( + "LocalSinkChannel state is ." + (aborted ? "ABORTED" : "CLOSED")); + } + } + + private void checkSharedQueueIfAborted() { + Optional<Throwable> abortedCause = queue.getAbortedCause(); + if (abortedCause.isPresent()) { + throw new IllegalStateException(abortedCause.get()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index a82e5a6d92e..5cb43d86bc0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.execution.driver.IDriver; @@ -56,7 +57,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import static java.util.Objects.requireNonNull; @@ -421,8 +421,10 @@ public class FragmentInstanceManager { execution .getStateMachine() .failed( - new TimeoutException( - "Query has executed more than " + execution.getTimeoutInMs() + "ms")); + new QueryTimeoutRuntimeException( + "Query has executed more than " + + execution.getTimeoutInMs() + + "ms, and now is in flushing state")); } }); }
