Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2133#discussion_r181136505
  
    --- Diff: 
core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
 ---
    @@ -39,106 +53,131 @@
        */
       private CarbonIterator<RowBatch> detailRawQueryResultIterator;
     
    -  /**
    -   * Counter to maintain the row counter.
    -   */
    -  private int counter = 0;
    -
    -  private Object[] currentConveretedRawRow = null;
    -
    -  /**
    -   * LOGGER
    -   */
    -  private static final LogService LOGGER =
    -      LogServiceFactory.getLogService(RawResultIterator.class.getName());
    -
    -  /**
    -   * batch of the result.
    -   */
    -  private RowBatch batch;
    +  private boolean prefetchEnabled;
    +  private List<Object[]> currentBuffer;
    +  private List<Object[]> backupBuffer;
    +  private int currentIdxInBuffer;
    +  private ExecutorService executorService;
    +  private Future<Void> fetchFuture;
    +  private Object[] currentRawRow = null;
    +  private boolean isBackupFilled = false;
     
       public RawResultIterator(CarbonIterator<RowBatch> 
detailRawQueryResultIterator,
    -      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties) {
    +      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties,
    +      boolean isStreamingHandOff) {
         this.detailRawQueryResultIterator = detailRawQueryResultIterator;
         this.sourceSegProperties = sourceSegProperties;
         this.destinationSegProperties = destinationSegProperties;
    +    this.executorService = Executors.newFixedThreadPool(1);
    +
    +    if (!isStreamingHandOff) {
    +      init();
    +    }
       }
     
    -  @Override public boolean hasNext() {
    +  private void init() {
    +    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
    +        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
    +        
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
    +    try {
    +      new RowsFetcher(false).call();
    +      if (prefetchEnabled) {
    +        this.fetchFuture = executorService.submit(new RowsFetcher(true));
    +      }
    +    } catch (Exception e) {
    +      LOGGER.error(e, "Error occurs while fetching records");
    +      throw new RuntimeException(e);
    +    }
    +  }
    +
    +  /**
    +   * fetch rows
    +   */
    +  private final class RowsFetcher implements Callable<Void> {
    +    private boolean isBackupFilling;
     
    -    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
    -      if (detailRawQueryResultIterator.hasNext()) {
    -        batch = null;
    -        batch = detailRawQueryResultIterator.next();
    -        counter = 0; // batch changed so reset the counter.
    +    private RowsFetcher(boolean isBackupFilling) {
    +      this.isBackupFilling = isBackupFilling;
    +    }
    +
    +    @Override
    +    public Void call() throws Exception {
    +      if (isBackupFilling) {
    +        backupBuffer = fetchRows();
    +        isBackupFilled = true;
           } else {
    -        return false;
    +        currentBuffer = fetchRows();
           }
    +      return null;
         }
    +  }
     
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      return true;
    +  private List<Object[]> fetchRows() {
    +    if (detailRawQueryResultIterator.hasNext()) {
    +      return detailRawQueryResultIterator.next().getRows();
         } else {
    -      return false;
    +      return new ArrayList<>();
         }
       }
     
    -  @Override public Object[] next() {
    -    if (null == batch) { // for 1st time
    -      batch = detailRawQueryResultIterator.next();
    -    }
    -    if (!checkIfBatchIsProcessedCompletely(batch)) {
    -      try {
    -        if (null != currentConveretedRawRow) {
    -          counter++;
    -          Object[] currentConveretedRawRowTemp = 
this.currentConveretedRawRow;
    -          currentConveretedRawRow = null;
    -          return currentConveretedRawRowTemp;
    +  private void fillDataFromPrefetch() {
    +    try {
    +      if (currentIdxInBuffer >= currentBuffer.size() && 0 != 
currentIdxInBuffer) {
    +        if (prefetchEnabled) {
    +          if (!isBackupFilled) {
    +            fetchFuture.get();
    +          }
    +          // copy backup buffer to current buffer and fill backup buffer 
asyn
    +          currentIdxInBuffer = 0;
    +          currentBuffer = backupBuffer;
    +          isBackupFilled = false;
    +          fetchFuture = executorService.submit(new RowsFetcher(true));
    +        } else {
    +          currentIdxInBuffer = 0;
    +          new RowsFetcher(false).call();
             }
    -        return convertRow(batch.getRawRow(counter++));
    -      } catch (KeyGenException e) {
    -        LOGGER.error(e.getMessage());
    -        return null;
           }
    -    } else { // completed one batch.
    -      batch = null;
    -      batch = detailRawQueryResultIterator.next();
    -      counter = 0;
    +    } catch (Exception e) {
    +      throw new RuntimeException(e);
         }
    -    try {
    -      if (null != currentConveretedRawRow) {
    -        counter++;
    -        Object[] currentConveretedRawRowTemp = 
this.currentConveretedRawRow;
    -        currentConveretedRawRow = null;
    -        return currentConveretedRawRowTemp;
    -      }
    +  }
     
    -      return convertRow(batch.getRawRow(counter++));
    -    } catch (KeyGenException e) {
    -      LOGGER.error(e.getMessage());
    -      return null;
    +  private void popRow() {
    --- End diff --
    
    please add comment


---

Reply via email to