This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b689a711ef9 [HUDI-9354] Support displaying the complete DAG for MERGE INTO
statements in the Spark web UI (#13239)
b689a711ef9 is described below
commit b689a711ef9ac9dff9cf0c80b3945163a784f63d
Author: wangyinsheng <[email protected]>
AuthorDate: Thu May 1 08:44:13 2025 +0800
[HUDI-9354] Support displaying the complete DAG for MERGE INTO statements in
the Spark web UI (#13239)
Co-authored-by: wangyinsheng <[email protected]>
---
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 7 ++--
.../hudi/command/MergeIntoHoodieTableCommand.scala | 38 +++++++++++-----------
2 files changed, 24 insertions(+), 21 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index be7e7ceabfb..87296d177f8 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -427,8 +427,11 @@ case class ResolveImplementations(sparkSession:
SparkSession) extends Rule[Logic
AnalysisHelper.allowInvokingTransformsInAnalyzer {
plan match {
// Convert to MergeIntoHoodieTableCommand
- case mit@MatchMergeIntoTable(target@ResolvesToHudiTable(_), _, _) if
mit.resolved =>
-
MergeIntoHoodieTableCommand(ReplaceExpressions(mit).asInstanceOf[MergeIntoTable])
+ case mit@MatchMergeIntoTable(target@ResolvesToHudiTable(table), _, _)
if mit.resolved =>
+ val catalogTable = HoodieCatalogTable(sparkSession, table)
+ val command =
MergeIntoHoodieTableCommand(ReplaceExpressions(mit).asInstanceOf[MergeIntoTable],
catalogTable, sparkSession, null)
+ val inputPlan = command.getProcessedInputPlan
+ command.copy(query = inputPlan)
// Convert to UpdateHoodieTableCommand
case ut@UpdateTable(plan@ResolvesToHudiTable(_), _, _) if ut.resolved
=>
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 31f2d9cdbf6..06b452632cf 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -42,12 +42,13 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, BoundReference, EqualTo, Expression, Literal,
NamedExpression, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
-import org.apache.spark.sql.catalyst.plans.LeftOuter
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, QueryPlan}
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions,
getPartitionPathFieldWriteConfig}
-import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand._
import
org.apache.spark.sql.hudi.command.PartialAssignmentMode.PartialAssignmentMode
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
@@ -105,12 +106,21 @@ class MergeIntoFieldTypeMismatchException(message: String)
*
* TODO explain workflow for MOR tables
*/
-case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends
HoodieLeafRunnableCommand
+case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable,
+ hoodieCatalogTable: HoodieCatalogTable,
+ sparkSession: SparkSession,
+ query: LogicalPlan) extends
DataWritingCommand
with SparkAdapterSupport
with ProvidesHoodieConfig
with PredicateHelper {
- private var sparkSession: SparkSession = _
+ override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+
+ override def outputColumnNames: Seq[String] = {
+ query.output.map(_.name)
+ }
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
LogicalPlan = copy(query = newChild)
/**
* The target table schema without hoodie meta fields.
@@ -118,12 +128,6 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
private lazy val targetTableSchema =
removeMetaFields(mergeInto.targetTable.schema).fields
- private lazy val hoodieCatalogTable =
sparkAdapter.resolveHoodieTable(mergeInto.targetTable) match {
- case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
- case _ =>
- failAnalysis(s"Failed to resolve MERGE INTO statement into the Hudi
table. Got instead: ${mergeInto.targetTable}")
- }
-
private lazy val targetTableType = hoodieCatalogTable.tableTypeName
/**
@@ -266,14 +270,13 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
updatingActions.flatMap(_.assignments)).head
}
- override def run(sparkSession: SparkSession): Seq[Row] = {
- this.sparkSession = sparkSession
+ override def run(sparkSession: SparkSession, inputPlan: SparkPlan): Seq[Row]
= {
// TODO move to analysis phase
// Create the write parameters
val props = buildMergeIntoConfig(hoodieCatalogTable)
validate(props)
- val processedInputDf: DataFrame = getProcessedInputDf
+ val processedInputDf: DataFrame =
sparkSession.internalCreateDataFrame(inputPlan.execute(), inputPlan.schema)
// Do the upsert
executeUpsert(processedInputDf, props)
// Refresh the table in the catalog
@@ -340,7 +343,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
* <li>{@code ts = source.sts}</li>
* </ul>
*/
- private def getProcessedInputDf: DataFrame = {
+ def getProcessedInputPlan: LogicalPlan = {
val resolver = sparkSession.sessionState.analyzer.resolver
// For pkless table, we need to project the meta columns by joining with
the target table;
@@ -394,10 +397,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
case _ => attr
}
}
-
- val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns,
inputPlan)
-
- Dataset.ofRows(sparkSession, amendedPlan)
+ Project(adjustedSourceTableOutput ++ additionalColumns, inputPlan)
}
/**
@@ -725,7 +725,7 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// NOTE: We're relying on [[sourceDataset]] here instead of
[[mergeInto.sourceTable]],
// as it could be amended to add missing primary-key and/or
precombine columns.
// Please check [[sourceDataset]] scala-doc for more details
- (getProcessedInputDf.queryExecution.analyzed.output ++
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
+ (query.output ++ mergeInto.targetTable.output).filterNot(a =>
isMetaField(a.name))
}
private def validateInsertingAssignmentExpression(expr: Expression): Unit = {