Repository: carbondata
Updated Branches:
  refs/heads/master 1b8271726 -> d0f88a154


[CARBONDATA-2386]Fix Query performance issue with PreAggregate table

Problem: Query on pre aggregate table is consuming too much time.
Root cause: Time consumption to calculate size of selecting the smallest 
Pre-Aggregate table is approximately 76 seconds. This is index file is being 
read when segment file is present to compute the size of Pre-Aggregate table
Solution: Read table status and get the size of data file and index file for 
valid segments. For older segments were datasize and indexsize is not present 
calculate the size of store folder

This closes #2213


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d0f88a15
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d0f88a15
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d0f88a15

Branch: refs/heads/master
Commit: d0f88a1542fd09b2c65d3df7ece0564e194ed5be
Parents: 1b82717
Author: kumarvishal09 <kumarvishal1...@gmail.com>
Authored: Mon Apr 23 17:03:00 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Apr 26 13:16:45 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/datamap/Segment.java | 22 ++++++++++++++++++++
 .../statusmanager/SegmentStatusManager.java     |  5 +++--
 .../preaaggregate/PreAggregateUtil.scala        | 19 +++++------------
 .../sql/hive/CarbonPreAggregateRules.scala      |  2 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  9 ++++----
 5 files changed, 36 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index 251ea38..9179bbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -54,6 +54,11 @@ public class Segment implements Serializable {
   private ReadCommittedScope readCommittedScope;
 
   /**
+   * keeps all the details about segments
+   */
+  private LoadMetadataDetails loadMetadataDetails;
+
+  /**
    * ReadCommittedScope will be null. So getCommittedIndexFile will not work 
and will throw
    * a NullPointerException. In case getCommittedIndexFile is need to be 
accessed then
    * use the other constructor and pass proper ReadCommittedScope.
@@ -79,6 +84,19 @@ public class Segment implements Serializable {
   }
 
   /**
+   * @param segmentNo
+   * @param segmentFileName
+   * @param readCommittedScope
+   */
+  public Segment(String segmentNo, String segmentFileName, ReadCommittedScope 
readCommittedScope,
+      LoadMetadataDetails loadMetadataDetails) {
+    this.segmentNo = segmentNo;
+    this.segmentFileName = segmentFileName;
+    this.readCommittedScope = readCommittedScope;
+    this.loadMetadataDetails = loadMetadataDetails;
+  }
+
+  /**
    *
    * @return map of Absolute path of index file as key and null as value -- 
without mergeIndex
    * map of AbsolutePath with fileName of MergeIndex parent file as key and 
mergeIndexFileName
@@ -175,4 +193,8 @@ public class Segment implements Serializable {
       return segmentNo;
     }
   }
+
+  public LoadMetadataDetails getLoadMetadataDetails() {
+    return loadMetadataDetails;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index a4011bc..c7925c9 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -141,7 +141,7 @@ public class SegmentStatusManager {
           // check for merged loads.
           if (null != segment.getMergedLoadName()) {
             Segment seg = new Segment(segment.getMergedLoadName(), 
segment.getSegmentFile(),
-                readCommittedScope);
+                readCommittedScope, segment);
             if (!listOfValidSegments.contains(seg)) {
               listOfValidSegments.add(seg);
             }
@@ -164,7 +164,8 @@ public class SegmentStatusManager {
             continue;
           }
           listOfValidSegments.add(
-              new Segment(segment.getLoadName(), segment.getSegmentFile(), 
readCommittedScope));
+              new Segment(segment.getLoadName(), segment.getSegmentFile(), 
readCommittedScope,
+                  segment));
         } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
             || SegmentStatus.COMPACTED == segment.getSegmentStatus()
             || SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) 
{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 8a95767..a7bcddb 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -746,8 +746,8 @@ object PreAggregateUtil {
    * @param aggExp aggregate expression
    * @return list of fields
    */
-  def validateAggregateFunctionAndGetFields(aggExp: AggregateExpression,
-      addCastForCount: Boolean = true): Seq[AggregateExpression] = {
+  def validateAggregateFunctionAndGetFields(aggExp: AggregateExpression)
+  : Seq[AggregateExpression] = {
     aggExp.aggregateFunction match {
       case Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) 
=>
         Seq(AggregateExpression(Sum(Cast(
@@ -784,23 +784,14 @@ object PreAggregateUtil {
       // in case of average need to return two columns
       // sum and count of the column to added during table creation to support 
rollup
       case Average(MatchCastExpression(exp: Expression, changeDataType: 
DataType)) =>
-        val sum = AggregateExpression(Sum(Cast(
+        Seq(AggregateExpression(Sum(Cast(
           exp,
           changeDataType)),
           aggExp.mode,
-          aggExp.isDistinct)
-        val count = if (!addCastForCount) {
+          aggExp.isDistinct),
           AggregateExpression(Count(exp),
             aggExp.mode,
-            aggExp.isDistinct)
-        } else {
-          AggregateExpression(Count(Cast(
-            exp,
-            changeDataType)),
-            aggExp.mode,
-            aggExp.isDistinct)
-        }
-        Seq(sum, count)
+            aggExp.isDistinct))
       // in case of average need to return two columns
       // sum and count of the column to added during table creation to support 
rollup
       case Average(exp: Expression) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index ab8ec30..9cf3d68 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -1762,7 +1762,7 @@ case class 
CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
           case alias@Alias(aggExp: AggregateExpression, name) =>
             // get the updated expression for avg convert it to two expression
             // sum and count
-            val expressions = 
PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp, false)
+            val expressions = 
PreAggregateUtil.validateAggregateFunctionAndGetFields(aggExp)
             // if size is more than one then it was for average
             if(expressions.size > 1) {
               val sumExp = PreAggregateUtil.normalizeExprId(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d0f88a15/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 0c69b9d..5739d3e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -176,10 +176,11 @@ case class CarbonRelation(
           var size = 0L
           // for each segment calculate the size
           segments.foreach {validSeg =>
-            if (validSeg.getSegmentFileName != null) {
-              size = size + CarbonUtil.getSizeOfSegment(
-                carbonTable.getTablePath,
-                new Segment(validSeg.getSegmentNo, 
validSeg.getSegmentFileName))
+            // for older store
+            if (null != validSeg.getLoadMetadataDetails.getDataSize &&
+                null != validSeg.getLoadMetadataDetails.getIndexSize) {
+              size = size + validSeg.getLoadMetadataDetails.getDataSize.toLong 
+
+                     validSeg.getLoadMetadataDetails.getIndexSize.toLong
             } else {
               size = size + FileFactory.getDirectorySize(
                 CarbonTablePath.getSegmentPath(tablePath, 
validSeg.getSegmentNo))

Reply via email to