This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 1c28ec7d9b5 [SPARK-43249][CONNECT] Fix missing stats for SQL Command 1c28ec7d9b5 is described below commit 1c28ec7d9b50933107b2d2f56dd57aeeb9ec4e53 Author: Martin Grund <martin.gr...@databricks.com> AuthorDate: Mon Apr 24 16:33:32 2023 +0800 [SPARK-43249][CONNECT] Fix missing stats for SQL Command ### What changes were proposed in this pull request? This patch fixes a minor issue in the code where, for SQL commands, the plan metrics were not sent to the client. In addition, it renames a method to make it clear that the method does not actually send anything but only creates the response object. ### Why are the changes needed? Clarity: the old name `sendMetricsToResponse` suggested that the method itself sent the metrics, and the SQL command path called it without passing the result to `responseObserver.onNext`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes #40899 from grundprinzip/fix_sql_stats. Authored-by: Martin Grund <martin.gr...@databricks.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> (cherry picked from commit 9d050539bed10e5089c3c125887a9995693733c6) Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +- .../apache/spark/sql/connect/service/SparkConnectStreamHandler.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 7650532fcf9..0f3189e6013 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1667,7 +1667,7 @@ class SparkConnectPlanner(val session: SparkSession) { .build()) // Send Metrics - SparkConnectStreamHandler.sendMetricsToResponse(sessionId, df) +
responseObserver.onNext(SparkConnectStreamHandler.createMetricsResponse(sessionId, df)) } private def handleRegisterUserDefinedFunction( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala index 335b871d499..760ff8a64b4 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala @@ -65,7 +65,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp SparkConnectStreamHandler.sendSchemaToResponse(request.getSessionId, dataframe.schema)) processAsArrowBatches(request.getSessionId, dataframe, responseObserver) responseObserver.onNext( - SparkConnectStreamHandler.sendMetricsToResponse(request.getSessionId, dataframe)) + SparkConnectStreamHandler.createMetricsResponse(request.getSessionId, dataframe)) if (dataframe.queryExecution.observedMetrics.nonEmpty) { responseObserver.onNext( SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getSessionId, dataframe)) @@ -215,7 +215,7 @@ object SparkConnectStreamHandler { .build() } - def sendMetricsToResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = { + def createMetricsResponse(sessionId: String, rows: DataFrame): ExecutePlanResponse = { // Send a last batch with the metrics ExecutePlanResponse .newBuilder() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org