This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ce40c64  [CARBONDATA-3396] Range Compaction Data Mismatch Fix
ce40c64 is described below

commit ce40c64f552d02417400111e9865ff77a05d4fbd
Author: manishnalla1994 <manish.nalla1...@gmail.com>
AuthorDate: Mon May 27 11:41:10 2019 +0530

    [CARBONDATA-3396] Range Compaction Data Mismatch Fix
    
    Problem : When we compact the data a second time and the ranges made during 
the first compaction have data in more than one file/blocklet, then while 
compacting the second time, if the first blocklet does not contain any record, 
the other files are also skipped. Also, Global Sort and Local Sort with a Range 
Column were taking different times for the same data load and compaction, 
because during the write step only 1 core is given to Global Sort.
    
    Solution : For the first issue, we now read all the blocklets of a given 
range and break only when the batch size is full. For the second issue, in the 
case of a range column both sort scopes now use the same number of cores and 
behave similarly.
    
    Also changed the number of tasks launched during compaction; it is now 
based on the number of tasks used during the load.
    
    This closes #3233
---
 .../core/constants/CarbonCommonConstants.java      |  4 ----
 .../AbstractDetailQueryResultIterator.java         | 14 +------------
 .../scan/result/iterator/RawResultIterator.java    | 11 +++++++++--
 .../carbondata/core/util/CarbonProperties.java     | 23 ++++++++++++++++------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 18 ++++++++++++-----
 .../processing/merger/CarbonCompactionUtil.java    | 11 +++++++++++
 .../store/CarbonFactDataHandlerModel.java          |  3 ++-
 7 files changed, 53 insertions(+), 31 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index e78ea17..aa9dd05 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1193,10 +1193,6 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_RANGE_COLUMN_SCALE_FACTOR_DEFAULT = "3";
 
-  public static final String CARBON_ENABLE_RANGE_COMPACTION = 
"carbon.enable.range.compaction";
-
-  public static final String CARBON_ENABLE_RANGE_COMPACTION_DEFAULT = "false";
-
   
//////////////////////////////////////////////////////////////////////////////////////////
   // Query parameter start here
   
//////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
index f39e549..d7f2c0b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutorService;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.DataRefNode;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -89,18 +88,7 @@ public abstract class AbstractDetailQueryResultIterator<E> 
extends CarbonIterato
 
   AbstractDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel 
