[HOTFIX] Modified code to fix the degradation in compaction performance. Problem: Compaction performance for 3.5 billion records degraded by 16-20%.
Analysis: Code modification in RawResultIerator.java has caused the problem wherein few extra checks are performed as compared to previous code Fix Revert the changes in RawResultIerator class This closes #2613 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7535d46a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7535d46a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7535d46a Branch: refs/heads/branch-1.4 Commit: 7535d46ac2ca26072189c1d7cb3e6189572c4ce7 Parents: 7ece100 Author: m00258959 <manish.gu...@huawei.com> Authored: Tue Aug 7 13:24:16 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Aug 9 23:51:36 2018 +0530 ---------------------------------------------------------------------- .../scan/result/iterator/RawResultIterator.java | 205 ++++++++----------- .../detailquery/SearchModeTestCase.scala | 2 + .../carbondata/spark/rdd/StreamHandoffRDD.scala | 2 +- .../merger/CarbonCompactionExecutor.java | 2 +- 4 files changed, 85 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java index 94cea91..efa5b8a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java @@ -16,22 +16,13 @@ */ package org.apache.carbondata.core.scan.result.iterator; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import 
java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.scan.result.RowBatch; import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; -import org.apache.carbondata.core.util.CarbonProperties; /** * This is a wrapper iterator over the detail raw query iterator. @@ -39,11 +30,6 @@ import org.apache.carbondata.core.util.CarbonProperties; * This will handle the batch results and will iterate on the batches and give single row. */ public class RawResultIterator extends CarbonIterator<Object[]> { - /** - * LOGGER - */ - private static final LogService LOGGER = - LogServiceFactory.getLogService(RawResultIterator.class.getName()); private final SegmentProperties sourceSegProperties; @@ -53,130 +39,85 @@ public class RawResultIterator extends CarbonIterator<Object[]> { */ private CarbonIterator<RowBatch> detailRawQueryResultIterator; - private boolean prefetchEnabled; - private List<Object[]> currentBuffer; - private List<Object[]> backupBuffer; - private int currentIdxInBuffer; - private ExecutorService executorService; - private Future<Void> fetchFuture; - private Object[] currentRawRow = null; - private boolean isBackupFilled = false; - - public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, - SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties, - boolean isStreamingHandOff) { - this.detailRawQueryResultIterator = detailRawQueryResultIterator; - this.sourceSegProperties = sourceSegProperties; - this.destinationSegProperties = 
destinationSegProperties; - this.executorService = Executors.newFixedThreadPool(1); - - if (!isStreamingHandOff) { - init(); - } - } - - private void init() { - this.prefetchEnabled = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, - CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true"); - try { - new RowsFetcher(false).call(); - if (prefetchEnabled) { - this.fetchFuture = executorService.submit(new RowsFetcher(true)); - } - } catch (Exception e) { - LOGGER.error(e, "Error occurs while fetching records"); - throw new RuntimeException(e); - } - } - /** - * fetch rows + * Counter to maintain the row counter. */ - private final class RowsFetcher implements Callable<Void> { - private boolean isBackupFilling; + private int counter = 0; - private RowsFetcher(boolean isBackupFilling) { - this.isBackupFilling = isBackupFilling; - } - - @Override - public Void call() throws Exception { - if (isBackupFilling) { - backupBuffer = fetchRows(); - isBackupFilled = true; - } else { - currentBuffer = fetchRows(); - } - return null; - } - } - - private List<Object[]> fetchRows() { - if (detailRawQueryResultIterator.hasNext()) { - return detailRawQueryResultIterator.next().getRows(); - } else { - return new ArrayList<>(); - } - } - - private void fillDataFromPrefetch() { - try { - if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) { - if (prefetchEnabled) { - if (!isBackupFilled) { - fetchFuture.get(); - } - // copy backup buffer to current buffer and fill backup buffer asyn - currentIdxInBuffer = 0; - currentBuffer = backupBuffer; - isBackupFilled = false; - fetchFuture = executorService.submit(new RowsFetcher(true)); - } else { - currentIdxInBuffer = 0; - new RowsFetcher(false).call(); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } + private Object[] currentConveretedRawRow = null; /** - * populate a row with index counter increased + * 
LOGGER */ - private void popRow() { - fillDataFromPrefetch(); - currentRawRow = currentBuffer.get(currentIdxInBuffer); - currentIdxInBuffer++; - } + private static final LogService LOGGER = + LogServiceFactory.getLogService(RawResultIterator.class.getName()); /** - * populate a row with index counter unchanged + * batch of the result. */ - private void pickRow() { - fillDataFromPrefetch(); - currentRawRow = currentBuffer.get(currentIdxInBuffer); + private RowBatch batch; + + public RawResultIterator(CarbonIterator<RowBatch> detailRawQueryResultIterator, + SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) { + this.detailRawQueryResultIterator = detailRawQueryResultIterator; + this.sourceSegProperties = sourceSegProperties; + this.destinationSegProperties = destinationSegProperties; } @Override public boolean hasNext() { - fillDataFromPrefetch(); - if (currentIdxInBuffer < currentBuffer.size()) { + if (null == batch || checkIfBatchIsProcessedCompletely(batch)) { + if (detailRawQueryResultIterator.hasNext()) { + batch = null; + batch = detailRawQueryResultIterator.next(); + counter = 0; // batch changed so reset the counter. + } else { + return false; + } + } + if (!checkIfBatchIsProcessedCompletely(batch)) { return true; + } else { + return false; } - - return false; } @Override public Object[] next() { + if (null == batch) { // for 1st time + batch = detailRawQueryResultIterator.next(); + } + if (!checkIfBatchIsProcessedCompletely(batch)) { + try { + if (null != currentConveretedRawRow) { + counter++; + Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; + currentConveretedRawRow = null; + return currentConveretedRawRowTemp; + } + return convertRow(batch.getRawRow(counter++)); + } catch (KeyGenException e) { + LOGGER.error(e.getMessage()); + return null; + } + } else { // completed one batch. 
+ batch = null; + batch = detailRawQueryResultIterator.next(); + counter = 0; + } try { - popRow(); - return convertRow(this.currentRawRow); + if (null != currentConveretedRawRow) { + counter++; + Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow; + currentConveretedRawRow = null; + return currentConveretedRawRowTemp; + } + return convertRow(batch.getRawRow(counter++)); } catch (KeyGenException e) { - throw new RuntimeException(e); + LOGGER.error(e.getMessage()); + return null; } + } /** @@ -184,22 +125,38 @@ public class RawResultIterator extends CarbonIterator<Object[]> { * @return */ public Object[] fetchConverted() throws KeyGenException { - pickRow(); - return convertRow(this.currentRawRow); + if (null != currentConveretedRawRow) { + return currentConveretedRawRow; + } + if (hasNext()) { + Object[] rawRow = batch.getRawRow(counter); + currentConveretedRawRow = convertRow(rawRow); + return currentConveretedRawRow; + } + else { + return null; + } } private Object[] convertRow(Object[] rawRow) throws KeyGenException { byte[] dims = ((ByteArrayWrapper) rawRow[0]).getDictionaryKey(); long[] keyArray = sourceSegProperties.getDimensionKeyGenerator().getKeyArray(dims); - byte[] convertedBytes = + byte[] covertedBytes = destinationSegProperties.getDimensionKeyGenerator().generateKey(keyArray); - ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(convertedBytes); + ((ByteArrayWrapper) rawRow[0]).setDictionaryKey(covertedBytes); return rawRow; } - public void close() { - if (null != executorService) { - executorService.shutdownNow(); + /** + * To check if the batch is processed completely + * @param batch + * @return + */ + private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) { + if (counter < batch.getSize()) { + return false; + } else { + return true; } } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala index 001f6c0..dbf87a3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/SearchModeTestCase.scala @@ -48,6 +48,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { override def afterAll = { sql("DROP TABLE IF EXISTS main") + sql("set carbon.search.enabled = false") sqlContext.sparkSession.asInstanceOf[CarbonSession].stopSearchMode() } @@ -117,6 +118,7 @@ class SearchModeTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("SELECT * FROM main WHERE TEXT_MATCH('id:100000')"), sql(s"SELECT * FROM main WHERE id='100000'")) sql("DROP DATAMAP if exists dm ON TABLE main") + sql("set carbon.search.enabled = false") } test("test lucene datamap with search mode 2") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index 1f3decc..d21197c 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -75,7 +75,7 @@ class HandoffPartition( */ class StreamingRawResultIterator( recordReader: CarbonStreamRecordReader -) extends RawResultIterator(null, null, null, true) { +) extends RawResultIterator(null, null, null) { override def hasNext: Boolean = { recordReader.nextKeyValue() http://git-wip-us.apache.org/repos/asf/carbondata/blob/7535d46a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java index a347313..b0711ba 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java @@ -132,7 +132,7 @@ public class CarbonCompactionExecutor { queryModel.setTableBlockInfos(list); resultList.add( new RawResultIterator(executeBlockList(list, segmentId, task), sourceSegProperties, - destinationSegProperties, false)); + destinationSegProperties)); } } return resultList;