This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new e3cadc3 [SPARK-34567][SQL] CreateTableAsSelect should update metrics too e3cadc3 is described below commit e3cadc32102fb502e3b8ac5f6bacb39ec80df26a Author: Angerszhuuuu <angers....@gmail.com> AuthorDate: Thu Mar 4 20:42:47 2021 +0800 [SPARK-34567][SQL] CreateTableAsSelect should update metrics too ### What changes were proposed in this pull request? For command `CreateTableAsSelect` we use `InsertIntoHiveTable`, `InsertIntoHadoopFsRelationCommand` to insert data. We will update metrics of `InsertIntoHiveTable`, `InsertIntoHadoopFsRelationCommand` in `FileFormatWriter.write()`, but we only show CreateTableAsSelectCommand in WebUI SQL Tab. We need to update `CreateTableAsSelectCommand`'s metrics too. Before this PR: ![image](https://user-images.githubusercontent.com/46485123/109411226-81f44480-79db-11eb-99cb-b9686b15bf61.png) After this PR: ![image](https://user-images.githubusercontent.com/46485123/109411232-8ae51600-79db-11eb-9111-3bea0bc2d475.png) ![image](https://user-images.githubusercontent.com/46485123/109905192-62aa2f80-7cd9-11eb-91f9-04b16c9238ae.png) ### Why are the changes needed? Complete SQL Metrics ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? <!-- MT Closes #31679 from AngersZhuuuu/SPARK-34567. 
Authored-by: Angerszhuuuu <angers....@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 401e270c179021dd7bfd136b143ccc6d01c04755) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/command/DataWritingCommand.scala | 27 ++++++++++++++++++++-- .../execution/command/createDataSourceTables.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 5 +++- .../sql/execution/metric/SQLMetricsSuite.scala | 17 ++++++++++++++ .../execution/CreateHiveTableAsSelectCommand.scala | 2 ++ .../spark/sql/hive/execution/SQLMetricsSuite.scala | 27 ++++++++++++++++++++++ 6 files changed, 76 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index a56007f..c9de8c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.command import org.apache.hadoop.conf.Configuration +import org.apache.spark.SparkContext import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker -import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.util.SerializableConfiguration /** @@ -73,4 +74,26 @@ object DataWritingCommand { attr.withName(outputName) } } + + /** + * When execute CTAS operators, Spark will use [[InsertIntoHadoopFsRelationCommand]] + * or [[InsertIntoHiveTable]] command to 
write data. They both inherit metrics from + * [[DataWritingCommand]], but after running [[InsertIntoHadoopFsRelationCommand]] + * or [[InsertIntoHiveTable]], we only update metrics in these two commands through + * [[BasicWriteJobStatsTracker]]. We also need to propagate metrics to the command + * that actually calls [[InsertIntoHadoopFsRelationCommand]] or [[InsertIntoHiveTable]]. + * + * @param sparkContext Current SparkContext. + * @param command Command to execute writing data. + * @param metrics Metrics of real DataWritingCommand. + */ + def propogateMetrics( + sparkContext: SparkContext, + command: DataWritingCommand, + metrics: Map[String, SQLMetric]): Unit = { + command.metrics.foreach { case (key, metric) => metrics(key).set(metric.value) } + SQLMetrics.postDriverMetricUpdates(sparkContext, + sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY), + metrics.values.toSeq) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index be7fa7b..dc26e00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -217,7 +217,7 @@ case class CreateDataSourceTableAsSelectCommand( catalogTable = if (tableExists) Some(table) else None) try { - dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan) + dataSource.writeAndRead(mode, query, outputColumnNames, physicalPlan, metrics) } catch { case ex: AnalysisException => logError(s"Failed to write to table ${table.identifier.unquotedString}", ex) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index e84f594..476174a 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider} import org.apache.spark.sql.internal.SQLConf @@ -518,7 +519,8 @@ case class DataSource( mode: SaveMode, data: LogicalPlan, outputColumnNames: Seq[String], - physicalPlan: SparkPlan): BaseRelation = { + physicalPlan: SparkPlan, + metrics: Map[String, SQLMetric]): BaseRelation = { val outputColumns = DataWritingCommand.logicalPlanOutputWithNames(data, outputColumnNames) if (outputColumns.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) { throw new AnalysisException("Cannot save interval data type into external storage.") @@ -546,6 +548,7 @@ case class DataSource( partitionColumns = resolvedPartCols, outputColumnNames = outputColumnNames) resolved.run(sparkSession, physicalPlan) + DataWritingCommand.propogateMetrics(sparkSession.sparkContext, resolved, metrics) // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation() case _ => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 21d17f4..d5f9875 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec import org.apache.spark.sql.functions._ @@ -755,4 +756,20 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } } + + test("SPARK-34567: Add metrics for CTAS operator") { + withTable("t") { + val df = sql("CREATE TABLE t USING PARQUET AS SELECT 1 as a") + val dataWritingCommandExec = + df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec] + dataWritingCommandExec.executeCollect() + val createTableAsSelect = dataWritingCommandExec.cmd + assert(createTableAsSelect.metrics.contains("numFiles")) + assert(createTableAsSelect.metrics("numFiles").value == 1) + assert(createTableAsSelect.metrics.contains("numOutputBytes")) + assert(createTableAsSelect.metrics("numOutputBytes").value > 0) + assert(createTableAsSelect.metrics.contains("numOutputRows")) + assert(createTableAsSelect.metrics("numOutputRows").value == 1) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index ccaa450..283c254 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -55,6 +55,7 @@ trait 
CreateHiveTableAsSelectBase extends DataWritingCommand { val command = getWritingCommand(catalog, tableDesc, tableExists = true) command.run(sparkSession, child) + DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics) } else { // TODO ideally, we should get the output data ready first and then // add the relation into catalog, just in case of failure occurs while data @@ -69,6 +70,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand { val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier) val command = getWritingCommand(catalog, createdTableMeta, tableExists = false) command.run(sparkSession, child) + DataWritingCommand.propogateMetrics(sparkSession.sparkContext, command, metrics) } catch { case NonFatal(e) => // drop the created table. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala index 4d6dafd..a2de43d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite +import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetricsTestUtils +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHiveSingleton // Disable AQE because metric info is different with AQE on/off @@ -34,4 +36,29 @@ class SQLMetricsSuite extends SQLMetricsTestUtils with TestHiveSingleton testMetricsDynamicPartition("hive", "hive", "t1") } } + + test("SPARK-34567: Add metrics for CTAS operator") { + Seq(false, true).foreach { canOptimized => + withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> canOptimized.toString) { + withTable("t") { + val df = sql(s"CREATE 
TABLE t STORED AS PARQUET AS SELECT 1 as a") + val dataWritingCommandExec = + df.queryExecution.executedPlan.asInstanceOf[DataWritingCommandExec] + dataWritingCommandExec.executeCollect() + val createTableAsSelect = dataWritingCommandExec.cmd + if (canOptimized) { + assert(createTableAsSelect.isInstanceOf[OptimizedCreateHiveTableAsSelectCommand]) + } else { + assert(createTableAsSelect.isInstanceOf[CreateHiveTableAsSelectCommand]) + } + assert(createTableAsSelect.metrics.contains("numFiles")) + assert(createTableAsSelect.metrics("numFiles").value == 1) + assert(createTableAsSelect.metrics.contains("numOutputBytes")) + assert(createTableAsSelect.metrics("numOutputBytes").value > 0) + assert(createTableAsSelect.metrics.contains("numOutputRows")) + assert(createTableAsSelect.metrics("numOutputRows").value == 1) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org