DRILL-5564: Added finally block for stopWait() to avoid all situations where Drill able to miss stopWait() in case of exceptions (it can lead to assertions).
closes #967 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/da88bf7a Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/da88bf7a Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/da88bf7a Branch: refs/heads/master Commit: da88bf7aada502c49ee80a9a24a325abaa052802 Parents: f958d4d Author: Roman Kulyk <rom.ku...@gmail.com> Authored: Fri Sep 29 17:26:39 2017 +0000 Committer: Paul Rogers <prog...@maprtech.com> Committed: Sat Sep 30 19:15:03 2017 -0700 ---------------------------------------------------------------------- .../parquet/columnreaders/AsyncPageReader.java | 27 +++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/da88bf7a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java index 8c89e3a..4f1ac12 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java @@ -218,21 +218,24 @@ class AsyncPageReader extends PageReader { try { Stopwatch timer = Stopwatch.createStarted(); parentColumnReader.parentReader.getOperatorContext().getStats().startWait(); - asyncPageRead.poll().get(); // get the result of execution - synchronized (pageQueueSyncronize) { - boolean pageQueueFull = pageQueue.remainingCapacity() == 0; - readStatus = pageQueue.take(); // get the data if no exception has been thrown - if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) { - throw new DrillRuntimeException("Unexpected end of data"); - } - //if the queue was full before we took a page out, then there would - // have been no new read tasks scheduled. In that case, schedule a new read. - if (!parentColumnReader.isShuttingDown && pageQueueFull) { - asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue))); + try { + asyncPageRead.poll().get(); // get the result of execution + synchronized (pageQueueSyncronize) { + boolean pageQueueFull = pageQueue.remainingCapacity() == 0; + readStatus = pageQueue.take(); // get the data if no exception has been thrown + if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) { + throw new DrillRuntimeException("Unexpected end of data"); + } + //if the queue was full before we took a page out, then there would + // have been no new read tasks scheduled. In that case, schedule a new read. + if (!parentColumnReader.isShuttingDown && pageQueueFull) { + asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue))); + } } + } finally { + parentColumnReader.parentReader.getOperatorContext().getStats().stopWait(); } long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS); - parentColumnReader.parentReader.getOperatorContext().getStats().stopWait(); stats.timeDiskScanWait.addAndGet(timeBlocked); stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime()); if (readStatus.isDictionaryPage) {