cloud-fan commented on code in PR #55569:
URL: https://github.com/apache/spark/pull/55569#discussion_r3159893045
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -349,4 +350,30 @@ private[sql] object CatalogManager {
compact(JArray(entries.map(parts =>
JArray(parts.map(JString(_)).toList)).toList))
}
+
+ /**
+ * Parse a stored frozen path string from view/function metadata.
+ * Returns None if the payload is malformed.
+ */
+ def deserializePathEntries(storedPathStr: String): Option[Seq[Seq[String]]] = {
+ import org.json4s.JsonAST.{JArray, JString}
+ import org.json4s.jackson.JsonMethods.parse
+
+ Try(parse(storedPathStr)).toOption match {
+ case Some(JArray(entries)) if entries.nonEmpty =>
+ val converted = entries.foldLeft(Option(Seq.empty[Seq[String]])) { (acc, entry) =>
+ acc.flatMap { collected =>
+ entry match {
+ case JArray(parts) if parts.nonEmpty =>
+ val strings = parts.collect { case JString(s) => s }
+ if (strings.size == parts.size) Some(collected :+ strings)
+ else None
+ case _ => None
+ }
+ }
+ }
+ converted.filter(_.nonEmpty)
Review Comment:
nit: this `filter(_.nonEmpty)` is a no-op. The outer `case
Some(JArray(entries)) if entries.nonEmpty` already guarantees that `entries` is
non-empty, and the fold appends exactly one inner `Seq` per entry (or returns
`None` on the first malformed entry). So whenever the fold returns `Some(s)`,
`s.size == entries.size > 0`, and the filter never removes anything.
```suggestion
converted
```
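To check the reasoning above in isolation, here is a minimal sketch that reproduces the fold from `deserializePathEntries` over a small stand-in for the json4s AST. The `Json`, `JArr`, `JStr`, and `JNum` types are hypothetical, introduced only so the example runs on the standard library alone. Whenever the fold succeeds on non-empty input, the result has exactly one inner `Seq` per entry, so `.filter(_.nonEmpty)` returns it unchanged.

```scala
// Hypothetical stand-in for the json4s JArray/JString AST (not the real library types).
sealed trait Json
final case class JArr(items: List[Json]) extends Json
final case class JStr(value: String) extends Json
final case class JNum(value: Int) extends Json

object FoldSketch {
  // Same shape as the fold under review: one Seq[String] per entry, or None as soon
  // as an entry is not a non-empty array made up entirely of strings.
  def convert(entries: List[Json]): Option[Seq[Seq[String]]] =
    entries.foldLeft(Option(Seq.empty[Seq[String]])) { (acc, entry) =>
      acc.flatMap { collected =>
        entry match {
          case JArr(parts) if parts.nonEmpty =>
            val strings = parts.collect { case JStr(s) => s }
            if (strings.size == parts.size) Some(collected :+ strings) else None
          case _ => None
        }
      }
    }
}
```

With a non-empty `entries`, `convert(entries).map(_.size)` is either `None` or `Some(entries.size)`, which is why the trailing filter can simply be dropped.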
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -349,4 +350,30 @@ private[sql] object CatalogManager {
compact(JArray(entries.map(parts =>
JArray(parts.map(JString(_)).toList)).toList))
}
+
+ /**
+ * Parse a stored frozen path string from view/function metadata.
+ * Returns None if the payload is malformed.
+ */
+ def deserializePathEntries(storedPathStr: String): Option[Seq[Seq[String]]] = {
+ import org.json4s.JsonAST.{JArray, JString}
+ import org.json4s.jackson.JsonMethods.parse
+
+ Try(parse(storedPathStr)).toOption match {
+ case Some(JArray(entries)) if entries.nonEmpty =>
+ val converted = entries.foldLeft(Option(Seq.empty[Seq[String]])) { (acc, entry) =>
+ acc.flatMap { collected =>
+ entry match {
+ case JArray(parts) if parts.nonEmpty =>
+ val strings = parts.collect { case JString(s) => s }
+ if (strings.size == parts.size) Some(collected :+ strings)
+ else None
+ case _ => None
+ }
+ }
+ }
+ converted.filter(_.nonEmpty)
+ case _ => None
Review Comment:
Question on the silent-fallback policy: if `viewStoredResolutionPath` /
`functionStoredResolutionPath` is present but unparseable (corruption,
version-skew bug, manual edit), this returns `None` and the call site falls
back to the live session path with no signal. The view/function appears to
"work" but with the wrong PATH semantics. Could we `logWarning` on the
structurally-broken branches (parse failure, non-array root, malformed entry)
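so an operator has something to grep for? The legitimately-empty case (e.g. a
stored `[]` after stripping `system.session`) probably still wants to be
silent, but the corrupted cases feel different. Note that a stored `[]`
currently fails the `entries.nonEmpty` guard and falls into the same
`case _ => None` as a non-array root, so that branch would need to be split to
log selectively.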
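One possible shape for this, sketched under assumptions: the decoder separates "legitimately empty" from "structurally broken" and only reports the latter. The `Json` ADT is a hypothetical stand-in for the json4s AST, `Try[Json]` stands in for the result of `Try(parse(storedPathStr))`, and the `warn` callback is a placeholder for whatever logging the call site uses (for example `logWarning`). None of these names come from the PR.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the json4s AST so the sketch is self-contained.
sealed trait Json
final case class JArr(items: List[Json]) extends Json
final case class JStr(value: String) extends Json
final case class JNum(value: Int) extends Json

object PathDecode {
  private def isStr(j: Json): Boolean = j match {
    case JStr(_) => true
    case _ => false
  }

  // Returns the decoded path or None; calls `warn` only for corrupted payloads.
  def decode(parsed: Try[Json], warn: String => Unit): Option[Seq[Seq[String]]] =
    parsed match {
      case Failure(e) =>
        warn(s"stored resolution path is not valid JSON: ${e.getMessage}")
        None
      case Success(JArr(Nil)) =>
        None // legitimately empty (e.g. `[]` after stripping system.session): stay silent
      case Success(JArr(entries)) =>
        val decoded = entries.map {
          case JArr(parts) if parts.nonEmpty && parts.forall(isStr) =>
            Some(parts.collect { case JStr(s) => s })
          case _ => None
        }
        if (decoded.forall(_.isDefined)) Some(decoded.flatten)
        else {
          warn("stored resolution path contains a malformed entry")
          None
        }
      case Success(other) =>
        warn(s"stored resolution path root is not an array: $other")
        None
    }
}
```

Passing the logger in as a function keeps the decoder itself pure and easy to unit-test; in Spark the call site could equally log based on a richer return type.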
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/SQLFunctionSuite.scala:
##########
@@ -111,4 +112,117 @@ class SQLFunctionSuite extends SharedSparkSession {
)
}
}
+
+ test("SPARK-56639: SQL function uses frozen SQL path") {
+ withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
+ withDatabase("path_func_db_a", "path_func_db_b") {
+ withTable("path_func_db_a.frozen_t", "path_func_db_b.frozen_t") {
+ withUserDefinedFunction("frozen_fn" -> false) {
+ sql("USE default")
+ sql("CREATE DATABASE path_func_db_a")
+ sql("CREATE DATABASE path_func_db_b")
+ sql("CREATE TABLE path_func_db_a.frozen_t USING parquet AS SELECT 10 AS id")
+ sql("CREATE TABLE path_func_db_b.frozen_t USING parquet AS SELECT 20 AS id")
+ try {
+ sql("SET PATH = spark_catalog.path_func_db_a, system.builtin")
+ sql(
+ """
+ |CREATE FUNCTION frozen_fn()
+ |RETURNS INT
+ |RETURN (SELECT MAX(id) FROM frozen_t)
+ |""".stripMargin)
+ sql("SET PATH = spark_catalog.path_func_db_b, system.builtin")
+
+ checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(20))
+ checkAnswer(sql("SELECT default.frozen_fn()"), Row(10))
+ } finally {
+ sql("SET PATH = DEFAULT_PATH")
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("SPARK-56639: SQL table function uses frozen SQL path") {
+ withSQLConf(SQLConf.PATH_ENABLED.key -> "true") {
+ withDatabase("path_tvf_db_a", "path_tvf_db_b") {
+ withTable("path_tvf_db_a.frozen_t", "path_tvf_db_b.frozen_t") {
+ withUserDefinedFunction("frozen_tvf" -> false) {
+ sql("USE default")
+ sql("CREATE DATABASE path_tvf_db_a")
+ sql("CREATE DATABASE path_tvf_db_b")
+ sql("CREATE TABLE path_tvf_db_a.frozen_t USING parquet AS SELECT 100 AS id")
+ sql("CREATE TABLE path_tvf_db_b.frozen_t USING parquet AS SELECT 200 AS id")
+ try {
+ sql("SET PATH = spark_catalog.path_tvf_db_a, system.builtin")
+ sql(
+ """
+ |CREATE FUNCTION frozen_tvf()
+ |RETURNS TABLE(id INT)
+ |RETURN SELECT MAX(id) AS id FROM frozen_t
+ |""".stripMargin)
+ sql("SET PATH = spark_catalog.path_tvf_db_b, system.builtin")
+
+ checkAnswer(sql("SELECT MAX(id) FROM frozen_t"), Row(200))
+ checkAnswer(sql("SELECT * FROM default.frozen_tvf()"), Row(100))
+ } finally {
+ sql("SET PATH = DEFAULT_PATH")
+ }
+ }
+ }
+ }
+ }
+ }
+
+ test("SPARK-56639: current_schema/current_path in SQL functions use invoker context") {
Review Comment:
These two `current_schema/current_path … use invoker context` tests (here
and in `SQLViewSuite`) are useful regression guards, but they don't actually
exercise the wiring this PR adds: `CURRENT_SCHEMA` / `CURRENT_PATH` are
replaced post-analysis by `ReplaceCurrentLike` (in `finishAnalysis.scala`)
using live `catalogManager` state, independently of
`AnalysisContext.resolutionPathEntries`, so they would have passed before this
PR. The names read like positive feature tests for the new wiring. A one-line
comment in each test marking it as a regression guard ("ensures the frozen
path doesn't leak into `current_schema` / `current_path`") would keep future
readers from misreading them as evidence that the wiring works.
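A toy model of the two code paths described above may make the point concrete. It assumes only what the comment states: name resolution inside a function or view body consults the frozen path when one is present, while `current_path()` is folded later from live session state. `Session`, `AnalysisCtx`, and `ToyAnalyzer` are hypothetical names, not Spark APIs.

```scala
// Hypothetical, heavily simplified model; not Spark's classes.
final case class Session(livePath: Seq[String])
final case class AnalysisCtx(resolutionPathEntries: Option[Seq[String]])

object ToyAnalyzer {
  // Table/function lookup inside a stored body: the frozen path wins when present.
  def lookupPath(session: Session, ctx: AnalysisCtx): Seq[String] =
    ctx.resolutionPathEntries.getOrElse(session.livePath)

  // current_path(): computed from live session state; `ctx` is deliberately ignored,
  // mirroring the post-analysis replacement described in the review comment.
  def currentPath(session: Session, ctx: AnalysisCtx): Seq[String] =
    session.livePath
}
```

In this model `currentPath` returns the same value whether or not the context carries a frozen path, which is exactly why a `current_path` assertion cannot distinguish "wiring present" from "wiring absent".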
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -224,7 +225,10 @@ object AnalysisContext {
def withAnalysisContext[A](function: SQLFunction)(f: => A): A = {
val originContext = value.get()
- val context = originContext.copy(collation = function.collation)
+ val context = originContext.copy(
+ resolutionPathEntries = function.functionStoredResolutionPath
+ .flatMap(CatalogManager.deserializePathEntries),
+ collation = function.collation)
Review Comment:
Just to confirm the intent (no behavior change requested): for a SQL function
with no stored path, `resolutionPathEntries` is now explicitly overwritten to
`None` rather than inherited from `originContext`. That mirrors the existing
`collation = function.collation` overwrite in the same `copy` call, i.e. a
function body resolves names independently of its caller's path. Pre-PR this
was a no-op because views didn't set the field either, but now that views do,
it is a real semantic choice. Is it worth a one-line comment in
`withAnalysisContext`, or a sentence in the PR description, so the choice is
documented?
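The difference between the two policies can be sketched with a simplified context. `Ctx` is a hypothetical two-field stand-in for `AnalysisContext` (the real class has more fields, and the `Option[String]` type for collation is an assumption). `enterFunctionOverwrite` models the behavior described above; `enterFunctionInherit` is the alternative the PR does not take.

```scala
// Hypothetical stand-in for AnalysisContext with only the two fields under discussion.
final case class Ctx(
    resolutionPathEntries: Option[Seq[Seq[String]]] = None,
    collation: Option[String] = None)

object ContextSketch {
  // As described for the PR: the function's stored path replaces the caller's,
  // so a function without a stored path gets None even inside a view.
  def enterFunctionOverwrite(
      caller: Ctx,
      storedPath: Option[Seq[Seq[String]]],
      collation: Option[String]): Ctx =
    caller.copy(resolutionPathEntries = storedPath, collation = collation)

  // Alternative (not what the PR does): fall back to the caller's frozen path.
  def enterFunctionInherit(
      caller: Ctx,
      storedPath: Option[Seq[Seq[String]]],
      collation: Option[String]): Ctx =
    caller.copy(
      resolutionPathEntries = storedPath.orElse(caller.resolutionPathEntries),
      collation = collation)
}
```

The two only diverge when a function without a stored path is invoked from a context that already has one, such as a view body, which is the case the question asks to document.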
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.