This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 65ad98410810d47c7bbd2b4f5bcd2e0a877bd014
Author: rupengwang <wangrup...@live.cn>
AuthorDate: Thu Aug 27 10:31:01 2020 +0800

    KYLIN-4722 Add more statistics to the query results
---
 .../java/org/apache/kylin/common/QueryContext.java | 28 ++++++++++++++++++++
 .../spark/sql/hive/utils/QueryMetricUtils.scala    | 23 ++++++++++++-----
 .../kylin/query/pushdown/SparkSqlClient.scala      |  2 +-
 .../kylin/query/runtime/plans/ResultPlan.scala     |  5 +++-
 .../apache/kylin/rest/response/SQLResponse.java    | 30 ++++++++++++++++++++
 .../apache/kylin/rest/service/QueryService.java    |  6 +++++
 6 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 801eb52..eb01b6e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -62,6 +62,9 @@ public class QueryContext {
     private AtomicLong scannedBytes = new AtomicLong();
     private AtomicLong sourceScanBytes = new AtomicLong();
     private AtomicLong sourceScanRows = new AtomicLong();
+    private AtomicLong scanFiles = new AtomicLong();
+    private AtomicLong metadataTime = new AtomicLong();
+    private AtomicLong scanTime = new AtomicLong();
     private Object calcitePlan;
     private boolean isHighPriorityQuery = false;
     private boolean isTableIndex = false;
@@ -186,6 +189,31 @@ public class QueryContext {
         return sourceScanRows.addAndGet(rows);
     }
 
+    public long getScanFiles() {
+        return scanFiles.get();
+    }
+
+    public long addAndGetScanFiles(long number) {
+        return scanFiles.addAndGet(number);
+    }
+
+    public long getMetadataTime() {
+        return metadataTime.get();
+    }
+
+    public long addAndGetMetadataTime(long time) {
+        return metadataTime.addAndGet(time);
+    }
+
+    // Scan time with Spark
+    public long getScanTime() {
+        return scanTime.get();
+    }
+
+    public long addAndGetScanTime(long time) {
+        return scanTime.addAndGet(time);
+    }
+
     @Clarification(priority = Clarification.Priority.MAJOR, msg = "remove this")
     public void addQueryStopListener(QueryStopListener listener) {
         this.stopListeners.add(listener);
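The three new counters follow the same AtomicLong pattern as the existing scannedBytes and sourceScanRows fields, so per-operator values can be accumulated safely regardless of which thread records them and then read back as query totals. A minimal sketch of the intended call pattern, using only names from this diff (the sample values are made up):

    // Accumulate per-operator metrics, then read back the query totals.
    val ctx = QueryContextFacade.current()
    ctx.addAndGetScanFiles(3L)       // first scan operator
    ctx.addAndGetScanFiles(5L)       // second scan operator
    ctx.addAndGetMetadataTime(12L)   // milliseconds spent reading metadata
    ctx.addAndGetScanTime(340L)      // milliseconds spent scanning
    assert(ctx.getScanFiles == 8L)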
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index dccbba3..2f89f03 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -25,26 +25,35 @@ import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import scala.collection.JavaConverters._
 
 object QueryMetricUtils extends Logging {
-  def collectScanMetrics(plan: SparkPlan): (java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = {
+  def collectScanMetrics(plan: SparkPlan): (java.util.List[java.lang.Long], java.util.List[java.lang.Long],
+    java.util.List[java.lang.Long], java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = {
     try {
       val metrics = plan.collect {
         case exec: KylinFileSourceScanExec =>
           //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 0l)
+          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
+            exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1L)
         case exec: FileSourceScanExec =>
           //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 0l)
+          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
+            exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1L)
         case exec: HiveTableScanExec =>
           //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 0l)
+          (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value,
+            exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1L)
       }
       val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)).toList.asJava
-      val scanBytes = metrics.map(metric => java.lang.Long.valueOf(metric._2)).toList.asJava
-      (scanRows, scanBytes)
+      val scanFiles = metrics.map(metric => java.lang.Long.valueOf(metric._2)).toList.asJava
+      val metadataTime = metrics.map(metric => java.lang.Long.valueOf(metric._3)).toList.asJava
+      val scanTime = metrics.map(metric => java.lang.Long.valueOf(metric._4)).toList.asJava
+      val scanBytes = metrics.map(metric => java.lang.Long.valueOf(metric._5)).toList.asJava
+
+      (scanRows, scanFiles, metadataTime, scanTime, scanBytes)
     } catch {
      case throwable: Throwable =>
        logWarning("Error occurred when collecting query scan metrics.", throwable)
-       (List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava)
+       (List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava,
+         List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava)
     }
   }
 }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index db05c66..4cc2802 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -86,7 +86,7 @@ object SparkSqlClient {
       val frame = tempDF.select(columns: _*)
       val rowList = frame.collect().map(_.toSeq.map(_.asInstanceOf[String]).asJava).toSeq.asJava
       val fieldList = df.schema.map(field => SparkTypeUtil.convertSparkFieldToJavaField(field)).asJava
-      val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan)
+      val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan)
       Pair.newPair(rowList, fieldList)
     } catch {
       case e: Throwable =>
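Because plan.collect visits every node of the physical plan, collectScanMetrics now returns five parallel lists with one entry per scan operator. Note that the scanBytes list currently carries -1 placeholders, since the readBytes metric is still commented out. A hedged sketch of consuming the new signature (the query and variable names are illustrative; the summing mirrors what ResultPlan does below):

    import scala.collection.JavaConverters._

    val df = spark.sql("SELECT COUNT(*) FROM t")  // illustrative query
    df.collect()                                  // metrics are populated during execution
    val (rows, files, metaTime, scanTime, bytes) =
      QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
    // One entry per scan operator; sum them for per-query totals.
    val totalScanFiles = files.asScala.map(Long2long(_)).sum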
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 5430723..ffcfdde 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -106,9 +106,12 @@ object ResultPlan extends Logging {
       interruptOnCancel = true)
     try {
       val rows = df.collect()
-      val (scanRows, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
+      val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
       QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum)
       QueryContextFacade.current().addAndGetScannedBytes(scanBytes.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetMetadataTime(metadataTime.asScala.map(Long2long(_)).sum)
+      QueryContextFacade.current().addAndGetScanTime(scanTime.asScala.map(Long2long(_)).sum)
       val dt = rows.map { row =>
         var rowIndex = 0
         row.toSeq.map { cell => {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 8fa57c6..aaf5add 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -66,6 +66,12 @@ public class SQLResponse implements Serializable {
 
     protected long totalScanBytes;
 
+    protected long totalScanFiles;
+
+    protected long metadataTime;
+
+    protected long totalSparkScanTime;
+
     protected boolean hitExceptionCache = false;
 
     protected boolean storageCacheUsed = false;
@@ -199,6 +205,30 @@ public class SQLResponse implements Serializable {
         this.totalScanBytes = totalScanBytes;
     }
 
+    public long getTotalScanFiles() {
+        return totalScanFiles;
+    }
+
+    public void setTotalScanFiles(long totalScanFiles) {
+        this.totalScanFiles = totalScanFiles;
+    }
+
+    public long getMetadataTime() {
+        return metadataTime;
+    }
+
+    public void setMetadataTime(long metadataTime) {
+        this.metadataTime = metadataTime;
+    }
+
+    public long getTotalSparkScanTime() {
+        return totalSparkScanTime;
+    }
+
+    public void setTotalSparkScanTime(long totalSparkScanTime) {
+        this.totalSparkScanTime = totalSparkScanTime;
+    }
+
     public boolean isHitExceptionCache() {
         return hitExceptionCache;
     }
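SQLResponse is the serializable bean the REST layer returns, so once set, the three new fields travel with every query response. A tiny round-trip sketch (this assumes the bean's no-argument constructor; the values are made up):

    // Illustrative wiring only; in the real flow QueryService copies the
    // totals out of QueryContext, as the next hunk shows.
    val response = new SQLResponse()
    response.setTotalScanFiles(8L)
    response.setMetadataTime(12L)            // milliseconds
    response.setTotalSparkScanTime(340L)     // milliseconds
    assert(response.getTotalSparkScanTime == 340L)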
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index fd686b5..b6eb54f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -348,6 +348,9 @@ public class QueryService extends BasicService {
         stringBuilder.append("Realization Names: ").append(realizationNames).append(newLine);
         stringBuilder.append("Cuboid Ids: ").append(cuboidIds).append(newLine);
         stringBuilder.append("Total scan count: ").append(response.getTotalScanCount()).append(newLine);
+        stringBuilder.append("Total scan files: ").append(response.getTotalScanFiles()).append(newLine);
+        stringBuilder.append("Total metadata time: ").append(response.getMetadataTime()).append("ms").append(newLine);
+        stringBuilder.append("Total spark scan time: ").append(response.getTotalSparkScanTime()).append("ms").append(newLine);
         stringBuilder.append("Total scan bytes: ").append(response.getTotalScanBytes()).append(newLine);
         stringBuilder.append("Result row count: ").append(resultRowCount).append(newLine);
         stringBuilder.append("Accept Partial: ").append(request.isAcceptPartial()).append(newLine);
@@ -1163,6 +1166,9 @@ public class QueryService extends BasicService {
         SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
                 exceptionMessage, isPartialResult, isPushDown);
         response.setTotalScanCount(queryContext.getScannedRows());
+        response.setTotalScanFiles(queryContext.getScanFiles());
+        response.setMetadataTime(queryContext.getMetadataTime());
+        response.setTotalSparkScanTime(queryContext.getScanTime());
         response.setTotalScanBytes(queryContext.getScannedBytes());
         response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
         response.setSparkPool(queryContext.getSparkPool());
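With these two hunks, the new statistics appear both in the REST payload and in the per-query summary log. An illustrative excerpt of the enriched log block (the values are made up):

    Total scan count: 10000
    Total scan files: 8
    Total metadata time: 12ms
    Total spark scan time: 340ms
    Total scan bytes: 1048576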