Copilot commented on code in PR #2150:
URL: https://github.com/apache/auron/pull/2150#discussion_r3025617354
##########
auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala:
##########
@@ -77,4 +95,376 @@ private[ui] class AuronAllExecutionsPage(parent:
AuronSQLTab) extends WebUIPage(
}
}
+ @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+ private def buildExecutionsListSummary(
+ executionsList: Seq[AuronSQLExecutionUIData],
+ request: javax.servlet.http.HttpServletRequest): NodeSeq = {
+ val content = {
+ val _content = mutable.ListBuffer[Node]()
+ val executionPage =
+ Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1)
+
+ val auronPageTable =
+ try {
+ new AuronExecutionPagedTable(
+ request,
+ parent,
+ executionsList,
+ "auron",
+ "auron",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "auron").table(executionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering execution table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+
+ _content ++=
+ <span id="auron" class="collapse-aggregated-runningExecutions
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningExecutions',
+ 'aggregated-runningExecutions')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a href="#auron">
+ Queries:
+ </a>{executionsList.size}
+ </h4>
+ </span> ++
+ <div class="aggregated-runningExecutions collapsible-table">
+ {auronPageTable}
+ </div>
+
+ _content
+ }
+ content ++=
+ <script>
+ function clickDetail(details) {{
+
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
+ }}
+ </script>
+ }
+
+ @sparkver("4.0 / 4.1")
+ private def buildExecutionsListSummary(
+ executionsList: Seq[AuronSQLExecutionUIData],
+ request: jakarta.servlet.http.HttpServletRequest): NodeSeq = {
+ val content = {
+ val _content = mutable.ListBuffer[Node]()
+ val executionPage =
+ Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1)
+
+ val auronPageTable =
+ try {
+ new AuronExecutionPagedTable(
+ request,
+ parent,
+ executionsList,
+ "auron",
+ "auron",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "auron").table(executionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering execution table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+
+ _content ++=
+ <span id="auron" class="collapse-aggregated-runningExecutions
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningExecutions',
+ 'aggregated-runningExecutions')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a href="#auron">
+ Queries:
+ </a>{executionsList.size}
+ </h4>
+ </span> ++
+ <div class="aggregated-runningExecutions collapsible-table">
+ {auronPageTable}
+ </div>
+
+ _content
+ }
+ content ++=
+ <script>
+ function clickDetail(details) {{
+
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
+ }}
+ </script>
+ }
+
+}
+
+private[ui] class AuronExecutionPagedTable(
+ request: HttpServletRequest,
+ parent: AuronSQLTab,
+ data: Seq[AuronSQLExecutionUIData],
+ tableHeaderId: String,
+ executionTag: String,
+ basePath: String,
+ subPath: String)
+ extends PagedTable[AuronExecutionTableRowData] {
+
+ private val (sortColumn, desc, pageSize) = getAuronTableParameters(request,
executionTag, "ID")
+
+ private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+ override val dataSource = new AuronExecutionDataSource(data, pageSize,
sortColumn, desc)
+
+ private val parameterPath =
+ s"$basePath/$subPath/?${getAuronParameterOtherTable(request,
executionTag)}"
+
+ override def tableId: String = s"$executionTag-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable
table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$executionTag.sort=$encodedSortColumn" +
+ s"&$executionTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableHeaderId"
+ }
+
+ /**
+ * Returns parameters of other tables in the page.
+ */
+ def getAuronParameterOtherTable(request: HttpServletRequest, tableTag:
String): String = {
+ request.getParameterMap.asScala
+ .filterNot(_._1.startsWith(tableTag))
+ .map(parameter => parameter._1 + "=" + parameter._2(0))
+ .mkString("&")
+ }
+
+ /**
+ * Returns parameter of this table.
+ */
+ def getAuronTableParameters(
+ request: HttpServletRequest,
+ tableTag: String,
+ defaultSortColumn: String): (String, Boolean, Int) = {
+ val parameterSortColumn = request.getParameter(s"$tableTag.sort")
+ val parameterSortDesc = request.getParameter(s"$tableTag.desc")
+ val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
+ val sortColumn = Option(parameterSortColumn)
+ .map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }
+ .getOrElse(defaultSortColumn)
+ val desc =
+ Option(parameterSortDesc).map(_.toBoolean).getOrElse(sortColumn ==
defaultSortColumn)
+ val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
+
+ (sortColumn, desc, pageSize)
+ }
+
+ override def pageSizeFormField: String = s"$executionTag.pageSize"
+
+ override def pageNumberFormField: String = s"$executionTag.page"
+
+ override def goButtonFormPath: String =
+
s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
+
+ // Information for each header: title, sortable, tooltip
+ private val headerInfo: Seq[(String, Boolean, Option[String])] = {
+ Seq(
+ ("ID", true, None),
+ ("Description", true, None),
+ ("Num Auron Nodes", true, None),
+ ("Num Fallback Nodes", true, None))
+ }
+
+ override def headers: Seq[Node] = {
+ isAuronSortColumnValid(headerInfo, sortColumn)
+
+ headerAuronRow(
+ headerInfo,
+ desc,
+ pageSize,
+ sortColumn,
+ parameterPath,
+ executionTag,
+ tableHeaderId)
+ }
+
+ def headerAuronRow(
+ headerInfo: Seq[(String, Boolean, Option[String])],
+ desc: Boolean,
+ pageSize: Int,
+ sortColumn: String,
+ parameterPath: String,
+ tableTag: String,
+ headerId: String): Seq[Node] = {
+ val row: Seq[Node] = {
+ headerInfo.map { case (header, sortable, tooltip) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.desc=${!desc}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+ <th>
+ <a href={headerLink}>
+ <span data-toggle="tooltip" data-placement="top"
title={tooltip.getOrElse("")}>
+ {header} {Unparsed(arrow)}
+ </span>
+ </a>
+ </th>
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+
+ <th>
+ <a href={headerLink}>
+ <span data-toggle="tooltip" data-placement="top"
title={tooltip.getOrElse("")}>
+ {header}
+ </span>
+ </a>
+ </th>
+ } else {
+ <th>
+ <span data-toggle="tooltip" data-placement="top"
title={tooltip.getOrElse("")}>
+ {header}
+ </span>
+ </th>
+ }
+ }
+ }
+ }
+ <thead>
+ <tr>
+ {row}
+ </tr>
+ </thead>
+ }
+
+ def isAuronSortColumnValid(
+ headerInfo: Seq[(String, Boolean, Option[String])],
+ sortColumn: String): Unit = {
+ if (!headerInfo.filter(_._2).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+ }
+
+ override def row(executionTableRow: AuronExecutionTableRowData): Seq[Node] =
{
+ val executionUIData = executionTableRow.executionUIData
+
+ <tr>
+ <td>
+ {executionUIData.executionId.toString}
+ </td>
+ <td>
+ {descriptionCell(executionUIData)}
+ </td>
+ <td sorttable_customkey={executionUIData.numAuronNodes.toString}>
+ {executionUIData.numAuronNodes.toString}
+ </td>
+ <td sorttable_customkey={executionUIData.numFallbackNodes.toString}>
+ {executionUIData.numFallbackNodes.toString}
+ </td>
+ </tr>
+ }
+
+ private def descriptionCell(execution: AuronSQLExecutionUIData): Seq[Node] =
{
+ val details = if (execution.description != null &&
execution.description.nonEmpty) {
+ val concat = new PlanStringConcat()
+ concat.append("== Fallback Summary ==\n")
+ val fallbackSummary = execution.fallbackNodeToReason
+ .map { case (name, reason) =>
+ val id = name.substring(0, 3)
+ val nodeName = name.substring(4)
+ s"(${id.toInt}) $nodeName: $reason"
+ }
+ .mkString("\n")
+ concat.append(fallbackSummary)
+ if (execution.fallbackNodeToReason.isEmpty) {
+ concat.append("No fallback nodes")
+ }
+ concat.append("\n\n")
+ concat.append(execution.fallbackDescription)
+
+ <span
onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span> ++
+ <div class="stage-details collapsed">
+ <pre>{concat.toString()}</pre>
+ </div>
+ } else {
+ Nil
+ }
+
+ val desc = if (execution.description != null &&
execution.description.nonEmpty) {
+ <a href={executionURL(execution.executionId)} class="description-input">
+ {execution.description}</a>
+ } else {
+ <a href={executionURL(execution.executionId)}>{execution.executionId}</a>
+ }
+
+ <div>{desc}{details}</div>
+ }
+
+ private def executionURL(executionID: Long): String =
+ s"${UIUtils.prependBaseUri(request,
parent.basePath)}/SQL/execution/?id=$executionID"
+}
+
+private[ui] class AuronExecutionTableRowData(val executionUIData:
AuronSQLExecutionUIData)
+
+private[ui] class AuronExecutionDataSource(
+ executionData: Seq[AuronSQLExecutionUIData],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean)
+ extends PagedDataSource[AuronExecutionTableRowData](pageSize) {
+
+ // Convert ExecutionData to ExecutionTableRowData which contains the final
contents to show
+ // in the table so that we can avoid creating duplicate contents during
sorting the data
+ private val data =
executionData.map(executionRow).sorted(ordering(sortColumn, desc))
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[AuronExecutionTableRowData] =
+ data.slice(from, to)
+
+ private def executionRow(
+ executionUIData: AuronSQLExecutionUIData): AuronExecutionTableRowData = {
+ new AuronExecutionTableRowData(executionUIData)
+ }
+
+ /** Return Ordering according to sortColumn and desc. */
+ private def ordering(
+ sortColumn: String,
+ desc: Boolean): Ordering[AuronExecutionTableRowData] = {
+ val ordering: Ordering[AuronExecutionTableRowData] = sortColumn match {
+ case "ID" => Ordering.by(_.executionUIData.executionId)
+ case "Description" => Ordering.by(_.executionUIData.fallbackDescription)
+ case "Num Auron Nodes" => Ordering.by(_.executionUIData.numAuronNodes)
+ case "Num Fallback Nodes" =>
Ordering.by(_.executionUIData.numFallbackNodes)
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown
column: $unknownColumn")
+ }
Review Comment:
Sorting by the “Description” column uses `fallbackDescription` (physical
plan text) rather than `executionUIData.description`. This makes sorting
behavior inconsistent with the displayed column label/content. Update the
ordering for `"Description"` to use the `description` field.
##########
spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala:
##########
@@ -49,4 +49,13 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with
BaseAuronSQLSuite {
assert(listener.getAuronBuildInfo() == 1)
}
+ test("test convert table in spark UI ") {
+ withTable("t1") {
+ sql(
+ "create table t1 using parquet PARTITIONED BY (part) as select 1 as
c1, 2 as c2, 'test test' as part")
+ val df = sql("select * from t1")
+ checkAnswer(df, Seq(Row(1, 2, "test test")))
+ }
+ }
Review Comment:
This test exercises table creation/querying but does not assert the new
Spark UI functionality (that `AuronPlanFallbackEvent` is posted and persisted
as `AuronSQLExecutionUIData`, and that counts/reasons/plan text are stored).
Add assertions against the `AuronSQLAppStatusListener`/KVStore (or
`AuronSQLAppStatusStore`) to verify at least one execution record is written
with expected fields (executionId, num nodes, fallback map/plan description).
##########
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala:
##########
@@ -99,4 +100,36 @@ case class AuronColumnarOverrides(sparkSession:
SparkSession) extends ColumnarRu
}
}
}
+
+ override def postColumnarTransitions: Rule[SparkPlan] = {
+ new Rule[SparkPlan] {
+ override def apply(sparkPlan: SparkPlan): SparkPlan = {
+ if (SparkEnv.get.conf
+ .get(SparkAuronConfiguration.UI_ENABLED.key, "true")
+ .equals("true")) {
+ val sc = sparkSession.sparkContext
+ val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ if (executionId == null) {
+ logDebug(s"Unknown execution id for plan: $sparkPlan")
+ return sparkPlan
+ }
+ val concat = new PlanStringConcat()
+ concat.append("== Physical Plan ==\n")
+
+ val (numAuronNodes, fallbackNodeToReason) =
+ AuronExplainUtils.processPlan(sparkPlan, concat.append)
+
+ val event = AuronPlanFallbackEvent(
+ executionId.toLong,
+ numAuronNodes,
+ fallbackNodeToReason.size,
+ concat.toString(),
+ fallbackNodeToReason)
Review Comment:
Two issues here can cause unexpected failures/behavior: (1) UI enablement is
checked via a case-sensitive string comparison (`.equals("true")`), so a value
such as `TRUE` is treated as disabled; prefer a boolean conf read (e.g.
`SparkConf.getBoolean`) or a safer parse so values like `TRUE` / `true` behave
consistently. (2) `executionId.toLong` can throw `NumberFormatException` if the
local property is non-numeric; guard the parse and skip posting the event (with
debug logging) if it cannot be parsed.
##########
auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronSQLAppStatusStore.scala:
##########
@@ -16,11 +16,14 @@
*/
package org.apache.spark.sql.execution.ui
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
Review Comment:
`scala.jdk.CollectionConverters` is Scala 2.13+ only. This repo appears to
support Spark 3.x as well (typically Scala 2.12), so this import can break
compilation for Spark 3 builds. Prefer `scala.collection.JavaConverters._` /
`JavaConverters.asScalaIteratorConverter`, which exists in both Scala 2.12 and
2.13 (it is deprecated in 2.13, so expect a deprecation warning there), or
introduce a small compatibility shim (or Spark-version-specific source) to
select the right converters per build target.
```suggestion
import scala.collection.JavaConverters._
```
##########
auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala:
##########
@@ -29,12 +37,22 @@ private[ui] class AuronAllExecutionsPage(parent:
AuronSQLTab) extends WebUIPage(
@sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
override def render(request: javax.servlet.http.HttpServletRequest):
Seq[Node] = {
- UIUtils.headerSparkPage(request, "Auron",
buildInfoSummary(sqlStore.buildInfo()), parent)
+ UIUtils.headerSparkPage(
+ request,
+ "Auron",
+ buildInfoSummary(sqlStore.buildInfo()) ++
+ buildExecutionsListSummary(sqlStore.executionsList(), request),
+ parent)
Review Comment:
The page currently calls `sqlStore.executionsList()` and passes the full
result into the table, and the datasource later sorts that full in-memory
sequence. For applications with many SQL executions this can make the Auron tab
slow and memory-heavy. Prefer a KVStore-backed paged datasource: use
`executionsCount()` for the total, and fetch only the requested page via
`executionsList(offset, length)` (and ideally push sorting down to the KVStore
view when possible), mirroring Spark’s own UI pattern.
##########
auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala:
##########
@@ -77,4 +95,376 @@ private[ui] class AuronAllExecutionsPage(parent:
AuronSQLTab) extends WebUIPage(
}
}
+ @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+ private def buildExecutionsListSummary(
+ executionsList: Seq[AuronSQLExecutionUIData],
+ request: javax.servlet.http.HttpServletRequest): NodeSeq = {
+ val content = {
+ val _content = mutable.ListBuffer[Node]()
+ val executionPage =
+ Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1)
+
+ val auronPageTable =
+ try {
+ new AuronExecutionPagedTable(
+ request,
+ parent,
+ executionsList,
+ "auron",
+ "auron",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "auron").table(executionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering execution table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+
+ _content ++=
+ <span id="auron" class="collapse-aggregated-runningExecutions
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningExecutions',
+ 'aggregated-runningExecutions')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a href="#auron">
+ Queries:
+ </a>{executionsList.size}
+ </h4>
+ </span> ++
+ <div class="aggregated-runningExecutions collapsible-table">
+ {auronPageTable}
+ </div>
+
+ _content
+ }
+ content ++=
+ <script>
+ function clickDetail(details) {{
+
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
+ }}
+ </script>
+ }
+
+ @sparkver("4.0 / 4.1")
+ private def buildExecutionsListSummary(
+ executionsList: Seq[AuronSQLExecutionUIData],
+ request: jakarta.servlet.http.HttpServletRequest): NodeSeq = {
+ val content = {
+ val _content = mutable.ListBuffer[Node]()
+ val executionPage =
+ Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1)
+
+ val auronPageTable =
+ try {
+ new AuronExecutionPagedTable(
+ request,
+ parent,
+ executionsList,
+ "auron",
+ "auron",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "auron").table(executionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering execution table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+
+ _content ++=
+ <span id="auron" class="collapse-aggregated-runningExecutions
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningExecutions',
+ 'aggregated-runningExecutions')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a href="#auron">
+ Queries:
+ </a>{executionsList.size}
+ </h4>
+ </span> ++
+ <div class="aggregated-runningExecutions collapsible-table">
+ {auronPageTable}
+ </div>
+
+ _content
+ }
+ content ++=
+ <script>
+ function clickDetail(details) {{
+
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
+ }}
+ </script>
+ }
+
+}
+
+private[ui] class AuronExecutionPagedTable(
+ request: HttpServletRequest,
+ parent: AuronSQLTab,
+ data: Seq[AuronSQLExecutionUIData],
+ tableHeaderId: String,
+ executionTag: String,
+ basePath: String,
+ subPath: String)
+ extends PagedTable[AuronExecutionTableRowData] {
+
+ private val (sortColumn, desc, pageSize) = getAuronTableParameters(request,
executionTag, "ID")
+
+ private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+ override val dataSource = new AuronExecutionDataSource(data, pageSize,
sortColumn, desc)
+
+ private val parameterPath =
+ s"$basePath/$subPath/?${getAuronParameterOtherTable(request,
executionTag)}"
+
+ override def tableId: String = s"$executionTag-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable
table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$executionTag.sort=$encodedSortColumn" +
+ s"&$executionTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableHeaderId"
+ }
+
+ /**
+ * Returns parameters of other tables in the page.
+ */
+ def getAuronParameterOtherTable(request: HttpServletRequest, tableTag:
String): String = {
+ request.getParameterMap.asScala
+ .filterNot(_._1.startsWith(tableTag))
+ .map(parameter => parameter._1 + "=" + parameter._2(0))
Review Comment:
Query parameter values are concatenated into URLs without URL-encoding. This
can break navigation when values contain `&`, `=`, spaces, or other reserved
characters. Encode both keys and values (or use a Spark UI helper if available)
when rebuilding the query string.
```suggestion
.map { case (key, values) =>
val encodedKey = URLEncoder.encode(key, UTF_8.name())
val value = if (values != null && values.nonEmpty) values(0) else ""
val encodedValue = URLEncoder.encode(value, UTF_8.name())
s"$encodedKey=$encodedValue"
}
```
##########
auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala:
##########
@@ -77,4 +95,376 @@ private[ui] class AuronAllExecutionsPage(parent:
AuronSQLTab) extends WebUIPage(
}
}
+ @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+ private def buildExecutionsListSummary(
+ executionsList: Seq[AuronSQLExecutionUIData],
+ request: javax.servlet.http.HttpServletRequest): NodeSeq = {
+ val content = {
+ val _content = mutable.ListBuffer[Node]()
+ val executionPage =
+ Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1)
+
+ val auronPageTable =
+ try {
+ new AuronExecutionPagedTable(
+ request,
+ parent,
+ executionsList,
+ "auron",
+ "auron",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "auron").table(executionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering execution table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+
+ _content ++=
+ <span id="auron" class="collapse-aggregated-runningExecutions
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningExecutions',
+ 'aggregated-runningExecutions')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a href="#auron">
+ Queries:
+ </a>{executionsList.size}
+ </h4>
+ </span> ++
+ <div class="aggregated-runningExecutions collapsible-table">
+ {auronPageTable}
+ </div>
+
+ _content
+ }
+ content ++=
+ <script>
+ function clickDetail(details) {{
+
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
+ }}
+ </script>
+ }
+
+ @sparkver("4.0 / 4.1")
+ private def buildExecutionsListSummary(
+ executionsList: Seq[AuronSQLExecutionUIData],
+ request: jakarta.servlet.http.HttpServletRequest): NodeSeq = {
+ val content = {
+ val _content = mutable.ListBuffer[Node]()
+ val executionPage =
+ Option(request.getParameter("auron.page")).map(_.toInt).getOrElse(1)
+
+ val auronPageTable =
+ try {
+ new AuronExecutionPagedTable(
+ request,
+ parent,
+ executionsList,
+ "auron",
+ "auron",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ "auron").table(executionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering execution table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+
+ _content ++=
+ <span id="auron" class="collapse-aggregated-runningExecutions
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningExecutions',
+ 'aggregated-runningExecutions')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a href="#auron">
+ Queries:
+ </a>{executionsList.size}
+ </h4>
+ </span> ++
+ <div class="aggregated-runningExecutions collapsible-table">
+ {auronPageTable}
+ </div>
+
+ _content
+ }
+ content ++=
+ <script>
+ function clickDetail(details) {{
+
details.parentNode.querySelector('.stage-details').classList.toggle('collapsed')
+ }}
+ </script>
+ }
+
+}
+
+private[ui] class AuronExecutionPagedTable(
+ request: HttpServletRequest,
+ parent: AuronSQLTab,
+ data: Seq[AuronSQLExecutionUIData],
+ tableHeaderId: String,
+ executionTag: String,
+ basePath: String,
+ subPath: String)
+ extends PagedTable[AuronExecutionTableRowData] {
+
+ private val (sortColumn, desc, pageSize) = getAuronTableParameters(request,
executionTag, "ID")
+
+ private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+
+ override val dataSource = new AuronExecutionDataSource(data, pageSize,
sortColumn, desc)
+
+ private val parameterPath =
+ s"$basePath/$subPath/?${getAuronParameterOtherTable(request,
executionTag)}"
+
+ override def tableId: String = s"$executionTag-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable
table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$executionTag.sort=$encodedSortColumn" +
+ s"&$executionTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableHeaderId"
+ }
+
+ /**
+ * Returns parameters of other tables in the page.
+ */
+ def getAuronParameterOtherTable(request: HttpServletRequest, tableTag:
String): String = {
+ request.getParameterMap.asScala
+ .filterNot(_._1.startsWith(tableTag))
+ .map(parameter => parameter._1 + "=" + parameter._2(0))
+ .mkString("&")
+ }
+
+ /**
+ * Returns parameter of this table.
+ */
+ def getAuronTableParameters(
+ request: HttpServletRequest,
+ tableTag: String,
+ defaultSortColumn: String): (String, Boolean, Int) = {
+ val parameterSortColumn = request.getParameter(s"$tableTag.sort")
+ val parameterSortDesc = request.getParameter(s"$tableTag.desc")
+ val parameterPageSize = request.getParameter(s"$tableTag.pageSize")
+ val sortColumn = Option(parameterSortColumn)
+ .map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }
+ .getOrElse(defaultSortColumn)
+ val desc =
+ Option(parameterSortDesc).map(_.toBoolean).getOrElse(sortColumn ==
defaultSortColumn)
+ val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100)
+
+ (sortColumn, desc, pageSize)
+ }
+
+ override def pageSizeFormField: String = s"$executionTag.pageSize"
+
+ override def pageNumberFormField: String = s"$executionTag.page"
+
+ override def goButtonFormPath: String =
+
s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
+
+ // Information for each header: title, sortable, tooltip
+ private val headerInfo: Seq[(String, Boolean, Option[String])] = {
+ Seq(
+ ("ID", true, None),
+ ("Description", true, None),
+ ("Num Auron Nodes", true, None),
+ ("Num Fallback Nodes", true, None))
+ }
+
+ override def headers: Seq[Node] = {
+ isAuronSortColumnValid(headerInfo, sortColumn)
+
+ headerAuronRow(
+ headerInfo,
+ desc,
+ pageSize,
+ sortColumn,
+ parameterPath,
+ executionTag,
+ tableHeaderId)
+ }
+
+ def headerAuronRow(
+ headerInfo: Seq[(String, Boolean, Option[String])],
+ desc: Boolean,
+ pageSize: Int,
+ sortColumn: String,
+ parameterPath: String,
+ tableTag: String,
+ headerId: String): Seq[Node] = {
+ val row: Seq[Node] = {
+ headerInfo.map { case (header, sortable, tooltip) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.desc=${!desc}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+ <th>
+ <a href={headerLink}>
+ <span data-toggle="tooltip" data-placement="top"
title={tooltip.getOrElse("")}>
+ {header} {Unparsed(arrow)}
+ </span>
+ </a>
+ </th>
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$tableTag.pageSize=$pageSize" +
+ s"#$headerId")
+
+ <th>
+ <a href={headerLink}>
+ <span data-toggle="tooltip" data-placement="top"
title={tooltip.getOrElse("")}>
+ {header}
+ </span>
+ </a>
+ </th>
+ } else {
+ <th>
+ <span data-toggle="tooltip" data-placement="top"
title={tooltip.getOrElse("")}>
+ {header}
+ </span>
+ </th>
+ }
+ }
+ }
+ }
+ <thead>
+ <tr>
+ {row}
+ </tr>
+ </thead>
+ }
+
+ def isAuronSortColumnValid(
+ headerInfo: Seq[(String, Boolean, Option[String])],
+ sortColumn: String): Unit = {
+ if (!headerInfo.filter(_._2).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+ }
+
+ override def row(executionTableRow: AuronExecutionTableRowData): Seq[Node] =
{
+ val executionUIData = executionTableRow.executionUIData
+
+ <tr>
+ <td>
+ {executionUIData.executionId.toString}
+ </td>
+ <td>
+ {descriptionCell(executionUIData)}
+ </td>
+ <td sorttable_customkey={executionUIData.numAuronNodes.toString}>
+ {executionUIData.numAuronNodes.toString}
+ </td>
+ <td sorttable_customkey={executionUIData.numFallbackNodes.toString}>
+ {executionUIData.numFallbackNodes.toString}
+ </td>
+ </tr>
+ }
+
+ private def descriptionCell(execution: AuronSQLExecutionUIData): Seq[Node] =
{
+ val details = if (execution.description != null &&
execution.description.nonEmpty) {
+ val concat = new PlanStringConcat()
+ concat.append("== Fallback Summary ==\n")
+ val fallbackSummary = execution.fallbackNodeToReason
+ .map { case (name, reason) =>
+ val id = name.substring(0, 3)
+ val nodeName = name.substring(4)
+ s"(${id.toInt}) $nodeName: $reason"
+ }
+ .mkString("\n")
+ concat.append(fallbackSummary)
+ if (execution.fallbackNodeToReason.isEmpty) {
+ concat.append("No fallback nodes")
+ }
+ concat.append("\n\n")
+ concat.append(execution.fallbackDescription)
+
+ <span
onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+ class="expand-details">
+ +details
+ </span> ++
+ <div class="stage-details collapsed">
+ <pre>{concat.toString()}</pre>
+ </div>
+ } else {
+ Nil
+ }
Review Comment:
The “+details” expansion is currently hidden whenever
`execution.description` is empty/null, even though fallback details
(reasons/plan text) may still exist and would be useful. Gate `details` on the
presence of fallback/plan content (e.g., show it when `fallbackDescription` is
non-empty or `fallbackNodeToReason.nonEmpty`) instead of on `description`.
##########
spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronExplainUtils.scala:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.auron
+
+import java.util.Collections.newSetFromMap
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.auron.AuronConvertStrategy.neverConvertReasonTag
+import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.{BaseSubqueryExec, InputAdapter, ReusedSubqueryExec, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.ExplainUtils.getOpId
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+
+import org.apache.auron.sparkver
+
+object AuronExplainUtils {
+ private def generateOperatorIDs(
+ plan: QueryPlan[_],
+ startOperatorID: Int,
+ visited: java.util.Set[QueryPlan[_]],
+ reusedExchanges: ArrayBuffer[ReusedExchangeExec],
+ addReusedExchanges: Boolean): Int = {
+ var currentOperationID = startOperatorID
+ if (plan.isInstanceOf[BaseSubqueryExec]) {
+ return currentOperationID
+ }
+
+ def setOpId(plan: QueryPlan[_]): Unit = if (!visited.contains(plan)) {
+ plan match {
+ case r: ReusedExchangeExec if addReusedExchanges =>
+ reusedExchanges.append(r)
+ case _ =>
+ }
+ visited.add(plan)
+ currentOperationID += 1
+ plan.setTagValue(TreeNodeTag[Int]("operatorId"), currentOperationID)
+ }
+
+ plan.foreachUp {
+ case _: WholeStageCodegenExec =>
+ case _: InputAdapter =>
+ case p: AdaptiveSparkPlanExec =>
+ currentOperationID = generateOperatorIDs(
+ p.executedPlan,
+ currentOperationID,
+ visited,
+ reusedExchanges,
+ addReusedExchanges)
+ setOpId(p)
+ case p: QueryStageExec =>
+ currentOperationID = generateOperatorIDs(
+ p.plan,
+ currentOperationID,
+ visited,
+ reusedExchanges,
+ addReusedExchanges)
+ setOpId(p)
+ case other: QueryPlan[_] =>
+ setOpId(other)
+ currentOperationID = other.innerChildren.foldLeft(currentOperationID) { (curId, plan) =>
+ generateOperatorIDs(plan, curId, visited, reusedExchanges, addReusedExchanges)
+ }
+ }
+ currentOperationID
+ }
+
+ private def getSubqueries(
+ plan: => QueryPlan[_],
+ subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
+ plan.foreach {
+ case a: AdaptiveSparkPlanExec =>
+ getSubqueries(a.executedPlan, subqueries)
+ case q: QueryStageExec =>
+ getSubqueries(q.plan, subqueries)
+ case p: SparkPlan =>
+ p.expressions.foreach(_.collect { case e: PlanExpression[_] =>
+ e.plan match {
+ case s: BaseSubqueryExec =>
+ subqueries += ((p, e, s))
+ getSubqueries(s, subqueries)
+ case _ =>
+ }
+ })
+ }
+ }
+
+ private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
+ plan: T,
+ append: String => Unit): Unit = {
+ try {
+
+ QueryPlan.append(plan, append, verbose = false, addSuffix = false, printOperatorId = true)
+
+ append("\n")
+ } catch {
+ case e: AnalysisException => append(e.toString)
+ }
+ }
+
+ private def collectFallbackNodes(plan: QueryPlan[_]): (Int, Map[String, String]) = {
+ var numAuronNodes = 0
+ val fallbackNodeToReason = new mutable.HashMap[String, String]
+
+ def collect(tmp: QueryPlan[_]): Unit = {
+ tmp.foreachUp {
+ case p: ExecutedCommandExec =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ case p: AdaptiveSparkPlanExec =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ collect(p.executedPlan)
+ case p: QueryStageExec =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ collect(p.plan)
+ case p: NativeSupports =>
+ numAuronNodes += 1
+ p.innerChildren.foreach(collect)
+ case p: SparkPlan =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ p.innerChildren.foreach(collect)
+ case _ =>
+ }
+ }
+
+ collect(plan)
Review Comment:
`collectFallbackNodes` traverses the plan multiple times because it uses
`foreachUp` (which already walks the subtree) and also recursively calls
`collect(...)` on children/subplans inside the match arms. This will over-count
`numAuronNodes`, can duplicate fallback collection work, and can significantly
degrade performance on large plans. Refactor to do a single traversal (e.g., one
`foreachUp`/`foreach` walk) and handle adaptive/query-stage subplans without
re-traversing already-visited nodes (use an identity-based visited set if needed).
```suggestion
// Use an identity-based visited set to avoid re-traversing shared subplans
val visited =
newSetFromMap[QueryPlan[_]](
new java.util.IdentityHashMap[QueryPlan[_], java.lang.Boolean]()
)
def traverse(root: QueryPlan[_]): Unit = {
// Skip traversal if we've already seen this plan instance
if (!visited.add(root)) {
return
}
root.foreachUp {
case p: ExecutedCommandExec =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
case p: AdaptiveSparkPlanExec =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
// Traverse the executed plan as a separate root, guarding with visited set
traverse(p.executedPlan)
case p: QueryStageExec =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
// Traverse the underlying stage plan as a separate root, guarding with visited set
traverse(p.plan)
case p: NativeSupports =>
numAuronNodes += 1
case p: SparkPlan =>
handleVanillaSparkPlan(p, fallbackNodeToReason)
case _ =>
}
}
traverse(plan)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]