This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch DriverInit10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ccbc149525d969dc72201f996f27e55dd2a2463b Author: JackieTien97 <[email protected]> AuthorDate: Wed Dec 14 16:26:58 2022 +0800 [IOTDB-5210] Fix closed TsFileSequenceReader still cached in FileReaderManager --- .../iotdb/db/mpp/execution/driver/Driver.java | 31 +++++++++++++--------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java index 086234fa12..b743b90ef0 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java @@ -100,12 +100,6 @@ public abstract class Driver implements IDriver { public ListenableFuture<?> processFor(Duration duration) { SettableFuture<?> blockedFuture = driverBlockedFuture.get(); - // initialization may be time-consuming, so we keep it in the processFor method - // in normal case, it won't cause deadlock and should finish soon, otherwise it will be a - // critical bug - if (!init(blockedFuture)) { - return blockedFuture; - } // if the driver is blocked we don't need to continue if (!blockedFuture.isDone()) { @@ -120,13 +114,26 @@ public abstract class Driver implements IDriver { TimeUnit.MILLISECONDS, true, () -> { - long start = System.nanoTime(); - do { - ListenableFuture<?> future = processInternal(); - if (!future.isDone()) { - return updateDriverBlockedFuture(future); + // only keep doing query processing if driver state is still alive + if (state.get() == State.ALIVE) { + long start = System.nanoTime(); + // initialization may be time-consuming, so we keep it in the processFor method + // in normal case, it won't cause deadlock and should finish soon, otherwise it will + // be a + // critical bug + // We should do initialization after holding the lock to avoid parallelism problems + // with close + if (!init(blockedFuture)) { + return blockedFuture; } - } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal()); + + do { + ListenableFuture<?> future = processInternal(); + if (!future.isDone()) { + return updateDriverBlockedFuture(future); + } + } while (System.nanoTime() - start < maxRuntime && !isFinishedInternal()); + } return NOT_BLOCKED; });
