Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/723#discussion_r98271038
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---
    @@ -192,45 +235,74 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
             stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
           }
           pageHeader = readStatus.getPageHeader();
    -      // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed
    -      asyncPageRead = null;
    -    } catch (Exception e) {
    -      handleAndThrowException(e, "Error reading page data.");
    -    }
     
         // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
         // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
     
    -    do {
    -      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
    -        readDictionaryPageData(readStatus, parentColumnReader);
    -        // Ugly. Use the Async task to make a synchronous read call.
    -        readStatus = new AsyncPageReaderTask().call();
    -        pageHeader = readStatus.getPageHeader();
    -      }
    -    } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
    -
    -    if (parentColumnReader.totalValuesRead + readStatus.getValuesRead()
    -        < parentColumnReader.columnChunkMetaData.getValueCount()) {
    -      asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
    -    }
    +      do {
    +        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
    +          readDictionaryPageData(readStatus, parentColumnReader);
    +          asyncPageRead.poll().get(); // get the result of execution
    +          synchronized (pageQueue) {
    +            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
    +            readStatus = pageQueue.take(); // get the data if no exception has been thrown
    +            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
    +              break;
    +            }
    +            //if the queue was full before we took a page out, then there would
    +            // have been no new read tasks scheduled. In that case, schedule a new read.
    +            if (pageQueueFull) {
    +              asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
    +            }
    +          }
    +          assert (readStatus.pageData != null);
    +          pageHeader = readStatus.getPageHeader();
    +        }
    +      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
     
         pageHeader = readStatus.getPageHeader();
         pageData = getDecompressedPageData(readStatus);
    -
    +    assert(pageData != null);
    +    } catch (InterruptedException e) {
    +      Thread.currentThread().interrupt();
    +    } catch (Exception e){
    +      handleAndThrowException(e, "Error reading page data");
    +    }
     
       }
     
    -
       @Override public void clear() {
    -    if (asyncPageRead != null) {
    +    while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
           try {
    -        final ReadStatus readStatus = asyncPageRead.get();
    -        readStatus.getPageData().release();
    +        Future<Boolean> f = asyncPageRead.poll();
    +        if(!f.isDone() && !f.isCancelled()){
    +          f.cancel(true);
    +        } else {
    +          Boolean b = f.get(1, TimeUnit.MILLISECONDS);
    --- End diff --
    
    OK
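
A rough, self-contained sketch of the bounded-queue prefetch pattern the hunk above implements may help readers following the thread. All names below are hypothetical (this is not Drill's actual AsyncPageReader, ReadStatus, or decompression path), a byte[] stands in for page data, and end-of-data handling is omitted:

    // Hypothetical sketch; names and structure are illustrative, not Drill's AsyncPageReader.
    import java.util.Queue;
    import java.util.concurrent.*;

    public class PrefetchingPageReader implements AutoCloseable {
      private static final int QUEUE_DEPTH = 2;

      private final BlockingQueue<byte[]> pageQueue = new LinkedBlockingQueue<>(QUEUE_DEPTH);
      private final Queue<Future<Boolean>> pendingReads = new ConcurrentLinkedQueue<>();
      private final ExecutorService threadPool = Executors.newSingleThreadExecutor();

      /** Prime the pipeline with the first read. */
      public void start() {
        pendingReads.offer(threadPool.submit(new ReadTask()));
      }

      /** Producer: reads one page, enqueues it, and self-schedules while there is room. */
      private final class ReadTask implements Callable<Boolean> {
        @Override
        public Boolean call() {
          byte[] page = readNextPageFromDisk();          // stand-in for the real I/O
          synchronized (pageQueue) {
            pageQueue.add(page);                         // scheduling rules guarantee capacity here
            if (pageQueue.remainingCapacity() > 0) {
              pendingReads.offer(threadPool.submit(new ReadTask()));
            }
          }
          return Boolean.TRUE;
        }
      }

      /** Consumer: waits for the oldest read, takes its page, and refills the pipeline. */
      public byte[] nextPage() throws Exception {
        pendingReads.poll().get();                       // re-throws any failure from the read task
        synchronized (pageQueue) {
          boolean wasFull = pageQueue.remainingCapacity() == 0;
          byte[] page = pageQueue.take();
          // A full queue means the last ReadTask did not self-schedule, so schedule one now.
          if (wasFull) {
            pendingReads.offer(threadPool.submit(new ReadTask()));
          }
          return page;
        }
      }

      /** Teardown, analogous to clear(): cancel in-flight reads, drain completed ones. */
      @Override
      public void close() {
        for (Future<Boolean> f; (f = pendingReads.poll()) != null; ) {
          if (!f.isDone() && !f.isCancelled()) {
            f.cancel(true);
          } else {
            try {
              f.get(1, TimeUnit.MILLISECONDS);           // already finished; nothing to release in this sketch
            } catch (Exception ignored) {
              // a failed or interrupted read leaves nothing behind here
            }
          }
        }
        pageQueue.clear();
        threadPool.shutdownNow();
      }

      private byte[] readNextPageFromDisk() {
        return new byte[0];                              // stub so the sketch is self-contained
      }
    }

As in the diff, a replacement read is scheduled from exactly one place at a time: the task itself when the queue still has room after it enqueues a page, or the consumer when it takes a page out of a previously full queue. close() then drains the future queue, cancelling reads that have not finished and waiting briefly on those that have.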

