Repository: carbondata
Updated Branches:
  refs/heads/master cf1e4d4ca -> e26cccc41


[CARBONDATA-2304][Compaction] Prefetch rowbatch during compaction

Add a configuration to enable prefetch during compaction.

During compaction, carbondata will query the segments and retrieve rows,
then it will sort the rows and produce the final carbondata file.

Currently we observe poor performance in retrieving the rows, so adding
prefetch for the rows should improve the compaction performance.

This closes #2133


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e26cccc4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e26cccc4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e26cccc4

Branch: refs/heads/master
Commit: e26cccc41df9c86879558d2d3721d7048004f638
Parents: cf1e4d4
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Mon Apr 2 20:38:17 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Tue Apr 17 15:29:16 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   8 +
 .../scan/result/iterator/RawResultIterator.java | 202 +++++++++++--------
 ...mpactionSupportGlobalSortParameterTest.scala |  40 ++++
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   5 +-
 .../merger/CarbonCompactionExecutor.java        |   2 +-
 .../merger/CompactionResultSortProcessor.java   |   1 +
 .../merger/RowResultMergerProcessor.java        |   1 +
 7 files changed, 176 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index e644680..df995e0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1645,6 +1645,14 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SEARCH_MODE_SCAN_THREAD_DEFAULT = "-1";
 
+  /*
+   * whether to enable prefetch for rowbatch to enhance row reconstruction 
during compaction
+   */
+  @CarbonProperty
+  public static final String CARBON_COMPACTION_PREFETCH_ENABLE =
+      "carbon.compaction.prefetch.enable";
+  public static final String CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT = 
"false";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 1dd1595..1fe50a2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -16,13 +16,22 @@
  */
 package org.apache.carbondata.core.scan.result.iterator;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.scan.result.RowBatch;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
  * This is a wrapper iterator over the detail raw query iterator.
@@ -30,6 +39,11 @@ import 
org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
  * This will handle the batch results and will iterate on the batches and give 
single row.
  */
 public class RawResultIterator extends CarbonIterator<Object[]> {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RawResultIterator.class.getName());
 
   private final SegmentProperties sourceSegProperties;
 
@@ -39,86 +53,130 @@ public class RawResultIterator extends 
CarbonIterator<Object[]> {
    */
   private CarbonIterator<RowBatch> detailRawQueryResultIterator;
 
-  /**
-   * Counter to maintain the row counter.
-   */
-  private int counter = 0;
-
-  private Object[] currentConveretedRawRow = null;
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RawResultIterator.class.getName());
-
-  /**
-   * batch of the result.
-   */
-  private RowBatch batch;
+  private boolean prefetchEnabled;
+  private List<Object[]> currentBuffer;
+  private List<Object[]> backupBuffer;
+  private int currentIdxInBuffer;
+  private ExecutorService executorService;
+  private Future<Void> fetchFuture;
+  private Object[] currentRawRow = null;
+  private boolean isBackupFilled = false;
 
   public RawResultIterator(CarbonIterator<RowBatch> 
detailRawQueryResultIterator,
-      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties) {
+      SegmentProperties sourceSegProperties, SegmentProperties 
destinationSegProperties,
+      boolean isStreamingHandOff) {
     this.detailRawQueryResultIterator = detailRawQueryResultIterator;
     this.sourceSegProperties = sourceSegProperties;
     this.destinationSegProperties = destinationSegProperties;
+    this.executorService = Executors.newFixedThreadPool(1);
+
+    if (!isStreamingHandOff) {
+      init();
+    }
   }
 
