Repository: carbondata Updated Branches: refs/heads/master cf1e4d4ca -> e26cccc41
[CARBONDATA-2304][Compaction] Prefetch rowbatch during compaction Add a configuration to enable prefetch during compaction. During compaction, carbondata will query on the segments and retrieve a row, then it will sort the rows and produce the final carbondata file. Currently we find the poor performance in retrieving the rows, so adding prefetch for the rows will surely improve the compaction performance. This closes #2133 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e26cccc4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e26cccc4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e26cccc4 Branch: refs/heads/master Commit: e26cccc41df9c86879558d2d3721d7048004f638 Parents: cf1e4d4 Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Mon Apr 2 20:38:17 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Tue Apr 17 15:29:16 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 8 + .../scan/result/iterator/RawResultIterator.java | 202 +++++++++++-------- ...mpactionSupportGlobalSortParameterTest.scala | 40 ++++ .../carbondata/spark/rdd/StreamHandoffRDD.scala | 5 +- .../merger/CarbonCompactionExecutor.java | 2 +- .../merger/CompactionResultSortProcessor.java | 1 + .../merger/RowResultMergerProcessor.java | 1 + 7 files changed, 176 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index e644680..df995e0 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1645,6 +1645,14 @@ public final class CarbonCommonConstants { public static final String CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT = "-1"; + /* + * whether to enable prefetch for rowbatch to enhance row reconstruction during compaction + */ + @CarbonProperty + public static final String CARBON_COMPACTION_PREFETCH_ENABLE = + "carbon.compaction.prefetch.enable"; + public static final String CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT = "false"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java index 1dd1595..1fe50a2 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java @@ -16,13 +16,22 @@ */ package org.apache.carbondata.core.scan.result.iterator; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenException; import 
org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; +import org.apache.carbondata.core.util.CarbonProperties; /** * This is a wrapper iterator over the detail raw query iterator. @@ -30,6 +39,11 @@ import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; * This will handle the batch results and will iterate on the batches and give single row. */ public class RawResultIterator extends CarbonIterator<Object[]> { + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(RawResultIterator.class.getName()); private final SegmentProperties sourceSegProperties; @@ -39,86 +53,130 @@ public class RawResultIterator extends CarbonIterator<Object[]> { */ private CarbonIterator<RowBatch> detailRawQueryResultIterator; - /** - * Counter to maintain the row counter. - */ - private int counter = 0; - - private Object[] currentConveretedRawRow = null; - - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(RawResultIterator.class.getName()); - - /** - * batch of the result. 
- */ - private RowBatch batch; + private boolean prefetchEnabled; + private List<Object[]> currentBuffer; + private List<Object[]> backupBuffer; + private int currentIdxInBuffer; + private ExecutorService executorService; + private Future<Void> fetchFuture; + private Object[] currentRawRow = null; + private boolean isBackupFilled = false; public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, - SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, + boolean isStreamingHandOff) { this.detailRawQueryResultIterator = detailRawQueryResultIterator; this.sourceSegProperties = sourceSegProperties; this.destinationSegProperties = destinationSegProperties; + this.executorService = Executors.newFixedThreadPool(1); + + if (!isStreamingHandOff) { + init(); + } } - @Override public boolean hasNext() { + private void init() { + this.prefetchEnabled = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); + try { + new RowsFetcher(false).call(); + if (prefetchEnabled) { + this.fetchFuture = executorService.submit(new RowsFetcher(true)); + } + } catch (Exception e) { + LOGGER.error(e, "Error occurs while fetching records"); + throw new RuntimeException(e); + } + } - if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { - if (detailRawQueryResultIterator.hasNext()) { - batch = null; - batch = detailRawQueryResultIterator.next(); - counter = 0; // batch changed so reset the counter. 
+ /** + * fetch rows + */ + private final class RowsFetcher implements Callable<Void> { + private boolean isBackupFilling; + + private RowsFetcher(boolean isBackupFilling) { + this.isBackupFilling = isBackupFilling; + } + + @Override + public Void call() throws Exception { + if (isBackupFilling) { + backupBuffer = fetchRows(); + isBackupFilled = true; } else { - return false; + currentBuffer = fetchRows(); } + return null; } + } - if (!checkIfBatchIsProcessedCompletely(batch)) { - return true; + private List<Object[]> fetchRows() { + if (detailRawQueryResultIterator.hasNext()) { + return detailRawQueryResultIterator.next().getRows(); } else { - return false; + return new ArrayList<>(); } } - @Override public Object[] next() { - if (null == batch) { // for 1st time - batch = detailRawQueryResultIterator.next(); - } - if (!checkIfBatchIsProcessedCompletely(batch)) { - try { - if (null != currentConveretedRawRow) { - counter++; - Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; - currentConveretedRawRow = null; - return currentConveretedRawRowTemp; + private void fillDataFromPrefetch() { + try { + if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) { + if (prefetchEnabled) { + if (!isBackupFilled) { + fetchFuture.get(); + } + // copy backup buffer to current buffer and fill backup buffer asyn + currentIdxInBuffer = 0; + currentBuffer = backupBuffer; + isBackupFilled = false; + fetchFuture = executorService.submit(new RowsFetcher(true)); + } else { + currentIdxInBuffer = 0; + new RowsFetcher(false).call(); } - return convertRow(batch.getRawRow(counter++)); - } catch (KeyGenException e) { - LOGGER.error(e.getMessage()); - return null; } - } else { // completed one batch. 
- batch = null; - batch = detailRawQueryResultIterator.next(); - counter = 0; + } catch (Exception e) { + throw new RuntimeException(e); } - try { - if (null != currentConveretedRawRow) { - counter++; - Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; - currentConveretedRawRow = null; - return currentConveretedRawRowTemp; - } + } - return convertRow(batch.getRawRow(counter++)); - } catch (KeyGenException e) { - LOGGER.error(e.getMessage()); - return null; + /** + * populate a row with index counter increased + */ + private void popRow() { + fillDataFromPrefetch(); + currentRawRow = currentBuffer.get(currentIdxInBuffer); + currentIdxInBuffer++; + } + + /** + * populate a row with index counter unchanged + */ + private void pickRow() { + fillDataFromPrefetch(); + currentRawRow = currentBuffer.get(currentIdxInBuffer); + } + + @Override + public boolean hasNext() { + fillDataFromPrefetch(); + if (currentIdxInBuffer < currentBuffer.size()) { + return true; } + return false; + } + + @Override + public Object[] next() { + try { + popRow(); + return convertRow(this.currentRawRow); + } catch (KeyGenException e) { + throw new RuntimeException(e); + } } /** @@ -126,19 +184,8 @@ public class RawResultIterator extends CarbonIterator<Object[]> { * @return */ public Object[] fetchConverted() throws KeyGenException { - if (null != currentConveretedRawRow) { - return currentConveretedRawRow; - } - if (hasNext()) - { - Object[] rawRow = batch.getRawRow(counter); - currentConveretedRawRow = convertRow(rawRow); - return currentConveretedRawRow; - } - else - { - return null; - } + pickRow(); + return convertRow(this.currentRawRow); } private Object[] convertRow(Object[] rawRow) throws KeyGenException { @@ -150,16 +197,9 @@ public class RawResultIterator extends CarbonIterator<Object[]> { return rawRow; } - /** - * To check if the batch is processed completely - * @param batch - * @return - */ - private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) { - if 
(counter < batch.getSize()) { - return false; - } else { - return true; + public void close() { + if (null != executorService) { + executorService.shutdownNow(); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala index 02c602a..2da1ada 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala @@ -518,6 +518,46 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT) } + + test("MAJOR, ENABLE_PREFETCH_DURING_COMPACTION: true") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, "true") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false") + for (i <- 0 until 2) { + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort") + + sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE 
compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')") + } + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") + + checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + + sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)") + sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)") + sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") + + val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") + val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(SegmentSequenceIds.contains("0.1")) + assert(SegmentSequenceIds.length == 7) + + checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12))) + + checkAnswer(sql("SELECT * FROM compaction_globalsort"), + sql("SELECT * FROM carbon_localsort")) + + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success") + checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, + CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT) + } + private def resetConf() { val prop = CarbonProperties.getInstance() prop.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index 3cf9c55..f69e237 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -69,10 +69,13 @@ class HandoffPartition( /** * package the record reader of the handoff segment to RawResultIterator + * todo: actually we should not extends rawResultIterator if we don't use any method or variable + * from it. We only use it to reduce duplicate code for compaction and handoff + * and we can extract it later */ class StreamingRawResultIterator( recordReader: CarbonStreamRecordReader -) extends RawResultIterator(null, null, null) { +) extends RawResultIterator(null, null, null, true) { override def hasNext: Boolean = { recordReader.nextKeyValue() http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index 6a401d8..306019c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -117,7 +117,7 @@ public class CarbonCompactionExecutor { LOGGER.info("for task -" + task + "-block size is -" + list.size()); queryModel.setTableBlockInfos(list); resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties, - destinationSegProperties)); + destinationSegProperties, false)); } } return 
resultList; http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index dd8f739..fef8ab9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -235,6 +235,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { isRecordFound = true; } } + resultIterator.close(); } try { sortDataRows.startSorting(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 64e8b1e..2f06738 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -128,6 +128,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { // index if (!iterator.hasNext()) { index--; + iterator.close(); continue; } // add record to heap