Repository: carbondata Updated Branches: refs/heads/master 1b8271726 -> d0f88a154
[CARBONDATA-2386]Fix Query performance issue with PreAggregate table Problem: Query on pre aggregate table is consuming too much time. Root cause: Time consumption to calculate size of selecting the smallest Pre-Aggregate table is approximately 76 seconds. This is because the index file is being read even when the segment file is present to compute the size of Pre-Aggregate table Solution: Read table status and get the size of data file and index file for valid segments. For older segments where datasize and indexsize are not present, calculate the size of store folder This closes #2213 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d0f88a15 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d0f88a15 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d0f88a15 Branch: refs/heads/master Commit: d0f88a1542fd09b2c65d3df7ece0564e194ed5be Parents: 1b82717 Author: kumarvishal09 <kumarvishal1...@gmail.com> Authored: Mon Apr 23 17:03:00 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Apr 26 13:16:45 2018 +0530 ---------------------------------------------------------------------- .../apache/carbondata/core/datamap/Segment.java | 22 ++++++++++++++++++++ .../statusmanager/SegmentStatusManager.java | 5 +++-- .../preaaggregate/PreAggregateUtil.scala | 19 +++++------------ .../sql/hive/CarbonPreAggregateRules.scala | 2 +- .../apache/spark/sql/hive/CarbonRelation.scala | 9 ++++---- 5 files changed, 36 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java index 251ea38..9179bbc 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java @@ -54,6 +54,11 @@ public class Segment implements Serializable { private ReadCommittedScope readCommittedScope; /** + * keeps all the details about segments + */ + private LoadMetadataDetails loadMetadataDetails; + + /** * ReadCommittedScope will be null. So getCommittedIndexFile will not work and will throw * a NullPointerException. In case getCommittedIndexFile is need to be accessed then * use the other constructor and pass proper ReadCommittedScope. @@ -79,6 +84,19 @@ public class Segment implements Serializable { } /** + * @param segmentNo + * @param segmentFileName + * @param readCommittedScope + */ + public Segment(String segmentNo, String segmentFileName, ReadCommittedScope readCommittedScope, + LoadMetadataDetails loadMetadataDetails) { + this.segmentNo = segmentNo; + this.segmentFileName = segmentFileName; + this.readCommittedScope = readCommittedScope; + this.loadMetadataDetails = loadMetadataDetails; + } + + /** * * @return map of Absolute path of index file as key and null as value -- without mergeIndex * map of AbsolutePath with fileName of MergeIndex parent file as key and mergeIndexFileName @@ -175,4 +193,8 @@ public class Segment implements Serializable { return segmentNo; } } + + public LoadMetadataDetails getLoadMetadataDetails() { + return loadMetadataDetails; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index a4011bc..c7925c9 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -141,7 +141,7 @@ public class SegmentStatusManager { // check for merged loads. if (null != segment.getMergedLoadName()) { Segment seg = new Segment(segment.getMergedLoadName(), segment.getSegmentFile(), - readCommittedScope); + readCommittedScope, segment); if (!listOfValidSegments.contains(seg)) { listOfValidSegments.add(seg); } @@ -164,7 +164,8 @@ public class SegmentStatusManager { continue; } listOfValidSegments.add( - new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope)); + new Segment(segment.getLoadName(), segment.getSegmentFile(), readCommittedScope, + segment)); } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() || SegmentStatus.COMPACTED == segment.getSegmentStatus() || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 8a95767..a7bcddb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -746,8 +746,8 @@ object PreAggregateUtil { * @param aggExp aggregate expression * @return list of fields */ - def validateAggregateFunctionAndGetFields(aggExp: AggregateExpression, - addCastForCount: Boolean = true): Seq[AggregateExpression] = { + def validateAggregateFunctionAndGetFields(aggExp: AggregateExpression) + : Seq[AggregateExpression] = { 
aggExp.aggregateFunction match { case Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) => Seq(AggregateExpression(Sum(Cast( @@ -784,23 +784,14 @@ object PreAggregateUtil { // in case of average need to return two columns // sum and count of the column to added during table creation to support rollup case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) => - val sum = AggregateExpression(Sum(Cast( + Seq(AggregateExpression(Sum(Cast( exp, changeDataType)), aggExp.mode, - aggExp.isDistinct) - val count = if (!addCastForCount) { + aggExp.isDistinct), AggregateExpression(Count(exp), aggExp.mode, - aggExp.isDistinct) - } else { - AggregateExpression(Count(Cast( - exp, - changeDataType)), - aggExp.mode, - aggExp.isDistinct) - } - Seq(sum, count) + aggExp.isDistinct)) // in case of average need to return two columns // sum and count of the column to added during table creation to support rollup case Average(exp: Expression) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index ab8ec30..9cf3d68 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -1762,7 +1762,7 @@ case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession) case alias@Alias(aggExp: AggregateExpression, name) => // get the updated expression for avg convert it to two expression // sum and count - val expressions = PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp, false) + val expressions = 
PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp) // if size is more than one then it was for average if(expressions.size > 1) { val sumExp = PreAggregateUtil.normalizeExprId( http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala index 0c69b9d..5739d3e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala @@ -176,10 +176,11 @@ case class CarbonRelation( var size = 0L // for each segment calculate the size segments.foreach {validSeg => - if (validSeg.getSegmentFileName != null) { - size = size + CarbonUtil.getSizeOfSegment( - carbonTable.getTablePath, - new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName)) + // for older store + if (null != validSeg.getLoadMetadataDetails.getDataSize && + null != validSeg.getLoadMetadataDetails.getIndexSize) { + size = size + validSeg.getLoadMetadataDetails.getDataSize.toLong + + validSeg.getLoadMetadataDetails.getIndexSize.toLong } else { size = size + FileFactory.getDirectorySize( CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))