-  @Override public boolean hasNext() {
+  private void init() {
+    this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+        
CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+    try {
+      new RowsFetcher(false).call();
+      if (prefetchEnabled) {
+        this.fetchFuture = executorService.submit(new RowsFetcher(true));
+      }
+    } catch (Exception e) {
+      LOGGER.error(e, "Error occurs while fetching records");
+      throw new RuntimeException(e);
+    }
+  }
 
-    if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
-      if (detailRawQueryResultIterator.hasNext()) {
-        batch = null;
-        batch = detailRawQueryResultIterator.next();
-        counter = 0; // batch changed so reset the counter.
+  /**
+   * fetch rows
+   */
+  private final class RowsFetcher implements Callable<Void> {
+    private boolean isBackupFilling;
+
+    private RowsFetcher(boolean isBackupFilling) {
+      this.isBackupFilling = isBackupFilling;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      if (isBackupFilling) {
+        backupBuffer = fetchRows();
+        isBackupFilled = true;
       } else {
-        return false;
+        currentBuffer = fetchRows();
       }
+      return null;
     }
+  }
 
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      return true;
+  private List<Object[]> fetchRows() {
+    if (detailRawQueryResultIterator.hasNext()) {
+      return detailRawQueryResultIterator.next().getRows();
     } else {
-      return false;
+      return new ArrayList<>();
     }
   }
 
-  @Override public Object[] next() {
-    if (null == batch) { // for 1st time
-      batch = detailRawQueryResultIterator.next();
-    }
-    if (!checkIfBatchIsProcessedCompletely(batch)) {
-      try {
-        if (null != currentConveretedRawRow) {
-          counter++;
-          Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-          currentConveretedRawRow = null;
-          return currentConveretedRawRowTemp;
+  private void fillDataFromPrefetch() {
+    try {
+      if (currentIdxInBuffer >= currentBuffer.size() && 0 != 
currentIdxInBuffer) {
+        if (prefetchEnabled) {
+          if (!isBackupFilled) {
+            fetchFuture.get();
+          }
+          // copy backup buffer to current buffer and fill backup buffer asyn
+          currentIdxInBuffer = 0;
+          currentBuffer = backupBuffer;
+          isBackupFilled = false;
+          fetchFuture = executorService.submit(new RowsFetcher(true));
+        } else {
+          currentIdxInBuffer = 0;
+          new RowsFetcher(false).call();
         }
-        return convertRow(batch.getRawRow(counter++));
-      } catch (KeyGenException e) {
-        LOGGER.error(e.getMessage());
-        return null;
       }
-    } else { // completed one batch.
-      batch = null;
-      batch = detailRawQueryResultIterator.next();
-      counter = 0;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
-    try {
-      if (null != currentConveretedRawRow) {
-        counter++;
-        Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-        currentConveretedRawRow = null;
-        return currentConveretedRawRowTemp;
-      }
+  }
 
-      return convertRow(batch.getRawRow(counter++));
-    } catch (KeyGenException e) {
-      LOGGER.error(e.getMessage());
-      return null;
+  /**
+   * populate a row with index counter increased
+   */
+  private void popRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+    currentIdxInBuffer++;
+  }
+
+  /**
+   * populate a row with index counter unchanged
+   */
+  private void pickRow() {
+    fillDataFromPrefetch();
+    currentRawRow = currentBuffer.get(currentIdxInBuffer);
+  }
+
+  @Override
+  public boolean hasNext() {
+    fillDataFromPrefetch();
+    if (currentIdxInBuffer < currentBuffer.size()) {
+      return true;
     }
 
+    return false;
+  }
+
+  @Override
+  public Object[] next() {
+    try {
+      popRow();
+      return convertRow(this.currentRawRow);
+    } catch (KeyGenException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -126,19 +184,8 @@ public class RawResultIterator extends 
CarbonIterator<Object[]> {
    * @return
    */
   public Object[] fetchConverted() throws KeyGenException {
-    if (null != currentConveretedRawRow) {
-      return currentConveretedRawRow;
-    }
-    if (hasNext())
-    {
-      Object[] rawRow = batch.getRawRow(counter);
-      currentConveretedRawRow = convertRow(rawRow);
-      return currentConveretedRawRow;
-    }
-    else
-    {
-      return null;
-    }
+    pickRow();
+    return convertRow(this.currentRawRow);
   }
 
   private Object[] convertRow(Object[] rawRow) throws KeyGenException {
@@ -150,16 +197,9 @@ public class RawResultIterator extends 
CarbonIterator<Object[]> {
     return rawRow;
   }
 
-  /**
-   * To check if the batch is processed completely
-   * @param batch
-   * @return
-   */
-  private boolean checkIfBatchIsProcessedCompletely(RowBatch batch) {
-    if (counter < batch.getSize()) {
-      return false;
-    } else {
-      return true;
+  public void close() {
+    if (null != executorService) {
+      executorService.shutdownNow();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 02c602a..2da1ada 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -518,6 +518,46 @@ class CompactionSupportGlobalSortParameterTest extends 
QueryTest with BeforeAndA
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
       CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT)
   }
+
+  test("MAJOR, ENABLE_PREFETCH_DURING_COMPACTION: true") {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
 "true")
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
 "false")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort 
OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort 
OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort 
OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, 
"global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, 
"city,name")
+
+    sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
+    sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, 
"Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) 
}
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), 
Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, 
"Success")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, 
"Marked for Delete")
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+      CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT)
+  }
+
   private def resetConf() {
     val prop = CarbonProperties.getInstance()
     prop.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, 
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 3cf9c55..f69e237 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -69,10 +69,13 @@ class HandoffPartition(
 
 /**
  * package the record reader of the handoff segment to RawResultIterator
+ * todo: actually we should not extends rawResultIterator if we don't use any 
method or variable
+ * from it. We only use it to reduce duplicate code for compaction and handoff
+ * and we can extract it later
  */
 class StreamingRawResultIterator(
     recordReader: CarbonStreamRecordReader
-) extends RawResultIterator(null, null, null) {
+) extends RawResultIterator(null, null, null, true) {
 
   override def hasNext: Boolean = {
     recordReader.nextKeyValue()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 6a401d8..306019c 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -117,7 +117,7 @@ public class CarbonCompactionExecutor {
         LOGGER.info("for task -" + task + "-block size is -" + list.size());
         queryModel.setTableBlockInfos(list);
         resultList.add(new RawResultIterator(executeBlockList(list), 
sourceSegProperties,
-            destinationSegProperties));
+            destinationSegProperties, false));
       }
     }
     return resultList;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index dd8f739..fef8ab9 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -235,6 +235,7 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
           isRecordFound = true;
         }
       }
+      resultIterator.close();
     }
     try {
       sortDataRows.startSorting();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e26cccc4/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 64e8b1e..2f06738 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -128,6 +128,7 @@ public class RowResultMergerProcessor extends 
AbstractResultProcessor {
         // index
         if (!iterator.hasNext()) {
           index--;
+          iterator.close();
           continue;
         }
         // add record to heap

Reply via email to