queryModel,
       ExecutorService execService) {
-    String batchSizeString =
-        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
-    if (null != batchSizeString) {
-      try {
-        batchSize = Integer.parseInt(batchSizeString);
-      } catch (NumberFormatException ne) {
-        LOGGER.error("Invalid inmemory records size. Using default value");
-        batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
-      }
-    } else {
-      batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
-    }
+    batchSize = CarbonProperties.getQueryBatchSize();
     this.recorder = queryModel.getStatisticsRecorder();
     this.blockExecutionInfos = infos;
     this.fileReader = FileFactory.getFileHolder(
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
index 4d471b6..911a7dd 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java
@@ -59,6 +59,10 @@ public class RawResultIterator extends 
CarbonIterator<Object[]> {
   private boolean isBackupFilled = false;
 
   /**
+   * number of rows to fetch per batch while iterating the query result
+   */
+  private int batchSize;
+  /**
    * LOGGER
    */
   private static final Logger LOGGER =
@@ -71,7 +75,7 @@ public class RawResultIterator extends 
CarbonIterator<Object[]> {
     this.sourceSegProperties = sourceSegProperties;
     this.destinationSegProperties = destinationSegProperties;
     this.executorService = Executors.newFixedThreadPool(1);
-
+    batchSize = CarbonProperties.getQueryBatchSize();
     if (init) {
       init();
     }
@@ -116,10 +120,13 @@ public class RawResultIterator extends 
CarbonIterator<Object[]> {
 
   private List<Object[]> fetchRows() throws Exception {
     List<Object[]> converted = new ArrayList<>();
-    if (detailRawQueryResultIterator.hasNext()) {
+    while (detailRawQueryResultIterator.hasNext()) {
       for (Object[] r : detailRawQueryResultIterator.next().getRows()) {
         converted.add(convertRow(r));
       }
+      if (converted.size() >= batchSize) {
+        break;
+      }
     }
     return converted;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index f1aade9..a53c365 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1056,6 +1056,23 @@ public final class CarbonProperties {
     return batchSize;
   }
 
+  public static int getQueryBatchSize() {
+    int batchSize;
+    String batchSizeString =
+        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE);
+    if (null != batchSizeString) {
+      try {
+        batchSize = Integer.parseInt(batchSizeString);
+      } catch (NumberFormatException ne) {
+        LOGGER.error("Invalid inmemory records size. Using default value");
+        batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+      }
+    } else {
+      batchSize = CarbonCommonConstants.DETAIL_QUERY_BATCH_SIZE_DEFAULT;
+    }
+    return batchSize;
+  }
+
   public long getHandoffSize() {
     Long handoffSize;
     try {
@@ -1507,12 +1524,6 @@ public final class CarbonProperties {
     return Boolean.parseBoolean(pushFilters);
   }
 
-  public boolean isRangeCompactionAllowed() {
-    String isRangeCompact = 
getProperty(CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION,
-        CarbonCommonConstants.CARBON_ENABLE_RANGE_COMPACTION_DEFAULT);
-    return Boolean.parseBoolean(isRangeCompact);
-  }
-
   private void validateSortMemorySpillPercentage() {
     String spillPercentageStr = carbonProperties.getProperty(
         CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 4f4386b..656166d 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -297,8 +297,7 @@ class CarbonMergerRDD[K, V](
     )
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var rangeColumn: CarbonColumn = null
-    if (CarbonProperties.getInstance().isRangeCompactionAllowed &&
-        
!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+    if 
(!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
       // If the table is not a partition table then only we go for range 
column compaction flow
       rangeColumn = carbonTable.getRangeColumn
     }
@@ -339,6 +338,7 @@ class CarbonMergerRDD[K, V](
         java.util.HashMap[String, java.util.List[CarbonInputSplit]]
 
     var totalSize: Double = 0
+    var totalTaskCount: Integer = 0
     var loadMetadataDetails: Array[LoadMetadataDetails] = null
     // Only for range column get the details for the size of segments
     if (null != rangeColumn) {
@@ -386,17 +386,25 @@ class CarbonMergerRDD[K, V](
             updateDetails, updateStatusManager)))) &&
         FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
       }
+      if (rangeColumn != null) {
+        totalTaskCount = totalTaskCount +
+                         
CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
+      }
       carbonInputSplits ++:= filteredSplits
       allSplits.addAll(filteredSplits.asJava)
     }
+    totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var allRanges: Array[Object] = new Array[Object](0)
     var singleRange = false
     if (rangeColumn != null) {
-      // To calculate the number of ranges to be made, min 2 ranges/tasks to 
be made in any case
+      // Calculate the number of ranges to be made, min 2 ranges/tasks to be 
made in any case
+      // We take the minimum of average number of tasks created during load 
time and the number
+      // of tasks we get based on size for creating ranges.
       val numOfPartitions = Math
-        .max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt, 
DataLoadProcessBuilderOnSpark
-          .getNumPatitionsBasedOnSize(totalSize, carbonTable, carbonLoadModel, 
true))
+        .max(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL.toInt,
+          Math.min(totalTaskCount, DataLoadProcessBuilderOnSpark
+            .getNumPatitionsBasedOnSize(totalSize, carbonTable, 
carbonLoadModel, true)))
       val colName = rangeColumn.getColName
       LOGGER.info(s"Compacting on range column: $colName")
       allRanges = getRangesFromRDD(rangeColumn,
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
index f4a15bb..c3017a7 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -19,8 +19,10 @@ package org.apache.carbondata.processing.merger;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -601,6 +603,15 @@ public class CarbonCompactionUtil {
     return minMaxVals;
   }
 
+  public static int getTaskCountForSegment(CarbonInputSplit[] splits) {
+    Set<String> taskIdSet = new HashSet<>();
+    for (CarbonInputSplit split : splits) {
+      String taskId = split.taskId;
+      taskIdSet.add(taskId);
+    }
+    return taskIdSet.size();
+  }
+
   /**
    * Returns if the DataFileFooter containing carbondata file contains
    * sorted data or not.
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index e66e233..8aaeb9d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -741,7 +741,8 @@ public class CarbonFactDataHandlerModel {
       this.numberOfCores = 
CarbonProperties.getInstance().getNumberOfLoadingCores();
     }
 
-    if (this.sortScope != null && 
this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+    if (this.sortScope != null && 
this.sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)
+        && tableSpec.getCarbonTable().getRangeColumn() != null) {
       this.numberOfCores = 1;
     }
     // Overriding it to the task specified cores.

Reply via email to