LuciferYang commented on code in PR #55787:
URL: https://github.com/apache/spark/pull/55787#discussion_r3231749167
##########
sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala:
##########
@@ -85,7 +91,10 @@ private[v1] class SqlResource extends BaseAppResource {
// Echo draw counter to prevent stale responses
val draw = Option(uriParams.getFirst("draw")).map(_.toInt).getOrElse(0)
- val totalRecords = sqlStore.executionsCount()
+ // Sub-execution grouping flag; default to the cluster config
+ val groupSubExec = Option(uriParams.getFirst("groupSubExecution"))
+ .map(_.toBoolean)
Review Comment:
`Option(uriParams.getFirst("groupSubExecution")).map(_.toBoolean)` throws
`IllegalArgumentException` for any value other than `true` / `false` (Scala's
`.toBoolean` is case-insensitive), e.g. `yes`, `1`, or an empty string. A
malformed query string therefore turns into a 500 instead of falling back to the default.
Suggest:
```scala
import scala.util.Try

val groupSubExec = Option(uriParams.getFirst("groupSubExecution"))
  .flatMap(v => Try(v.toBoolean).toOption)
  .getOrElse(ui.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED))
```
The current frontend only ever sends `"true"` / `"false"`, so this path is
unreachable from our UI. But this endpoint is a public REST surface (other
tooling, scripts, and history-server replay all call it), and defensive parsing
of externally supplied parameters is the baseline. The nearby `.toInt` calls on
`start` / `length` / `draw` carry the same pre-existing risk (they throw
`NumberFormatException` on non-numeric input); worth bundling into a follow-up.
##########
sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala:
##########
@@ -125,26 +143,45 @@ private[v1] class SqlResource extends BaseAppResource {
val start = Option(uriParams.getFirst("start")).map(_.toInt).getOrElse(0)
val length =
Option(uriParams.getFirst("length")).map(_.toInt).getOrElse(20)
- val page = if (needsFilter) {
- // Filter/search: sort and paginate in memory
- val sorted = sortExecs(filteredExecs, sortCol, sortDir)
- if (length > 0) sorted.slice(start, start + length) else sorted
- } else {
- // No filter: use KVStore-level pagination for efficiency
- // KVStore returns in insertion order; sort in memory for the page
- val execs = sqlStore.executionsList()
- val sorted = sortExecs(execs, sortCol, sortDir)
- if (length > 0) sorted.slice(start, start + length) else sorted
+ val sortedRoots = sortExecs(rootRows, sortCol, sortDir)
+ val page = if (length > 0) sortedRoots.slice(start, start + length) else sortedRoots
+
+ // Convert to Java-compatible row data; embed sub-executions when grouping
+ val aaData = page.map { exec =>
+ val row = execToRow(exec)
+ if (groupSubExec) {
+ val subs = subsByRoot.getOrElse(exec.executionId, Seq.empty)
+ if (subs.nonEmpty) {
+ // Sort subs by id ascending so they appear in chronological order
+ val subRows = new java.util.ArrayList[java.util.LinkedHashMap[String, Object]]()
+ sortExecs(subs, "id", "asc").foreach(s => subRows.add(execToRow(s)))
+ row.put("subExecutions", subRows)
+ }
+ }
+ row
}
- // Convert to Java-compatible row data
- val aaData = page.map(execToRow)
+ // Counts: when grouping, totals reflect root-only counts so DataTables shows
+ // "Showing X to Y of Z entries" matching the rows the user actually sees.
+ val recordsTotal = if (groupSubExec) {
+ if (needsFilter) {
+ // Re-derive root rows from the unfiltered set using the same predicate
+ SqlResource.partitionRoots(allExecs)._1.size
+ } else {
+ rootRows.size
+ }
+ } else if (needsFilter) {
+ filteredExecs.size
+ } else {
+ sqlStore.executionsCount()
+ }
+ val recordsFiltered = if (groupSubExec) rootRows.size else filteredExecs.size
Review Comment:
The grouped-mode handling here is correct (unfiltered root count vs.
filtered root count, matching DataTables' `filtered from W total entries`
string). But the flat-mode branch still carries the pre-existing bug: when
`needsFilter` is true, `recordsTotal` is set to `filteredExecs.size`, which
equals `recordsFiltered`, hiding the `filtered from W total entries` counter in
the UI. Since we're already rewriting this block, consider aligning flat mode:
```scala
val recordsTotal = if (groupSubExec) {
  if (needsFilter) SqlResource.partitionRoots(allExecs)._1.size else rootRows.size
} else {
  sqlStore.executionsCount() // flat mode total is always the full count
}
val recordsFiltered = if (groupSubExec) rootRows.size else filteredExecs.size
```
This unifies the semantics across modes. Not introduced by this PR, but
near-free to fix alongside.
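To make the intended semantics concrete, here is a simplified model, not the PR's code: `Exec` keeps only the fields the counting needs, `partitionRoots` is assumed to mirror the root-or-orphan predicate of `SqlResource.partitionRoots`, and the hypothetical `recordCounts` returns `(recordsTotal, recordsFiltered)`. In both modes the total is taken over the unfiltered set, so the two numbers only diverge when a filter actually removes rows.

```scala
// Simplified stand-in for the execution data: only the fields the counting needs.
final case class Exec(id: Long, rootId: Long, description: String)

object Counts {
  // A row is top-level if it is a real root (id == rootId) or an orphan whose
  // root is not in `execs`; every other row is a sub grouped under its root.
  def partitionRoots(execs: Seq[Exec]): (Seq[Exec], Map[Long, Seq[Exec]]) = {
    val ids = execs.map(_.id).toSet
    val (roots, subs) =
      execs.partition(e => e.id == e.rootId || !ids.contains(e.rootId))
    (roots, subs.groupBy(_.rootId))
  }

  // recordsTotal: size of the unfiltered listing in the current mode.
  // recordsFiltered: size of the filtered listing in the same mode.
  def recordCounts(all: Seq[Exec], keep: Exec => Boolean, grouped: Boolean): (Int, Int) = {
    val filtered = all.filter(keep)
    if (grouped) (partitionRoots(all)._1.size, partitionRoots(filtered)._1.size)
    else (all.size, filtered.size)
  }
}
```

With a filter that hides a root but keeps its sub, grouped mode reports the sub as one orphan row out of the unfiltered root total, and flat mode reports it out of the full execution count.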
##########
sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala:
##########
@@ -125,26 +143,45 @@ private[v1] class SqlResource extends BaseAppResource {
val start = Option(uriParams.getFirst("start")).map(_.toInt).getOrElse(0)
val length =
Option(uriParams.getFirst("length")).map(_.toInt).getOrElse(20)
- val page = if (needsFilter) {
- // Filter/search: sort and paginate in memory
- val sorted = sortExecs(filteredExecs, sortCol, sortDir)
- if (length > 0) sorted.slice(start, start + length) else sorted
- } else {
- // No filter: use KVStore-level pagination for efficiency
- // KVStore returns in insertion order; sort in memory for the page
- val execs = sqlStore.executionsList()
- val sorted = sortExecs(execs, sortCol, sortDir)
- if (length > 0) sorted.slice(start, start + length) else sorted
+ val sortedRoots = sortExecs(rootRows, sortCol, sortDir)
+ val page = if (length > 0) sortedRoots.slice(start, start + length) else sortedRoots
+
+ // Convert to Java-compatible row data; embed sub-executions when grouping
+ val aaData = page.map { exec =>
+ val row = execToRow(exec)
+ if (groupSubExec) {
+ val subs = subsByRoot.getOrElse(exec.executionId, Seq.empty)
+ if (subs.nonEmpty) {
Review Comment:
nit: `row.put("subExecutions", subRows)` only runs when `subs.nonEmpty`, so
rows without children simply omit the field. The frontend copes via
`row.subExecutions || []`, but external JSON consumers (other tooling, future
API stabilization, stricter test assertions) see an inconsistent schema. Always
emitting a list, empty when there are no subs, is simpler for every consumer:
```scala
if (groupSubExec) {
  val subs = subsByRoot.getOrElse(exec.executionId, Seq.empty)
  row.put("subExecutions", sortExecs(subs, "id", "asc").map(execToRow))
}
```
It also matches the `(row \ "subExecutions").children` idiom already used in
the test.
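A self-contained sketch of the always-emit shape, assuming Scala 2.13 and using a hypothetical `Exec` plus a stand-in `execToRow` that fills only two fields. It converts the nested list with `asJava` to stay in line with the existing "Java-compatible row data" comment; the `Rows` object and `rootRow` name are illustrative only.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for the execution data: only id and root id.
final case class Exec(id: Long, rootId: Long)

object Rows {
  // Hypothetical stand-in for execToRow: one insertion-ordered map per row.
  def execToRow(e: Exec): JLinkedHashMap[String, Object] = {
    val row = new JLinkedHashMap[String, Object]()
    row.put("id", Long.box(e.id))
    row.put("rootExecutionId", Long.box(e.rootId))
    row
  }

  // In grouped mode every root row carries "subExecutions", possibly empty,
  // with subs ordered by id ascending (chronological order).
  def rootRow(
      root: Exec,
      subsByRoot: Map[Long, Seq[Exec]],
      grouped: Boolean): JLinkedHashMap[String, Object] = {
    val row = execToRow(root)
    if (grouped) {
      val subs = subsByRoot.getOrElse(root.id, Seq.empty).sortBy(_.id)
      row.put("subExecutions", subs.map(execToRow).asJava)
    }
    row
  }
}
```

Flat mode still omits the key entirely, which the existing `(row \ "subExecutions").children.isEmpty` assertion in the test accepts either way.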
##########
sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala:
##########
@@ -125,26 +143,45 @@ private[v1] class SqlResource extends BaseAppResource {
val start = Option(uriParams.getFirst("start")).map(_.toInt).getOrElse(0)
val length =
Option(uriParams.getFirst("length")).map(_.toInt).getOrElse(20)
- val page = if (needsFilter) {
- // Filter/search: sort and paginate in memory
- val sorted = sortExecs(filteredExecs, sortCol, sortDir)
- if (length > 0) sorted.slice(start, start + length) else sorted
- } else {
- // No filter: use KVStore-level pagination for efficiency
- // KVStore returns in insertion order; sort in memory for the page
- val execs = sqlStore.executionsList()
- val sorted = sortExecs(execs, sortCol, sortDir)
- if (length > 0) sorted.slice(start, start + length) else sorted
+ val sortedRoots = sortExecs(rootRows, sortCol, sortDir)
+ val page = if (length > 0) sortedRoots.slice(start, start + length) else sortedRoots
+
+ // Convert to Java-compatible row data; embed sub-executions when grouping
+ val aaData = page.map { exec =>
+ val row = execToRow(exec)
+ if (groupSubExec) {
+ val subs = subsByRoot.getOrElse(exec.executionId, Seq.empty)
+ if (subs.nonEmpty) {
+ // Sort subs by id ascending so they appear in chronological order
+ val subRows = new java.util.ArrayList[java.util.LinkedHashMap[String, Object]]()
+ sortExecs(subs, "id", "asc").foreach(s => subRows.add(execToRow(s)))
Review Comment:
How about collapsing the manual `ArrayList` loop into a single `put`:
```scala
row.put("subExecutions", sortExecs(subs, "id", "asc").map(execToRow))
```
##########
sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala:
##########
@@ -215,6 +216,116 @@ class SqlResourceWithActualMetricsSuite
}
}
+ test("SPARK-56811: sqlTable groups sub-executions under their root execution") {
+ // CACHE TABLE produces a root execution plus an inner sub-execution that
+ // shares its rootExecutionId. This is the canonical case where the SQL
+ // listing should fold the sub row under the root rather than flattening it.
+ spark.sql("CREATE OR REPLACE TEMP VIEW spark_56811 AS SELECT id FROM RANGE(10)")
+ .collect()
+ spark.sql("CACHE TABLE spark_56811_cached AS SELECT * FROM spark_56811").collect()
+ try {
+ eventually(timeout(10.seconds), interval(1.second)) {
+ val baseUrl = spark.sparkContext.ui.get.webUrl +
+ s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/sqlTable"
+
+ // Grouping ON: roots only, with subExecutions embedded on the root that
+ // owns a sub-execution.
+ val groupedUrl = new URI(
+ s"$baseUrl?start=0&length=100&draw=1&groupSubExecution=true").toURL
+ val (groupedCode, groupedOpt, _) = getContentAndCode(groupedUrl)
+ assert(groupedCode === HttpServletResponse.SC_OK)
+ val groupedJson = JsonMethods.parse(groupedOpt.get)
+ val groupedRecordsTotal = (groupedJson \ "recordsTotal").extract[Long]
+ val groupedRecordsFiltered = (groupedJson \ "recordsFiltered").extract[Long]
+ val groupedRows = (groupedJson \ "aaData").children
+ assert(groupedRecordsTotal === groupedRows.size,
+ "with no filter, recordsTotal should match returned root count")
+ assert(groupedRecordsFiltered === groupedRows.size,
+ "with no filter, recordsFiltered should match returned root count")
+ // Every row in grouped mode is either a true root (id == rootExecutionId)
+ // or an orphan sub whose real parent is absent from the result set.
+ val visibleIds = groupedRows.map(r => (r \ "id").extract[Long]).toSet
+ groupedRows.foreach { row =>
+ val id = (row \ "id").extract[Long]
+ val rootId = (row \ "rootExecutionId").extract[Long]
+ assert(id == rootId || !visibleIds.contains(rootId),
+ s"grouped row $id (rootId=$rootId) is neither a root nor an orphan")
+ }
+ val rootsWithSubs = groupedRows.filter { row =>
+ (row \ "subExecutions").children.nonEmpty
+ }
+ assert(rootsWithSubs.nonEmpty,
+ "CACHE TABLE should produce at least one root with sub-executions")
+ rootsWithSubs.foreach { row =>
+ val rootId = (row \ "id").extract[Long]
+ (row \ "subExecutions").children.foreach { sub =>
+ assert((sub \ "rootExecutionId").extract[Long] === rootId,
+ "sub-execution should reference its parent root")
+ assert((sub \ "id").extract[Long] !== rootId,
+ "sub-execution must not have the same id as its root")
+ }
+ }
+
+ // Grouping OFF: flat list of every execution, with no embedded subs.
+ val flatUrl = new URI(
+ s"$baseUrl?start=0&length=100&draw=2&groupSubExecution=false").toURL
+ val (flatCode, flatOpt, _) = getContentAndCode(flatUrl)
+ assert(flatCode === HttpServletResponse.SC_OK)
+ val flatJson = JsonMethods.parse(flatOpt.get)
+ val flatRows = (flatJson \ "aaData").children
+ assert(flatRows.size > groupedRows.size,
+ "flat listing should contain at least one extra sub-execution row")
+ flatRows.foreach { row =>
+ assert((row \ "subExecutions").children.isEmpty,
+ "flat listing should not embed subExecutions")
+ }
+ }
+ } finally {
+ spark.sql("UNCACHE TABLE IF EXISTS spark_56811_cached")
+ }
+ }
+
+ test("SPARK-56811: partitionRoots surfaces orphan sub-executions as root rows") {
Review Comment:
This test already covers the predicate. What's missing is an end-to-end
test: an HTTP request with `search[value]=xxx` (or a `status=...`) that filters
out a root so its sub surfaces as a root row, verifying `recordsTotal` still
equals the unfiltered root count. Suggest a standalone test:
```scala
test("SPARK-56811: orphan sub-executions surface as root rows when the root is filtered out") {
  // Produce a scenario where the sub's description is searchable but the
  // root's is not, e.g. root description = "foo", sub's description (via
  // CTAS) contains a distinct substring. Then hit the endpoint with
  // search[value]=<sub-only-substring>, assert an orphan row (id !=
  // rootExecutionId) appears in aaData, recordsTotal === all roots count,
  // recordsFiltered === orphan count.
}
```
The orphan branch (`!ids.contains(e.rootExecutionId)` in `partitionRoots`)
is the riskiest new logic in this PR — an e2e case gives much higher confidence
than the unit test alone.
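For the request side of that e2e test, DataTables-style parameter names contain brackets, so it is safest to encode keys and values rather than splice them into the URL by hand. A minimal sketch with a hypothetical `SqlTableQuery` helper, relying only on `java.net.URLEncoder`:

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical test helper (not in the PR): builds a sqlTable query string with
// every key and value form-encoded, so "search[value]" becomes "search%5Bvalue%5D".
object SqlTableQuery {
  private def enc(s: String): String =
    URLEncoder.encode(s, StandardCharsets.UTF_8.name())

  def build(params: Seq[(String, String)]): String =
    params.map { case (k, v) => s"${enc(k)}=${enc(v)}" }.mkString("&")
}
```

The result would slot into the existing pattern as `new URI(s"$baseUrl?" + SqlTableQuery.build(...)).toURL`. `URLEncoder` produces form encoding (a space becomes `+`), so a search term without spaces sidesteps any question of how the server decodes it.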
##########
sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala:
##########
@@ -215,6 +216,116 @@ class SqlResourceWithActualMetricsSuite
}
}
+ test("SPARK-56811: sqlTable groups sub-executions under their root execution") {
+ // CACHE TABLE produces a root execution plus an inner sub-execution that
+ // shares its rootExecutionId. This is the canonical case where the SQL
+ // listing should fold the sub row under the root rather than flattening it.
+ spark.sql("CREATE OR REPLACE TEMP VIEW spark_56811 AS SELECT id FROM RANGE(10)")
+ .collect()
+ spark.sql("CACHE TABLE spark_56811_cached AS SELECT * FROM spark_56811").collect()
+ try {
+ eventually(timeout(10.seconds), interval(1.second)) {
+ val baseUrl = spark.sparkContext.ui.get.webUrl +
+ s"/api/v1/applications/${spark.sparkContext.applicationId}/sql/sqlTable"
+
+ // Grouping ON: roots only, with subExecutions embedded on the root that
+ // owns a sub-execution.
+ val groupedUrl = new URI(
+ s"$baseUrl?start=0&length=100&draw=1&groupSubExecution=true").toURL
+ val (groupedCode, groupedOpt, _) = getContentAndCode(groupedUrl)
+ assert(groupedCode === HttpServletResponse.SC_OK)
+ val groupedJson = JsonMethods.parse(groupedOpt.get)
+ val groupedRecordsTotal = (groupedJson \ "recordsTotal").extract[Long]
+ val groupedRecordsFiltered = (groupedJson \ "recordsFiltered").extract[Long]
+ val groupedRows = (groupedJson \ "aaData").children
+ assert(groupedRecordsTotal === groupedRows.size,
+ "with no filter, recordsTotal should match returned root count")
+ assert(groupedRecordsFiltered === groupedRows.size,
+ "with no filter, recordsFiltered should match returned root count")
+ // Every row in grouped mode is either a true root (id == rootExecutionId)
+ // or an orphan sub whose real parent is absent from the result set.
+ val visibleIds = groupedRows.map(r => (r \ "id").extract[Long]).toSet
+ groupedRows.foreach { row =>
+ val id = (row \ "id").extract[Long]
+ val rootId = (row \ "rootExecutionId").extract[Long]
+ assert(id == rootId || !visibleIds.contains(rootId),
+ s"grouped row $id (rootId=$rootId) is neither a root nor an orphan")
+ }
+ val rootsWithSubs = groupedRows.filter { row =>
+ (row \ "subExecutions").children.nonEmpty
+ }
+ assert(rootsWithSubs.nonEmpty,
+ "CACHE TABLE should produce at least one root with sub-executions")
+ rootsWithSubs.foreach { row =>
+ val rootId = (row \ "id").extract[Long]
+ (row \ "subExecutions").children.foreach { sub =>
+ assert((sub \ "rootExecutionId").extract[Long] === rootId,
+ "sub-execution should reference its parent root")
+ assert((sub \ "id").extract[Long] !== rootId,
+ "sub-execution must not have the same id as its root")
+ }
+ }
+
+ // Grouping OFF: flat list of every execution, with no embedded subs.
+ val flatUrl = new URI(
+ s"$baseUrl?start=0&length=100&draw=2&groupSubExecution=false").toURL
+ val (flatCode, flatOpt, _) = getContentAndCode(flatUrl)
+ assert(flatCode === HttpServletResponse.SC_OK)
+ val flatJson = JsonMethods.parse(flatOpt.get)
+ val flatRows = (flatJson \ "aaData").children
+ assert(flatRows.size > groupedRows.size,
Review Comment:
The `>` check is loose; with no filter applied, the counts should line up exactly:
```scala
val embeddedSubs =
  groupedRows.map(r => (r \ "subExecutions").children.size).sum
assert(flatRows.size === groupedRows.size + embeddedSubs,
  "flat_size = grouped_size + embedded_subs")
```
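Why the exact equality should hold, in a simplified model (hypothetical `Exec` and `partitionRoots`, assuming `rootExecutionId` always names the top-level root rather than an intermediate parent): each execution is either a top-level row (a root or an orphan) or embedded under a root that is itself a top-level row, so nothing is counted twice or dropped. It also assumes the page holds every execution; if the shared session ever accumulates more than the test's `length=100`, the flat list would be truncated first.

```scala
// Hypothetical stand-in for the execution data: only id and root id.
final case class Exec(id: Long, rootId: Long)

object GroupingInvariant {
  // Assumed to mirror the PR's predicate: top-level if a real root
  // (id == rootId) or an orphan whose root is not in the listing.
  def partitionRoots(execs: Seq[Exec]): (Seq[Exec], Map[Long, Seq[Exec]]) = {
    val ids = execs.map(_.id).toSet
    val (roots, subs) =
      execs.partition(e => e.id == e.rootId || !ids.contains(e.rootId))
    (roots, subs.groupBy(_.rootId))
  }

  // flat size == grouped top-level rows + subs embedded under them.
  def flatEqualsGroupedPlusEmbedded(execs: Seq[Exec]): Boolean = {
    val (roots, subsByRoot) = partitionRoots(execs)
    val embedded = roots.map(r => subsByRoot.getOrElse(r.id, Seq.empty).size).sum
    execs.size == roots.size + embedded
  }
}
```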
##########
sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/allexecutionspage.js:
##########
@@ -146,83 +241,56 @@ $(document).ready(function () {
if (sel) {
d.status = sel;
}
+ d.groupSubExecution = groupSubExecEnabled ? "true" : "false";
},
dataSrc: function (json) { return json.aaData; },
error: function () {
$("#sql-table_processing").css("display", "none");
}
},
- columns: [
- {
- data: "id", name: "id", title: "ID",
- render: function (data, type) {
- if (type !== "display") return data;
- var basePath = uiRoot + appBasePath;
- return '<a href="' + basePath + '/SQL/execution/?id=' + data + '">' +
- data + '</a>';
- }
- },
- {
- data: "queryId", name: "queryId", title: "Query ID",
- orderable: false,
- render: function (data, type) {
- if (type !== "display" || !data) return data || "";
- return '<span title="' + data + '">' + data.substring(0, 8) + '...</span>';
- }
- },
- {
- data: "status", name: "status", title: "Status",
- render: function (data, type) {
- if (type !== "display") return data;
- return statusBadge(data);
- }
- },
- {
- data: "description", name: "description", title: "Description",
- render: function (data, type, row) {
- if (type !== "display") return data || "";
- return descriptionHtml({ id: row.id, description: data });
- }
- },
- {
- data: "submissionTime", name: "submissionTime", title: "Submitted",
- render: function (data, type) {
- if (type !== "display") return data;
- return formatDateSql(data);
- }
- },
- {
- data: "duration", name: "duration", title: "Duration",
- render: function (data, type) {
- if (type !== "display") return data;
- return formatDurationSql(data);
- }
- },
- {
- data: "jobIds", name: "jobIds", title: "Succeeded Jobs",
- orderable: false,
- render: function (data, type) {
- if (type !== "display") return (data || []).join(",");
- return jobIdLinks(data || []);
- }
- },
- {
- data: "errorMessage", name: "errorMessage", title: "Error Message",
- orderable: false,
- render: function (data, type) {
- if (type !== "display" || !data) return data || "";
- if (data.length > 100) {
- return '<span title="' + escapeHtml(data) + '">' +
- escapeHtml(data.substring(0, 100)) + '...</span>';
- }
- return escapeHtml(data);
- }
- }
- ],
+ columns: columns,
order: [[0, "desc"]],
language: { search: "Search: " }
});
+ // Child-row expansion for sub-executions. Sub data is embedded per root row
+ // in the server payload (`row.subExecutions`), so no second fetch is needed.
+ if (groupSubExecEnabled) {
+ $("#sql-table tbody").on("click", "a.toggle-sub-exec", function (e) {
Review Comment:
Expand state is tracked only via the `<tr>.shown` class and
`dtRow.child(...)`. Under `serverSide: true`, every sort / filter / page-change
destroys the rows and recreates them, losing both the class and the child row.
A user expanding a row and then sorting will see the table collapse back.