This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 5b101ec [CARBONDATA-3594] Optimize getSplits() during compaction 5b101ec is described below commit 5b101ec781286ab98516eb6d1f327b7a36c4b8c7 Author: Indhumathi27 <indhumathi...@gmail.com> AuthorDate: Fri Nov 22 16:26:10 2019 +0530 [CARBONDATA-3594] Optimize getSplits() during compaction Problem: In MergerRDD, for compaction of n segments per task, getSplits() is called n times. Solution: In MergerRDD, for each compaction task, get all validSegments and call getSplits() only once for those valid segments. This closes #3475 --- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 76 ++++++++++++---------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index febaeca..5e33ea7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -343,7 +343,7 @@ class CarbonMergerRDD[K, V]( var noOfBlocks = 0 val taskInfoList = new java.util.ArrayList[Distributable] - var carbonInputSplits = mutable.Seq[CarbonInputSplit]() + var carbonInputSplits = mutable.ArrayBuffer[CarbonInputSplit]() var allSplits = new java.util.ArrayList[InputSplit] var splitsOfLastSegment: List[CarbonInputSplit] = null @@ -359,6 +359,8 @@ class CarbonMergerRDD[K, V]( loadMetadataDetails = SegmentStatusManager .readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath)) } + + val validSegIds: java.util.List[String] = new util.ArrayList[String]() // for each valid segment. 
for (eachSeg <- carbonMergerMapping.validSegments) { // In case of range column get the size for calculation of number of ranges @@ -369,44 +371,48 @@ class CarbonMergerRDD[K, V]( } } } + validSegIds.add(eachSeg.getSegmentNo) + } - // map for keeping the relation of a task and its blocks. - job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo) - + // map for keeping the relation of a task and its blocks. + job.getConfiguration + .set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, validSegIds.asScala.mkString(",")) + + val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0 + // get splits + val splits = format.getSplits(job) + + // keep on assigning till last one is reached. + if (null != splits && splits.size > 0) { + splitsOfLastSegment = splits.asScala + .map(_.asInstanceOf[CarbonInputSplit]) + .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava + } + val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter { entry => + val segmentId = Segment.toSegment(entry.getSegmentId).getSegmentNo + val blockInfo = new TableBlockInfo(entry.getFilePath, + entry.getStart, entry.getSegmentId, + entry.getLocations, entry.getLength, entry.getVersion, + updateStatusManager.getDeleteDeltaFilePath( + entry.getFilePath, + segmentId) + ) if (updateStatusManager.getUpdateStatusDetails.length != 0) { - updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo) - } - - val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0 - // get splits - val splits = format.getSplits(job) - - // keep on assigning till last one is reached. 
- if (null != splits && splits.size > 0) { - splitsOfLastSegment = splits.asScala - .map(_.asInstanceOf[CarbonInputSplit]) - .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava - } - val filteredSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry => - val blockInfo = new TableBlockInfo(entry.getFilePath, - entry.getStart, entry.getSegmentId, - entry.getLocations, entry.getLength, entry.getVersion, - updateStatusManager.getDeleteDeltaFilePath( - entry.getFilePath, - Segment.toSegment(entry.getSegmentId).getSegmentNo) - ) - (!updated || (updated && (!CarbonUtil - .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath, - updateDetails, updateStatusManager)))) && - FileFormat.COLUMNAR_V3.equals(entry.getFileFormat) + updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId) } - if (rangeColumn != null) { - totalTaskCount = totalTaskCount + - CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray) - } - carbonInputSplits ++:= filteredSplits - allSplits.addAll(filteredSplits.asJava) + // filter splits with V3 data file format + // if split is updated, then check for if it is valid segment based on update details + (!updated || + (updated && (!CarbonUtil.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath, + updateDetails, updateStatusManager)))) && + FileFormat.COLUMNAR_V3.equals(entry.getFileFormat) + } + if (rangeColumn != null) { + totalTaskCount = totalTaskCount + + CarbonCompactionUtil.getTaskCountForSegment(filteredSplits.toArray) } + carbonInputSplits ++= filteredSplits + allSplits.addAll(filteredSplits.asJava) totalTaskCount = totalTaskCount / carbonMergerMapping.validSegments.size val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) var allRanges: Array[Object] = new Array[Object](0)