This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 65ad98410810d47c7bbd2b4f5bcd2e0a877bd014
Author: rupengwang <wangrup...@live.cn>
AuthorDate: Thu Aug 27 10:31:01 2020 +0800

    KYLIN-4722 Add more statistics to the query results
---
 .../java/org/apache/kylin/common/QueryContext.java | 28 ++++++++++++++++++++
 .../spark/sql/hive/utils/QueryMetricUtils.scala    | 23 ++++++++++++-----
 .../kylin/query/pushdown/SparkSqlClient.scala      |  2 +-
 .../kylin/query/runtime/plans/ResultPlan.scala     |  5 +++-
 .../apache/kylin/rest/response/SQLResponse.java    | 30 ++++++++++++++++++++++
 .../apache/kylin/rest/service/QueryService.java    |  6 +++++
 6 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 801eb52..eb01b6e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -62,6 +62,9 @@ public class QueryContext {
     private AtomicLong scannedBytes = new AtomicLong();
     private AtomicLong sourceScanBytes = new AtomicLong();
     private AtomicLong sourceScanRows = new AtomicLong();
+    private AtomicLong scanFiles = new AtomicLong();
+    private AtomicLong metadataTime = new AtomicLong();
+    private AtomicLong scanTime = new AtomicLong();
     private Object calcitePlan;
     private boolean isHighPriorityQuery = false;
     private boolean isTableIndex = false;
@@ -186,6 +189,31 @@ public class QueryContext {
         return sourceScanRows.addAndGet(rows);
     }
 
+    public long getScanFiles() {
+        return scanFiles.get();
+    }
+
+    public long addAndGetScanFiles(long number) {
+        return scanFiles.addAndGet(number);
+    }
+
+    public long getMedataTime() {
+        return metadataTime.get();
+    }
+
+    public long addAndGetMetadataTime(long time) {
+        return metadataTime.addAndGet(time);
+    }
+
+    //Scaned time with Spark
+    public long getScanTime() {
+        return scanTime.get();
+    }
+
+    public long addAndGetScanTime(long time) {
+        return scanTime.addAndGet(time);
+    }
+
     @Clarification(priority = Clarification.Priority.MAJOR, msg = "remove this")
     public void addQueryStopListener(QueryStopListener listener) {
         this.stopListeners.add(listener);
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index dccbba3..2f89f03 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -25,26 +25,35 @@ import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import scala.collection.JavaConverters._
 
 object QueryMetricUtils extends Logging {
-  def collectScanMetrics(plan: SparkPlan): (java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = {
+  def collectScanMetrics(plan: SparkPlan): (java.util.List[java.lang.Long], java.util.List[java.lang.Long],
+          java.util.List[java.lang.Long], java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = {
     try {
       val metrics = plan.collect {
         case exec: KylinFileSourceScanExec =>
           //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 0l)
+          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
+                  exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l)
         case exec: FileSourceScanExec =>
           //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 0l)
+          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
+                  exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l)
         case exec: HiveTableScanExec =>
           //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 0l)
+          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
+                  exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l)
       }
       val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)).toList.asJava
-      val scanBytes = metrics.map(metric => java.lang.Long.valueOf(metric._2)).toList.asJava
-      (scanRows, scanBytes)
+      val scanFiles = metrics.map(metrics => java.lang.Long.valueOf(metrics._2)).toList.asJava
+      val metadataTime = metrics.map(metrics => java.lang.Long.valueOf(metrics._3)).toList.asJava
+      val scanTime = metrics.map(metrics => java.lang.Long.valueOf(metrics._4)).toList.asJava
+      val scanBytes = metrics.map(metric => java.lang.Long.valueOf(metric._5)).toList.asJava
+
+      (scanRows, scanFiles, metadataTime, scanTime, scanBytes)
     } catch {
       case throwable: Throwable =>
         logWarning("Error occurred when collect query scan metrics.", 
throwable)
-        (List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava)
+        (List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava, 
List.empty[java.lang.Long].asJava,
+                List.empty[java.lang.Long].asJava, 
List.empty[java.lang.Long].asJava)
     }
   }
 }
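
For readers trying the new signature out of context, below is a minimal usage sketch (not part of the commit) of how the five metric lists returned by collectScanMetrics can be consumed. The SparkSession setup, the parquet path, and the ScanMetricsExample object name are assumptions for illustration only; the SQL metrics are only populated while the plan runs, so the DataFrame is collected before they are read.

import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.utils.QueryMetricUtils

object ScanMetricsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("scan-metrics-sketch").getOrCreate()
    val df = spark.read.parquet("/tmp/example_table")  // hypothetical input path
    df.collect()  // run the plan first; scan metrics are filled in during execution
    val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) =
      QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
    // Each list holds one entry per scan operator in the plan; summing the entries
    // gives query-level totals, the same aggregation ResultPlan performs below.
    // Note: the scanBytes entries are -1 placeholders in this commit.
    println(s"rows=${scanRows.asScala.map(Long2long(_)).sum}, " +
      s"files=${scanFiles.asScala.map(Long2long(_)).sum}, " +
      s"scanTime=${scanTime.asScala.map(Long2long(_)).sum}ms")
    spark.stop()
  }
}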
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index db05c66..4cc2802 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -86,7 +86,7 @@ object SparkSqlClient {
                        val frame = tempDF.select(columns: _*)
                       val rowList = frame.collect().map(_.toSeq.map(_.asInstanceOf[String]).asJava).toSeq.asJava
                       val fieldList = df.schema.map(field => SparkTypeUtil.convertSparkFieldToJavaField(field)).asJava
-                       val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan)
+                       val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan)
                        Pair.newPair(rowList, fieldList)
                } catch {
                        case e: Throwable =>
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 5430723..ffcfdde 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -106,9 +106,12 @@ object ResultPlan extends Logging {
       interruptOnCancel = true)
     try {
       val rows = df.collect()
-      val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
+      val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
       QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum)
       QueryContextFacade.current().addAndGetScannedBytes(scanBytes.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetMetadataTime(metadataTime.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetScanTime(scanTime.asScala.map(Long2long(_)).sum)
       val dt = rows.map { row =>
         var rowIndex = 0
         row.toSeq.map { cell => {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 8fa57c6..aaf5add 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -66,6 +66,12 @@ public class SQLResponse implements Serializable {
 
     protected long totalScanBytes;
 
+    protected long totalScanFiles;
+
+    protected long metadataTime;
+
+    protected long totalSparkScanTime;
+
     protected boolean hitExceptionCache = false;
 
     protected boolean storageCacheUsed = false;
@@ -199,6 +205,30 @@ public class SQLResponse implements Serializable {
         this.totalScanBytes = totalScanBytes;
     }
 
+    public long getTotalScanFiles() {
+        return totalScanFiles;
+    }
+
+    public void setTotalScanFiles(long totalScanFiles) {
+        this.totalScanFiles = totalScanFiles;
+    }
+
+    public long getMetadataTime() {
+        return metadataTime;
+    }
+
+    public void setMetadataTime(long metadataTime) {
+        this.metadataTime = metadataTime;
+    }
+
+    public long getTotalSparkScanTime() {
+        return totalSparkScanTime;
+    }
+
+    public void setTotalSparkScanTime(long totalSparkScanTime) {
+        this.totalSparkScanTime = totalSparkScanTime;
+    }
+
     public boolean isHitExceptionCache() {
         return hitExceptionCache;
     }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index fd686b5..b6eb54f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -348,6 +348,9 @@ public class QueryService extends BasicService {
         stringBuilder.append("Realization Names: 
").append(realizationNames).append(newLine);
         stringBuilder.append("Cuboid Ids: ").append(cuboidIds).append(newLine);
         stringBuilder.append("Total scan count: 
").append(response.getTotalScanCount()).append(newLine);
+        stringBuilder.append("Total scan files: 
").append(response.getTotalScanFiles()).append(newLine);
+        stringBuilder.append("Total metadata time: 
").append(response.getMetadataTime()).append("ms").append(newLine);
+        stringBuilder.append("Total spark scan time: 
").append(response.getTotalSparkScanTime()).append("ms").append(newLine);
         stringBuilder.append("Total scan bytes: 
").append(response.getTotalScanBytes()).append(newLine);
         stringBuilder.append("Result row count: 
").append(resultRowCount).append(newLine);
         stringBuilder.append("Accept Partial: 
").append(request.isAcceptPartial()).append(newLine);
@@ -1163,6 +1166,9 @@ public class QueryService extends BasicService {
         SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
                 exceptionMessage, isPartialResult, isPushDown);
         response.setTotalScanCount(queryContext.getScannedRows());
+        response.setTotalScanFiles(queryContext.getScanFiles());
+        response.setMetadataTime(queryContext.getMedataTime());
+        response.setTotalSparkScanTime(queryContext.getScanTime());
        response.setTotalScanBytes(queryContext.getScannedBytes());
        response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
         response.setSparkPool(queryContext.getSparkPool());
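
To see the whole path in one place, the sketch below (illustration only, not shipped code) restates how the new totals travel from the accumulating QueryContext to the SQLResponse that backs the "Total scan files" / "Total metadata time" / "Total spark scan time" lines added to the query log above. The QueryContextFacade, QueryContext, and SQLResponse calls are the ones introduced in this commit; the sketch's object and method names are hypothetical.

import scala.collection.JavaConverters._

import org.apache.kylin.common.QueryContextFacade
import org.apache.kylin.rest.response.SQLResponse

object MetricFlowSketch {
  // Sum one per-operator metric list into a single total, as ResultPlan does.
  def sum(values: java.util.List[java.lang.Long]): Long =
    values.asScala.map(Long2long(_)).sum

  // Step 1: accumulate per-query totals on the current QueryContext.
  def accumulate(scanFiles: java.util.List[java.lang.Long],
                 metadataTime: java.util.List[java.lang.Long],
                 scanTime: java.util.List[java.lang.Long]): Unit = {
    QueryContextFacade.current().addAndGetScanFiles(sum(scanFiles))
    QueryContextFacade.current().addAndGetMetadataTime(sum(metadataTime))
    QueryContextFacade.current().addAndGetScanTime(sum(scanTime))
  }

  // Step 2: copy the accumulated totals onto the response, as QueryService does.
  def fill(response: SQLResponse): Unit = {
    val ctx = QueryContextFacade.current()
    response.setTotalScanFiles(ctx.getScanFiles)
    response.setMetadataTime(ctx.getMedataTime)      // getter name as defined in QueryContext above
    response.setTotalSparkScanTime(ctx.getScanTime)
  }
}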
