This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b101ec  [CARBONDATA-3594] Optimize getSplits() during compaction
5b101ec is described below

commit 5b101ec781286ab98516eb6d1f327b7a36c4b8c7
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Fri Nov 22 16:26:10 2019 +0530

    [CARBONDATA-3594] Optimize getSplits() during compaction
    
    Problem:
    In MergerRDD, for a compaction of n segments per task, getSplits() is called
    n times.
    Solution:
    In MergerRDD, for each compaction task, collect all valid segments and call
    getSplits() only once for those valid segments.
    
    This closes #3475
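
To make the change concrete, here is a minimal, self-contained Scala sketch of
the pattern the commit adopts: collect every valid segment number first, set
them as one comma-separated list, and issue a single getSplits() call. The
types below (Segment, JobConf, InputFormatStub) and the configuration key are
hypothetical stand-ins for the CarbonData/Hadoop classes named in the diff,
not the real API.

    import scala.collection.JavaConverters._

    object GetSplitsOnceSketch extends App {
      // Stand-in for the CarbonData Segment type (assumption).
      case class Segment(getSegmentNo: String)

      // Stand-in for the Hadoop Job configuration (assumption).
      class JobConf {
        private val props = scala.collection.mutable.Map[String, String]()
        def set(k: String, v: String): Unit = props(k) = v
        def get(k: String): String = props(k)
      }

      // Stand-in for CarbonTableInputFormat; counts getSplits() invocations.
      class InputFormatStub {
        var calls = 0
        def getSplits(job: JobConf): List[String] = {
          calls += 1
          job.get("input.segment.numbers").split(",").toList
        }
      }

      val validSegments = Seq(Segment("0"), Segment("1"), Segment("2"))
      val job = new JobConf
      val format = new InputFormatStub

      // Collect all valid segment numbers first ...
      val validSegIds = new java.util.ArrayList[String]()
      for (eachSeg <- validSegments) validSegIds.add(eachSeg.getSegmentNo)

      // ... then configure them once and call getSplits() a single time.
      job.set("input.segment.numbers", validSegIds.asScala.mkString(","))
      val splits = format.getSplits(job)

      assert(format.calls == 1) // n segments, one getSplits() call
      println(splits)           // List(0, 1, 2)
    }

Before this change, the set/getSplits pair sat inside the per-segment loop, so
a compaction over n segments paid the getSplits() cost n times; hoisting it out
of the loop reduces that to one call, as the diff below shows.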
---
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     | 76 ++++++++++++----------
 1 file changed, 41 insertions(+), 35 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index febaeca..5e33ea7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -343,7 +343,7 @@ class CarbonMergerRDD[K, V](
     var noOfBlocks = 0
 
     val taskInfoList = new java.util.ArrayList[Distributable]
-    var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
+    var carbonInputSplits = mutable.ArrayBuffer[CarbonInputSplit]()
     var allSplits = new java.util.ArrayList[InputSplit]
 
     var splitsOfLastSegment: List[CarbonInputSplit] = null
@@ -359,6 +359,8 @@ class CarbonMergerRDD[K, V](
       loadMetadataDetails = SegmentStatusManager
         .readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath))
     }
+
+    val validSegIds: java.util.List[String] = new util.ArrayList[String]()
     // for each valid segment.
     for (eachSeg <- carbonMergerMapping.validSegments) {
       // In case of range column get the size for calculation of number of ranges
@@ -369,44 +371,48 @@ class CarbonMergerRDD[K, V](
           }
         }
       }
+      validSegIds.add(eachSeg.getSegmentNo)
+    }
 
-      // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
-
+    // map for keeping the relation of a task and its blocks.
+    job.getConfiguration
+      .set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(","))
+
+    val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
+    // get splits
+    val splits = format.getSplits(job)
+
+    // keep on assigning till last one is reached.
+    if (null != splits && splits.size > 0) {
+      splitsOfLastSegment = splits.asScala
+        .map(_.asInstanceOf[CarbonInputSplit])
+        .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
+    }
+    val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry =>
+      val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo
+      val blockInfo = new TableBlockInfo(entry.getFilePath,
+        entry.getStart, entry.getSegmentId,
+        entry.getLocations, entry.getLength, entry.getVersion,
+        updateStatusManager.getDeleteDeltaFilePath(
+          entry.getFilePath,
+          segmentId)
+      )
       if (updateStatusManager.getUpdateStatusDetails.length != 0) {
-         updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo)
-      }
-
-      val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
-      // get splits
-      val splits = format.getSplits(job)
-
-      // keep on assigning till last one is reached.
-      if (null != splits && splits.size > 0) {
-        splitsOfLastSegment = splits.asScala
-          .map(_.asInstanceOf[CarbonInputSplit])
-          .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
-      }
-       val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
-        val blockInfo = new TableBlockInfo(entry.getFilePath,
-          entry.getStart, entry.getSegmentId,
-          entry.getLocations, entry.getLength, entry.getVersion,
-          updateStatusManager.getDeleteDeltaFilePath(
-            entry.getFilePath,
-            Segment.toSegment(entry.getSegmentId).getSegmentNo)
-        )
-        (!updated || (updated && (!CarbonUtil
-          .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
-            updateDetails, updateStatusManager)))) &&
-        FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+        updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId)
       }
-      if (rangeColumn != null) {
-        totalTaskCount = totalTaskCount +
-                         CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
-      }
-      carbonInputSplits ++:= filteredSplits
-      allSplits.addAll(filteredSplits.asJava)
+      // filter splits with V3 data file format
+      // if a split is updated, then check whether it is a valid segment based on update details
+      (!updated ||
+       (updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
+         updateDetails, updateStatusManager)))) &&
+      FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
+    }
+    if (rangeColumn != null) {
+      totalTaskCount = totalTaskCount +
+                       CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray)
     }
+    carbonInputSplits ++= filteredSplits
+    allSplits.addAll(filteredSplits.asJava)
     totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     var allRanges: Array[Object] = new Array[Object](0)
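
For reference, a condensed Scala sketch of the filter applied above: a split
survives only if it is in the COLUMNAR_V3 file format and, when the table has
update delta details, its block has not been invalidated. The types and the
isInvalidBlock parameter below are hypothetical stand-ins for CarbonInputSplit
and CarbonUtil.isInvalidTableBlock, not the real API.

    object SplitFilterSketch extends App {
      sealed trait FileFormat
      case object ColumnarV3 extends FileFormat // stands in for FileFormat.COLUMNAR_V3
      case object RowV1 extends FileFormat
      case class Split(segmentId: String, filePath: String, fileFormat: FileFormat)

      // Mirrors the predicate in the diff. Note that
      // (!updated || (updated && !invalid)) reduces to (!updated || !invalid).
      def keepSplit(split: Split,
                    updated: Boolean,
                    isInvalidBlock: Split => Boolean): Boolean =
        (!updated || !isInvalidBlock(split)) && split.fileFormat == ColumnarV3

      val v3Split  = Split("0", "/part-0", ColumnarV3)
      val oldSplit = Split("0", "/part-1", RowV1)

      assert(keepSplit(v3Split, updated = false, _ => true))    // no updates: format decides
      assert(!keepSplit(oldSplit, updated = false, _ => false)) // wrong format: dropped
      assert(!keepSplit(v3Split, updated = true, _ => true))    // invalidated by an update
    }

Because the boolean guard short-circuits, isInvalidBlock is only consulted when
the table actually has update details, matching the intent of the updated flag
in the diff.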
