Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2133#discussion_r181136505 --- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java --- @@ -39,106 +53,131 @@ */ private CarbonIterator<RowBatch> detailRawQueryResultIterator; - /** - * Counter to maintain the row counter. - */ - private int counter = 0; - - private Object[] currentConveretedRawRow = null; - - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(RawResultIterator.class.getName()); - - /** - * batch of the result. - */ - private RowBatch batch; + private boolean prefetchEnabled; + private List<Object[]> currentBuffer; + private List<Object[]> backupBuffer; + private int currentIdxInBuffer; + private ExecutorService executorService; + private Future<Void> fetchFuture; + private Object[] currentRawRow = null; + private boolean isBackupFilled = false; public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, - SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, + boolean isStreamingHandOff) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; this.destinationSegProperties = destinationSegProperties; + this.executorService = Executors.newFixedThreadPool(1); + + if (!isStreamingHandOff) { + init(); + } } - @Override public boolean hasNext() { + private void init() { + this.prefetchEnabled = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); + try { + new RowsFetcher(false).call(); + if (prefetchEnabled) { + this.fetchFuture = executorService.submit(new RowsFetcher(true)); + } + } catch (Exception e) { + LOGGER.error(e, "Error occurs while fetching records"); + throw new RuntimeException(e); + } + } + + /** + * fetch rows + */ + private final class RowsFetcher implements Callable<Void> { + private boolean isBackupFilling; - if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { - if (detailRawQueryResultIterator.hasNext()) { - batch = null; - batch = detailRawQueryResultIterator.next(); - counter = 0; // batch changed so reset the counter. + private RowsFetcher(boolean isBackupFilling) { + this.isBackupFilling = isBackupFilling; + } + + @Override + public Void call() throws Exception { + if (isBackupFilling) { + backupBuffer = fetchRows(); + isBackupFilled = true; } else { - return false; + currentBuffer = fetchRows(); } + return null; } + } - if (!checkIfBatchIsProcessedCompletely(batch)) { - return true; + private List<Object[]> fetchRows() { + if (detailRawQueryResultIterator.hasNext()) { + return detailRawQueryResultIterator.next().getRows(); } else { - return false; + return new ArrayList<>(); } } - @Override public Object[] next() { - if (null == batch) { // for 1st time - batch = detailRawQueryResultIterator.next(); - } - if (!checkIfBatchIsProcessedCompletely(batch)) { - try { - if (null != currentConveretedRawRow) { - counter++; - Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; - currentConveretedRawRow = null; - return currentConveretedRawRowTemp; + private void fillDataFromPrefetch() { + try { + if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) { + if (prefetchEnabled) { + if (!isBackupFilled) { + fetchFuture.get(); + } + // copy backup buffer to current buffer and fill backup buffer asyn + currentIdxInBuffer = 0; + currentBuffer = backupBuffer; + isBackupFilled = false; + fetchFuture = executorService.submit(new RowsFetcher(true)); + } else { + currentIdxInBuffer = 0; + new RowsFetcher(false).call(); } - return convertRow(batch.getRawRow(counter++)); - } catch (KeyGenException e) { - LOGGER.error(e.getMessage()); - return null; } - } else { // completed one batch. - batch = null; - batch = detailRawQueryResultIterator.next(); - counter = 0; + } catch (Exception e) { + throw new RuntimeException(e); } - try { - if (null != currentConveretedRawRow) { - counter++; - Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; - currentConveretedRawRow = null; - return currentConveretedRawRowTemp; - } + } - return convertRow(batch.getRawRow(counter++)); - } catch (KeyGenException e) { - LOGGER.error(e.getMessage()); - return null; + private void popRow() { --- End diff -- please add comment
---