Github user parthchandra commented on a diff in the pull request: https://github.com/apache/drill/pull/723#discussion_r98271038 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java --- @@ -192,45 +235,74 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) { stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime()); } pageHeader = readStatus.getPageHeader(); - // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed - asyncPageRead = null; - } catch (Exception e) { - handleAndThrowException(e, "Error reading page data."); - } // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary - do { - if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { - readDictionaryPageData(readStatus, parentColumnReader); - // Ugly. Use the Async task to make a synchronous read call. - readStatus = new AsyncPageReaderTask().call(); - pageHeader = readStatus.getPageHeader(); - } - } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); - - if (parentColumnReader.totalValuesRead + readStatus.getValuesRead() - < parentColumnReader.columnChunkMetaData.getValueCount()) { - asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); - } + do { + if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { + readDictionaryPageData(readStatus, parentColumnReader); + asyncPageRead.poll().get(); // get the result of execution + synchronized (pageQueue) { + boolean pageQueueFull = pageQueue.remainingCapacity() == 0; + readStatus = pageQueue.take(); // get the data if no exception has been thrown + if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) { + break; + } + //if the queue was full before we took a page out, then there would + // have been no new read tasks scheduled. In that case, schedule a new read. + if (pageQueueFull) { + asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue))); + } + } + assert (readStatus.pageData != null); + pageHeader = readStatus.getPageHeader(); + } + } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); pageHeader = readStatus.getPageHeader(); pageData = getDecompressedPageData(readStatus); - + assert(pageData != null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e){ + handleAndThrowException(e, "Error reading page data"); + } } - @Override public void clear() { - if (asyncPageRead != null) { + while (asyncPageRead != null && !asyncPageRead.isEmpty()) { try { - final ReadStatus readStatus = asyncPageRead.get(); - readStatus.getPageData().release(); + Future<Boolean> f = asyncPageRead.poll(); + if(!f.isDone() && !f.isCancelled()){ + f.cancel(true); + } else { + Boolean b = f.get(1, TimeUnit.MILLISECONDS); --- End diff -- OK
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---