This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 1ddac97ce493a675bddee61c7a247bbee36832e1
Author: Serge Rielau <[email protected]>
AuthorDate: Mon May 4 10:32:50 2026 -0700

    [SPARK-56681][SQL] PATH cleanup
    
    ### What changes were proposed in this pull request?
    
    Address the open follow-ups from 
[SPARK-56681](https://issues.apache.org/jira/browse/SPARK-56681) (umbrella for 
PATH / SPARK-56605 cleanup) in a single cleanup PR. Items #1 and #2 were 
already wired by SPARK-56639; this PR covers the remainder.
    
    | # | Item | Resolution |
    |---|---|---|
    | #1 | `FunctionResolution.resolveProcedure` was dead code | Already wired 
by SPARK-56639 (no action). |
    | #2 | Frozen view / SQL-function PATH wiring unfinished | Already done by 
SPARK-56639 (no action). |
    | #3 | `AnalysisContext.resolutionPathEntries` threadlocal | Audit only: 
confirmed `withNewAnalysisContext` / `reset()` correctly clear it. Full removal 
needs a coordinated refactor to plumb the path through `RelationResolution` / 
`FunctionResolution` method calls; flagged as a follow-up. |
    | #4 | `Analyzer.executeAndCheck` clobbers outer `SQLConf.withExistingConf` 
| Extracted `runWithSessionConf` helper, added `SQLConf.getExistingConfIfSet`. 
`executeAndCheck` and `executeSameContext` now share one path that yields to 
any outer scope. |
    | #5 | `VariableResolution.allowUnqualifiedSessionTempVariableLookup` 
force-loads default catalog | Replaced the hot-path catalog read with 
`CatalogManager.isSystemSessionOnPath`, which inspects stored session-path 
entries directly. No catalog load on column resolution. |
    | #6 | `DROP VARIABLE` PATH gate asymmetric with `DECLARE` / `CREATE` | 
Removed the gate. DDL on session variables (`DECLARE` / `CREATE` / `DROP`) 
always targets `system.session` directly; only DML (`SET VAR`, `SELECT x`) goes 
through PATH. |
    | #7 | `lookupFunctionType` exception swallow too broad | Narrowed from 
`NonFatal` to the explicit not-found list (`NoSuchFunctionException`, 
`NoSuchNamespaceException`, `CatalogNotFoundException`, `FORBIDDEN_OPERATION`). 
Other exceptions propagate. |
    | #8 | `lookupFunctionType` fan-out had wasteful `system.*` candidates | 
Filtered them out — `system.session`, `system.builtin`, `system.ai` are already 
resolved earlier in the same method. |
    | #9 | Three near-duplicate path-resolution helpers | Lifted into 
`CatalogManager.resolutionPathEntriesForAnalysis(pinnedEntries, 
viewCatalogAndNamespace)`. Relation, routine, and procedure resolution all 
route through it. |
    | #10 | Tests for the new error paths and gates | Added a DECLARE / SET VAR 
/ DROP cycle test under non-default PATH and a struct-variable 
field-vs-qualified ambiguity test in `sql-session-variables.sql`. |
    | #11 | `ProtoToParsedPlanTestSuite.analyzerIsolationConf` was a bare 
`SQLConf` | Clone `spark.sessionState.conf` and only override 
`PATH_ENABLED=false`, so all `sparkConf` overrides (ANSI, alias config, ...) 
propagate automatically. |
    | Bonus | `ResolveSetVariable` hardcoded `SYSTEM.SESSION` regardless of 
actual PATH | `unresolvedVariableError` now takes `Seq[Seq[String]]` path 
entries with **required** `Origin` (no overloads). DML lookup failures (`SET 
VAR`, `FETCH ... INTO`) report the full SQL path as a bracketed list, 
byte-for-byte consistent with `UNRESOLVED_ROUTINE` and 
`TABLE_OR_VIEW_NOT_FOUND`. DDL name validation in `ResolveCatalogs` continues 
to report `[system.session]` since PATH does not apply there. O [...]
    
    ### Why are the changes needed?
    
    These are the cleanup items called out on SPARK-56681 from the post-merge 
source review of SPARK-56605. They eliminate dead code paths, plug user-visible 
bugs (force-loading a misconfigured default catalog on column resolution; 
clobbering pinned session configs; swallowing real catalog errors as 
`UNRESOLVED_ROUTINE`), remove the asymmetry between DDL and DML on session 
variables, and make `UNRESOLVED_VARIABLE` self-consistent with the other "not 
found" errors.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    - **`UNRESOLVED_VARIABLE.searchPath`** is now rendered as a bracketed list. 
For DML lookups (`SET VAR`, `FETCH ... INTO`), the list reflects the actual SQL 
PATH that was consulted instead of a hardcoded `SYSTEM.SESSION`. For DDL name 
validation (`DECLARE` / `DROP` with a non-session namespace), the list is `[`` 
`system`.`session` ``]` since PATH does not apply.
    - **`UNRESOLVED_VARIABLE`** now always carries a `queryContext` that 
highlights just the offending variable identifier (e.g. `"builtin.var1"`, 
`"ses.var1"`), not the whole `DECLARE` / `SET VAR` statement.
    - **`DROP TEMPORARY VARIABLE`** no longer raises `UNRESOLVED_VARIABLE` when 
the SQL PATH does not contain `system.session`. DDL on session variables 
ignores PATH, matching the existing behaviour of `DECLARE OR REPLACE VARIABLE`.
    - **`lookupFunctionType`** no longer swallows non–`NotFound` errors. A 
catalog reporting `PERMISSION_DENIED` (or similar) for a function lookup now 
propagates instead of silently producing `UNRESOLVED_ROUTINE`.
    
    ### How was this patch tested?
    
    - Added `sql-session-variables.sql` regression test for the struct-variable 
field-vs-qualified ambiguity (`DECLARE VARIABLE session STRUCT<a INT>` → 
`SELECT session.a` succeeds → `DROP` → `SELECT session.a` falls through to 
`UNRESOLVED_COLUMN`).
    - Updated `SetPathSuite`: DECLARE / SET VAR / DROP cycle under a 
non-default PATH; bonus test asserts the actual rendered search path and the 
variable-identifier `queryContext`.
    - Updated `SqlScriptingExecutionSuite` for the new bracketed `searchPath` 
and identifier-pinned `queryContext`.
    - Regenerated `sql-session-variables.sql.out` for the new error shape.
    - Added `resolutionPathEntriesForAnalysis` stubs to mocked `CatalogManager` 
instances in `PlanResolutionSuite`, `AlignAssignmentsSuiteBase`, and 
`TableLookupCacheSuite`.
    - Ran focused suites locally; all pass:
      - `build/sbt 'sql/testOnly *SetPathSuite *SqlScriptingExecutionSuite 
*ExecuteImmediateEndToEndSuite'`
      - `build/sbt 'sql/testOnly *SimpleSQLViewSuite *SQLFunctionSuite'`
      - `build/sbt 'sql/testOnly *PlanResolutionSuite 
*UpdateTableAlignAssignmentsSuite *MergeIntoTableAlignAssignmentsSuite'`
      - `build/sbt 'catalyst/testOnly *TableLookupCacheSuite *AnalysisSuite 
*AnalysisErrorSuite *LookupFunctionsSuite'`
      - `build/sbt 'sql/testOnly *FunctionQualificationSuite 
*RelationQualificationSuite *DataSourceV2FunctionSuite'`
      - `build/sbt 'sql/testOnly *SQLQuerySuite'`
      - `build/sbt 'connect/testOnly *ProtoToParsedPlanTestSuite'`
      - `build/sbt 'sql/testOnly *SQLQueryTestSuite -- -z 
sql-session-variables.sql'`
      - Full `org.apache.spark.sql.catalyst.analysis.*`, 
`org.apache.spark.sql.catalyst.parser.*`, and 
`org.apache.spark.sql.analysis.resolver.*` suites.
    - `scalastyle` and `scalafmt` clean across catalyst, sql, and connect 
modules.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor Claude Opus 4.7
    
    Closes #55647 from srielau/SPARK-56681-patch-clean-up.
    
    Authored-by: Serge Rielau <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  35 ++++---
 .../sql/catalyst/analysis/FunctionResolution.scala |  45 +++++----
 .../sql/catalyst/analysis/RelationResolution.scala |  22 +----
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  24 +++--
 .../sql/catalyst/analysis/ResolveFetchCursor.scala |   3 +-
 .../sql/catalyst/analysis/ResolveSetVariable.scala |   6 +-
 .../sql/catalyst/analysis/VariableResolution.scala |  23 ++++-
 .../sql/catalyst/catalog/VariableManager.scala     |  13 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  19 ++--
 .../sql/connector/catalog/CatalogManager.scala     |  63 ++++++++++--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  12 +--
 .../org/apache/spark/sql/internal/SQLConf.scala    |   7 ++
 .../catalyst/analysis/TableLookupCacheSuite.scala  |   3 +
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   |  11 +--
 .../sql/execution/command/v2/FetchCursorExec.scala |   5 +-
 .../sql/execution/command/v2/SetVariableExec.scala |   5 +-
 .../command/v2/VariableAssignmentUtils.scala       |   5 +-
 .../SqlScriptingLocalVariableManager.scala         |  12 ++-
 .../analyzer-results/sql-session-variables.sql.out | 106 +++++++++++++++++---
 .../sql-tests/inputs/sql-session-variables.sql     |  13 +++
 .../results/sql-session-variables.sql.out          | 109 +++++++++++++++++++--
 .../scala/org/apache/spark/sql/SetPathSuite.scala  |  79 ++++++++++++++-
 .../command/AlignAssignmentsSuiteBase.scala        |   3 +
 .../execution/command/PlanResolutionSuite.scala    |   6 ++
 .../sql/scripting/SqlScriptingExecutionSuite.scala |   4 +-
 25 files changed, 492 insertions(+), 141 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9fbfe1320aae..739ef41c7791 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -338,10 +338,7 @@ class Analyzer(
         AnalysisContext.reset()
         try {
           AnalysisHelper.markInAnalyzer {
-            sessionConf match {
-              case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
-              case None => runAnalysis()
-            }
+            runWithSessionConf(runAnalysis())
           }
         } finally {
           AnalysisContext.reset()
@@ -349,16 +346,29 @@ class Analyzer(
       } else {
         AnalysisContext.withNewAnalysisContext {
           AnalysisHelper.markInAnalyzer {
-            sessionConf match {
-              case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
-              case None => runAnalysis()
-            }
+            runWithSessionConf(runAnalysis())
           }
         }
       }
     }
   }
 
+  /**
+   * Runs `thunk` under the analyzer's [[sessionConf]] for analyzer isolation, 
but yields to any
+   * outer [[SQLConf.withExistingConf]] scope (e.g. a SQL UDF / view body that 
pinned the
+   * creation-time configs). Falls through unchanged when [[sessionConf]] is 
unset, or when the
+   * outer scope already installed a different conf -- otherwise the outer 
scope's conf would be
+   * silently clobbered.
+   */
+  private def runWithSessionConf[T](thunk: => T): T = sessionConf match {
+    case None => thunk
+    case Some(c) =>
+      SQLConf.getExistingConfIfSet match {
+        case Some(outer) if outer ne c => thunk
+        case _ => SQLConf.withExistingConf(c) { thunk }
+      }
+  }
+
   /**
    * Returns a copy of this analyzer that uses the given [[CatalogManager]] 
for all catalog
    * lookups. All other configuration (extended rules, checks, etc.) is 
preserved. Used by
@@ -392,13 +402,8 @@ class Analyzer(
     }
   }
 
-  private def executeSameContext(plan: LogicalPlan): LogicalPlan = sessionConf 
match {
-    // Respect explicit nested SQLConf overrides (e.g. persisted SQL UDF/view 
configs).
-    // Otherwise, run analysis with the captured session conf for analyzer 
isolation.
-    case Some(c) if SQLConf.get ne c => super.execute(plan)
-    case Some(c) => SQLConf.withExistingConf(c) { super.execute(plan) }
-    case None => super.execute(plan)
-  }
+  private def executeSameContext(plan: LogicalPlan): LogicalPlan =
+    runWithSessionConf(super.execute(plan))
 
   def resolver: Resolver = conf.resolver
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index 8f8c77f38fea..4721425d2cb5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -71,13 +71,6 @@ class FunctionResolution(
 
   private val trimWarningEnabled = new AtomicBoolean(true)
 
-  /** Returns the current catalog path, preferring the view's context if 
resolving a view. */
-  private def currentCatalogPath: Seq[String] = {
-    val ctx = AnalysisContext.get.catalogAndNamespace
-    if (ctx.nonEmpty) ctx
-    else (Seq(catalogManager.currentCatalog.name) ++ 
catalogManager.currentNamespace).toSeq
-  }
-
   /** True if nameParts is 3-part and the first part is the system catalog 
name. */
   private def isSystemCatalogQualified(nameParts: Seq[String]): Boolean =
     nameParts.length == 3 &&
@@ -101,18 +94,10 @@ class FunctionResolution(
    * directly, matching [[RelationResolution.relationResolutionEntries]] so 
routine order stays
    * aligned with relation order.
    */
-  private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] 
= {
-    AnalysisContext.get.resolutionPathEntries match {
-      case Some(entries) if conf.pathEnabled => entries
-      case _ =>
-        val pathDefault = currentCatalogPath
-        catalogManager.sqlResolutionPathEntries(
-          pathDefault.head,
-          pathDefault.tail.toSeq,
-          catalogManager.currentCatalog.name,
-          catalogManager.currentNamespace.toSeq)
-    }
-  }
+  private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] =
+    catalogManager.resolutionPathEntriesForAnalysis(
+      AnalysisContext.get.resolutionPathEntries,
+      AnalysisContext.get.catalogAndNamespace)
 
   private def resolutionCandidates(nameParts: Seq[String]): Seq[Seq[String]] = 
{
     if (nameParts.size == 1) {
@@ -370,7 +355,20 @@ class FunctionResolution(
     if (nameParts.length == 1) {
       // Must match [[resolutionCandidates]] / [[resolveFunction]]: 
single-part names use PATH +
       // session order, not only the current namespace (LookupCatalog 
single-part rule).
-      for (candidate <- resolutionCandidates(nameParts)) {
+      // `system.session.<name>` and `system.builtin.<name>` candidates were 
already resolved by
+      // [[lookupBuiltinOrTempFunction]] / 
[[lookupBuiltinOrTempTableFunction]] above (they
+      // route through `identifierFromSystemNameParts`, which only accepts 
those two
+      // namespaces); skip them here to avoid redundant catalog calls. Other 
`system.<x>`
+      // namespaces -- if any are ever added -- still go through persistent 
lookup.
+      val persistentCandidates = resolutionCandidates(nameParts).filterNot { c 
=>
+        c.length >= 2 &&
+          c.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) && {
+            val ns = c(1)
+            ns.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) ||
+              ns.equalsIgnoreCase(CatalogManager.BUILTIN_NAMESPACE)
+          }
+      }
+      for (candidate <- persistentCandidates) {
         try {
           candidate match {
             case CatalogAndIdentifier(catalog, ident) =>
@@ -380,7 +378,12 @@ class FunctionResolution(
             case _ =>
           }
         } catch {
-          case NonFatal(_) =>
+          // Only treat explicit "not found" / "forbidden" signals as a miss. 
Any other failure
+          // (e.g. permission denied, transient catalog error) propagates.
+          case _: NoSuchFunctionException
+             | _: NoSuchNamespaceException
+             | _: CatalogNotFoundException =>
+          case e: AnalysisException if e.getCondition == "FORBIDDEN_OPERATION" 
=>
         }
       }
       return FunctionType.NotFound
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 5f0da599e27e..8769d1c4e4ff 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -130,25 +130,9 @@ class RelationResolution(
    * When PATH is disabled, legacy resolution rules apply.
    */
   private def relationResolutionEntries: Seq[Seq[String]] = {
-    val pinned = AnalysisContext.get.resolutionPathEntries
-    if (pinned.isDefined && conf.pathEnabled) {
-      pinned.get
-    } else {
-      val expandCatalog = catalogManager.currentCatalog.name
-      val expandNamespace = catalogManager.currentNamespace.toSeq
-      val (pathCatalog, pathNamespace) =
-        if (isResolvingView) {
-          val p = AnalysisContext.get.catalogAndNamespace
-          (p.head, p.tail.toSeq)
-        } else {
-          (expandCatalog, expandNamespace)
-        }
-      catalogManager.sqlResolutionPathEntries(
-        pathCatalog,
-        pathNamespace,
-        expandCatalog,
-        expandNamespace)
-    }
+    catalogManager.resolutionPathEntriesForAnalysis(
+      AnalysisContext.get.resolutionPathEntries,
+      AnalysisContext.get.catalogAndNamespace)
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index f7319e9b03e8..185a5503b110 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -44,7 +44,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case c @ CreateVariable(identifiers, _, _) =>
       // We resolve only UnresolvedIdentifiers, and pass on the other nodes
       val resolved = identifiers.map {
-        case UnresolvedIdentifier(nameParts, _) =>
+        case u @ UnresolvedIdentifier(nameParts, _) =>
           if (withinLocalVariableScope) {
             if (c.replace) {
               throw new AnalysisException(
@@ -67,26 +67,22 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
             val resolvedIdentifier
             = catalogManager.tempVariableManager.qualify(nameParts.last)
 
-            assertValidSessionVariableNameParts(nameParts, resolvedIdentifier)
+            assertValidSessionVariableNameParts(nameParts, resolvedIdentifier, 
u.origin)
             resolvedIdentifier
           }
         case plan => plan
       }
       c.copy(names = resolved)
 
-    case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) =>
+    case d @ DropVariable(u @ UnresolvedIdentifier(nameParts, _), _) =>
       if (withinLocalVariableScope) {
         throw new AnalysisException(
           "UNSUPPORTED_FEATURE.SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE", 
Map.empty)
       }
-      if (nameParts.length == 1 &&
-          !catalogManager.sessionScopeUnqualifiedAllowed(
-            catalogManager.currentCatalog.name(),
-            catalogManager.currentNamespace.toSeq)) {
-        throw QueryCompilationErrors.unresolvedVariableError(nameParts, 
Seq("SYSTEM", "SESSION"))
-      }
+      // DDL on session variables targets `system.session` directly; the SQL 
path only applies
+      // to DML (see 
[[VariableResolution.allowUnqualifiedSessionTempVariableLookup]]).
       val resolved = catalogManager.tempVariableManager.qualify(nameParts.last)
-      assertValidSessionVariableNameParts(nameParts, resolved)
+      assertValidSessionVariableNameParts(nameParts, resolved, u.origin)
       d.copy(name = resolved)
 
     case CreateFunction(UnresolvedIdentifier(nameParts, _), _, _, _, _)
@@ -221,13 +217,15 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
   private def assertValidSessionVariableNameParts(
       nameParts: Seq[String],
-      resolvedIdentifier: ResolvedIdentifier): Unit = {
+      resolvedIdentifier: ResolvedIdentifier,
+      origin: Origin): Unit = {
     if (!validSessionVariableName(nameParts)) {
       throw QueryCompilationErrors.unresolvedVariableError(
         nameParts,
-        Seq(
+        Seq(Seq(
           resolvedIdentifier.catalog.name(),
-          resolvedIdentifier.identifier.namespace().head)
+          resolvedIdentifier.identifier.namespace().head)),
+        origin
       )
     }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala
index b47332ace2b8..34942fcf08bc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala
@@ -64,7 +64,8 @@ class ResolveFetchCursor(val catalogManager: CatalogManager) 
extends Rule[Logica
           nameParts = u.nameParts
         ) match {
           case Some(variable) => variable.copy(canFold = false)
-          case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", 
"SESSION"))
+          case _ => throw unresolvedVariableError(
+            u.nameParts, variableResolution.searchPathEntriesForError, 
u.origin)
         }
 
       case other => throw SparkException.internalError(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
index 6ecbc87d3553..ab80fc829cf4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
@@ -62,7 +62,11 @@ class ResolveSetVariable(val catalogManager: CatalogManager) 
extends Rule[Logica
             nameParts = u.nameParts
           ) match {
             case Some(variable) => variable.copy(canFold = false)
-            case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", 
"SESSION"))
+            case _ =>
+              throw unresolvedVariableError(
+                u.nameParts,
+                variableResolution.searchPathEntriesForError,
+                u.origin)
           }
 
         case other => throw SparkException.internalError(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
index f8cce0d6f821..bc85ccfee34c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
@@ -43,10 +43,25 @@ class VariableResolution(
    * (PATH enabled and explicitly set).
    */
   private def allowUnqualifiedSessionTempVariableLookup(nameParts: 
Seq[String]): Boolean = {
-    if (nameParts.length != 1) return true
-    catalogManager.sessionScopeUnqualifiedAllowed(
-      catalogManager.currentCatalog.name(),
-      catalogManager.currentNamespace.toSeq)
+    nameParts.length != 1 || catalogManager.isSystemSessionOnPath
+  }
+
+  /**
+   * Search-path entries to report in `UNRESOLVED_VARIABLE` for DML lookups 
(`SET VAR`,
+   * `FETCH ... INTO`). The full SQL path is reported regardless of how the 
name was
+   * qualified, matching the convention used by `TABLE_OR_VIEW_NOT_FOUND` and
+   * `UNRESOLVED_ROUTINE`. Keeping the rendering qualification-independent 
also avoids
+   * re-shaping the error if Spark ever grows struct-field assignment, where 
2-part forms
+   * become genuinely ambiguous.
+   *
+   * DDL paths (`DECLARE` / `DROP` name validation in
+   * [[org.apache.spark.sql.catalyst.analysis.ResolveCatalogs]]) do not 
consult the SQL path
+   * and report `[system.session]` directly at their throw site.
+   */
+  def searchPathEntriesForError: Seq[Seq[String]] = {
+    catalogManager.resolutionPathEntriesForAnalysis(
+      AnalysisContext.get.resolutionPathEntries,
+      AnalysisContext.get.catalogAndNamespace)
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
index 4c7d8db6604b..e9edd45fae51 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{FakeSystemCatalog, 
ResolvedIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier}
 import 
org.apache.spark.sql.connector.catalog.CatalogManager.{SESSION_NAMESPACE, 
SYSTEM_CATALOG_NAME}
 import org.apache.spark.sql.errors.DataTypeErrorsBase
@@ -49,8 +50,11 @@ trait VariableManager {
  *
    * @param nameParts Name parts of the variable.
    * @param varDef The new VariableDefinition of the variable.
+   * @param origin Origin of the SET reference, used in
+   *               
[[org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError]]
+   *               if the variable is unexpectedly absent at execution time.
    */
-  def set(nameParts: Seq[String], varDef: VariableDefinition): Unit
+  def set(nameParts: Seq[String], varDef: VariableDefinition, origin: Origin): 
Unit
 
 /**
  * Get an existing variable.
@@ -130,11 +134,14 @@ class TempVariableManager extends VariableManager with 
DataTypeErrorsBase {
     variables.put(name, varDef)
   }
 
-  override def set(nameParts: Seq[String], varDef: VariableDefinition): Unit = 
synchronized {
+  override def set(
+      nameParts: Seq[String],
+      varDef: VariableDefinition,
+      origin: Origin): Unit = synchronized {
     val name = nameParts.last
     // Sanity check as this is already checked in ResolveSetVariable.
     if (!variables.contains(name)) {
-      throw unresolvedVariableError(nameParts, Seq("SYSTEM", "SESSION"))
+      throw unresolvedVariableError(nameParts, Seq(Seq("SYSTEM", "SESSION")), 
origin)
     }
     variables.put(name, varDef)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2b282467b305..a9bbd34a72d1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -7140,11 +7140,13 @@ class AstBuilder extends DataTypeAstBuilder
       dataTypeOpt.map { dt => default.copy(child = Cast(default.child, dt)) 
}.getOrElse(default)
     }
     CreateVariable(
-      ctx.identifierReferences.asScala.map (
-        identifierReference => {
-          withIdentClause(identifierReference, UnresolvedIdentifier(_))
-        }
-      ).toSeq,
+      ctx.identifierReferences.asScala.map { identifierReference =>
+        // Give each `UnresolvedIdentifier` its own origin pointing at the 
variable name
+        // fragment so analyzer-time errors (e.g. UNRESOLVED_VARIABLE) can 
highlight just
+        // that identifier rather than the whole `DECLARE ...` statement.
+        withIdentClause(identifierReference, parts =>
+          withOrigin(identifierReference) { UnresolvedIdentifier(parts) })
+      }.toSeq,
       defaultExpression,
       ctx.REPLACE() != null
     )
@@ -7160,7 +7162,8 @@ class AstBuilder extends DataTypeAstBuilder
    */
   override def visitDropVariable(ctx: DropVariableContext): LogicalPlan = 
withOrigin(ctx) {
     DropVariable(
-      withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
+      withIdentClause(ctx.identifierReference(), parts =>
+        withOrigin(ctx.identifierReference()) { UnresolvedIdentifier(parts) }),
       ctx.EXISTS() != null
     )
   }
@@ -7285,7 +7288,7 @@ class AstBuilder extends DataTypeAstBuilder
       // The SET variable source is a query
       val variables = multipartIdentifierList.multipartIdentifier.asScala.map 
{ variableIdent =>
         val varName = visitMultipartIdentifier(variableIdent)
-        UnresolvedAttribute(varName)
+        withOrigin(variableIdent) { UnresolvedAttribute(varName) }
       }.toSeq
       SetVariable(variables, visitQuery(query))
     } else {
@@ -7297,7 +7300,7 @@ class AstBuilder extends DataTypeAstBuilder
           case n: NamedExpression => n
           case e => Alias(e, varIdent.last)()
         }
-        (UnresolvedAttribute(varIdent), varNamedExpr)
+        (withOrigin(assign.key) { UnresolvedAttribute(varIdent) }, 
varNamedExpr)
       }.toSeq.unzip
       SetVariable(variables, Project(values, OneRowRelation()))
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 0a2ad28051dd..f73b0e68cecc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -202,12 +202,63 @@ class CatalogManager(
       currentCatalog, currentNamespace,
       currentCatalog, currentNamespace)
 
-  /** True if [[sqlResolutionPathEntries]] includes `system.session`. */
-  def sessionScopeUnqualifiedAllowed(
-      currentCatalog: String,
-      currentNamespace: Seq[String]): Boolean =
-    sqlResolutionPathEntries(currentCatalog, currentNamespace)
-      .exists(CatalogManager.isSystemSessionPathEntry)
+  /**
+   * True if `system.session` is on the SQL path. Only literal path entries 
can match: the
+   * [[CurrentSchemaEntry]] marker expands to `currentCatalog.name() +: 
currentNamespace`, and
+   * `system` is not a registered catalog (it is a synthetic namespace served 
via
+   * [[org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog]] / 
`lookupBuiltinOrTempFunction`,
+   * not loadable via [[catalog]]), so `currentCatalog.name()` cannot be 
`"system"`. If that
+   * invariant ever changes, this short-circuit must be revisited.
+   * Inspecting stored entries directly avoids loading the configured default 
catalog.
+   */
+  def isSystemSessionOnPath: Boolean = synchronized {
+    if (!conf.pathEnabled) return true
+    _sessionPath match {
+      case None => true
+      case Some(entries) => entries.exists {
+        case CatalogManager.LiteralPathEntry(parts) =>
+          CatalogManager.isSystemSessionPathEntry(parts)
+        case _ => false
+      }
+    }
+  }
+
+  /**
+   * Single source of truth for analysis-time resolution path entries used by 
relation, routine,
+   * and procedure resolution. When `pinnedEntries` are set (a view or SQL 
function body's
+   * persisted frozen path) and PATH is enabled, returns them as-is so 
unqualified lookups follow
+   * the creation-time path. Otherwise falls back to 
[[sqlResolutionPathEntries]] using the view's
+   * catalog/namespace as the path default (so unqualified names inside a view 
body see the view's
+   * home schema first), while always expanding markers like CURRENT_SCHEMA 
against the live
+   * session catalog/namespace.
+   *
+   * @param pinnedEntries persisted frozen path entries from view / SQL 
function metadata
+   *                      (typically `AnalysisContext.resolutionPathEntries`).
+   * @param viewCatalogAndNamespace the view's catalog and namespace
+   *                               (typically 
`AnalysisContext.catalogAndNamespace`); empty when
+   *                               not resolving a view body.
+   */
+  def resolutionPathEntriesForAnalysis(
+      pinnedEntries: Option[Seq[Seq[String]]],
+      viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] = {
+    pinnedEntries match {
+      case Some(entries) if conf.pathEnabled => entries
+      case _ =>
+        val expandCatalog = currentCatalog.name()
+        val expandNamespace = currentNamespace.toSeq
+        val (pathCatalog, pathNamespace) =
+          if (viewCatalogAndNamespace.nonEmpty) {
+            (viewCatalogAndNamespace.head, viewCatalogAndNamespace.tail.toSeq)
+          } else {
+            (expandCatalog, expandNamespace)
+          }
+        sqlResolutionPathEntries(
+          pathCatalog,
+          pathNamespace,
+          expandCatalog,
+          expandNamespace)
+    }
+  }
 
   private var _currentCatalogName: Option[String] = None
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 5cfdbc66e3f4..9b899867a9e3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -909,23 +909,15 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       messageParameters = Map("dt" -> dt.toString))
   }
 
-  def unresolvedVariableError(name: Seq[String], searchPath: Seq[String]): 
Throwable = {
-    new AnalysisException(
-      errorClass = "UNRESOLVED_VARIABLE",
-      messageParameters = Map(
-        "variableName" -> toSQLId(name),
-        "searchPath" -> toSQLId(searchPath)))
-  }
-
   def unresolvedVariableError(
       name: Seq[String],
-      searchPath: Seq[String],
+      pathEntries: Seq[Seq[String]],
       origin: Origin): Throwable = {
     new AnalysisException(
       errorClass = "UNRESOLVED_VARIABLE",
       messageParameters = Map(
         "variableName" -> toSQLId(name),
-        "searchPath" -> toSQLId(searchPath)),
+        "searchPath" -> pathEntries.map(toSQLId).mkString("[", ", ", "]")),
       origin = origin)
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 29a734f05598..59958eb8da5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -155,6 +155,13 @@ object SQLConf {
     override def initialValue: SQLConf = null
   }
 
+  /**
+   * Returns the [[SQLConf]] installed by an outer [[withExistingConf]] scope, 
or [[None]] if
+   * there is no such scope. Unlike [[get]], this peeks directly at the 
threadlocal so callers
+   * can distinguish "no outer scope" from "outer scope happens to install the 
same conf".
+   */
+  def getExistingConfIfSet: Option[SQLConf] = Option(existingConf.get())
+
   def withExistingConf[T](conf: SQLConf)(f: => T): T = {
     val old = existingConf.get()
     existingConf.set(conf)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
index 63d5523be072..75846aa49616 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
@@ -83,6 +83,9 @@ class TableLookupCacheSuite extends AnalysisTest with 
Matchers {
       .thenReturn(defaultPath)
     when(catalogManager.sqlResolutionPathEntries(any[String], 
any[Seq[String]]))
       .thenReturn(defaultPath)
+    when(catalogManager.resolutionPathEntriesForAnalysis(
+      any[Option[Seq[Seq[String]]]], any[Seq[String]]))
+      .thenReturn(defaultPath)
 
     new Analyzer(catalogManager)
   }
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index fabbf3071c4e..913d3d1c0ff5 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -135,17 +135,16 @@ class ProtoToParsedPlanTestSuite
 
   /**
    * Isolated from [[SharedSparkSession]] so PATH / session path settings do 
not affect catalog.
+   * Cloned from the test session's conf so all sparkConf overrides (ANSI, 
alias config, etc.) are
+   * preserved automatically; only the genuine isolation knob is overridden 
explicitly.
    */
-  private val analyzerIsolationConf: SQLConf = {
-    val c = new SQLConf()
+  private lazy val analyzerIsolationConf: SQLConf = {
+    val c = spark.sessionState.conf.clone()
     c.setConf(SQLConf.PATH_ENABLED, false)
-    // Match [[sparkConf]]: a bare SQLConf defaults ANSI_ENABLED to true, 
which changes
-    // function signatures in analyzed plans (e.g. make_date) vs golden files.
-    c.setConf(SQLConf.ANSI_ENABLED, false)
     c
   }
 
-  private val analyzer = {
+  private lazy val analyzer = {
     val inMemoryCatalog = new InMemoryChangelogCatalog
     // Name must match [[CatalogManager.SESSION_CATALOG_NAME]]: path entries 
use
     // [[currentCatalog.name()]], then resolution calls 
[[catalogManager.catalog]] on that segment.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
index ad867e653767..dc28d4a7e7f1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala
@@ -194,7 +194,8 @@ case class FetchCursorExec(
       case FakeLocalCatalog => scriptingVariableManager.get
 
       case FakeSystemCatalog if 
tempVariableManager.get(namePartsCaseAdjusted).isEmpty =>
-        throw unresolvedVariableError(namePartsCaseAdjusted, Seq("SYSTEM", 
"SESSION"))
+        throw unresolvedVariableError(
+          namePartsCaseAdjusted, Seq(Seq("SYSTEM", "SESSION")), varRef.origin)
 
       case FakeSystemCatalog => tempVariableManager
 
@@ -207,7 +208,7 @@ case class FetchCursorExec(
       Literal(value, varRef.dataType)
     )
 
-    variableManager.set(namePartsCaseAdjusted, varDef)
+    variableManager.set(namePartsCaseAdjusted, varDef, varRef.origin)
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
index ef8e238832b3..9861bd77616a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala
@@ -80,7 +80,8 @@ case class SetVariableExec(variables: Seq[VariableReference], 
query: SparkPlan)
       case FakeLocalCatalog => scriptingVariableManager.get
 
       case FakeSystemCatalog if 
tempVariableManager.get(namePartsCaseAdjusted).isEmpty =>
-        throw unresolvedVariableError(namePartsCaseAdjusted, Seq("SYSTEM", 
"SESSION"))
+        throw unresolvedVariableError(
+          namePartsCaseAdjusted, Seq(Seq("SYSTEM", "SESSION")), 
variable.origin)
 
       case FakeSystemCatalog => tempVariableManager
 
@@ -90,7 +91,7 @@ case class SetVariableExec(variables: Seq[VariableReference], 
query: SparkPlan)
     val varDef = VariableDefinition(
       variable.identifier, variable.varDef.defaultValueSQL, Literal(value, 
variable.dataType))
 
-    variableManager.set(namePartsCaseAdjusted, varDef)
+    variableManager.set(namePartsCaseAdjusted, varDef, variable.origin)
   }
 
   override def output: Seq[Attribute] = Seq.empty
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
index 3a4d55169d90..d99ddae538a6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala
@@ -68,7 +68,8 @@ object VariableAssignmentUtils {
       case FakeLocalCatalog => scriptingVariableManager.get
 
       case FakeSystemCatalog if 
tempVariableManager.get(namePartsCaseAdjusted).isEmpty =>
-        throw unresolvedVariableError(namePartsCaseAdjusted, Seq("SYSTEM", 
"SESSION"))
+        throw unresolvedVariableError(
+          namePartsCaseAdjusted, Seq(Seq("SYSTEM", "SESSION")), varRef.origin)
 
       case FakeSystemCatalog => tempVariableManager
 
@@ -81,6 +82,6 @@ object VariableAssignmentUtils {
       Literal(value, varRef.dataType)
     )
 
-    variableManager.set(namePartsCaseAdjusted, varDef)
+    variableManager.set(namePartsCaseAdjusted, varDef, varRef.origin)
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala
index 0ad1974b7d76..c8a893f374f3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{FakeLocalCatalog, 
ResolvedIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{VariableDefinition, 
VariableManager}
+import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.errors.DataTypeErrorsBase
 import 
org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError
@@ -47,13 +48,18 @@ class SqlScriptingLocalVariableManager(context: 
SqlScriptingExecutionContext)
     context.currentScope.variables.put(name, varDef)
   }
 
-  override def set(nameParts: Seq[String], varDef: VariableDefinition): Unit = 
{
+  override def set(
+      nameParts: Seq[String],
+      varDef: VariableDefinition,
+      origin: Origin): Unit = {
     val scope = findScopeOfVariable(nameParts)
       .getOrElse(
-        throw unresolvedVariableError(nameParts, 
varDef.identifier.namespace().toIndexedSeq))
+        throw unresolvedVariableError(
+          nameParts, Seq(varDef.identifier.namespace().toIndexedSeq), origin))
 
     if (!scope.variables.contains(nameParts.last)) {
-      throw unresolvedVariableError(nameParts, 
varDef.identifier.namespace().toIndexedSeq)
+      throw unresolvedVariableError(
+        nameParts, Seq(varDef.identifier.namespace().toIndexedSeq), origin)
     }
 
     scope.variables.put(nameParts.last, varDef)
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
index fdb7c8adf282..4b6fc5d45014 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out
@@ -504,9 +504,58 @@ Project [scalar-subquery#x [title#x] AS 
scalarsubquery(title)#xL]
 
 
 -- !query
-SET VARIABLE title = 'Test qualifiers - fail'
+SET VARIABLE title = 'Dropped struct variable -- field access vs qualified 
name'
 -- !query analysis
 SetVariable [variablereference(system.session.title='Test variable in 
aggregate')]
++- Project [Dropped struct variable -- field access vs qualified name AS 
title#x]
+   +- OneRowRelation
+
+
+-- !query
+DECLARE OR REPLACE VARIABLE session STRUCT<a INT> = NAMED_STRUCT('a', 1)
+-- !query analysis
+CreateVariable default(cast(named_struct(a, 1) as struct<a:int>), 
sql='NAMED_STRUCT('a', 1)'), true
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, 
session.session
+
+
+-- !query
+SELECT session.a
+-- !query analysis
+Project [variablereference(system.session.session=NAMED_STRUCT('a', 1)).a AS 
a#x]
++- OneRowRelation
+
+
+-- !query
+DROP TEMPORARY VARIABLE session
+-- !query analysis
+DropVariable false
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, 
session.session
+
+
+-- !query
+SELECT session.a
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`session`.`a`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 16,
+    "fragment" : "session.a"
+  } ]
+}
+
+
+-- !query
+SET VARIABLE title = 'Test qualifiers - fail'
+-- !query analysis
+SetVariable [variablereference(system.session.title='Dropped struct variable 
-- field access vs qualified name')]
 +- Project [Test qualifiers - fail AS title#x]
    +- OneRowRelation
 
@@ -519,9 +568,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
+    "searchPath" : "[`system`.`session`]",
     "variableName" : "`builtin`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 29,
+    "stopIndex" : 40,
+    "fragment" : "builtin.var1"
+  } ]
 }
 
 
@@ -533,9 +589,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
+    "searchPath" : "[`system`.`session`]",
     "variableName" : "`system`.`sesion`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 29,
+    "stopIndex" : 46,
+    "fragment" : "system.sesion.var1"
+  } ]
 }
 
 
@@ -547,9 +610,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
+    "searchPath" : "[`system`.`session`]",
     "variableName" : "`sys`.`session`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 29,
+    "stopIndex" : 44,
+    "fragment" : "sys.session.var1"
+  } ]
 }
 
 
@@ -648,9 +718,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`SYSTEM`.`SESSION`",
+    "searchPath" : "[`system`.`builtin`, `system`.`session`, 
`spark_catalog`.`default`]",
     "variableName" : "`ses`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 14,
+    "stopIndex" : 21,
+    "fragment" : "ses.var1"
+  } ]
 }
 
 
@@ -662,9 +739,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`SYSTEM`.`SESSION`",
+    "searchPath" : "[`system`.`builtin`, `system`.`session`, 
`spark_catalog`.`default`]",
     "variableName" : "`builtn`.`session`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 14,
+    "stopIndex" : 32,
+    "fragment" : "builtn.session.var1"
+  } ]
 }
 
 
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql 
b/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql
index 2e4eaa1f8f6c..86cd70cfbf98 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql
@@ -83,6 +83,19 @@ DROP TEMPORARY VARIABLE var1;
 SET VARIABLE title = 'Test variable in aggregate';
 SELECT (SELECT MAX(id) FROM RANGE(10) WHERE id < title) FROM VALUES 1, 2 AS 
t(title);
 
+SET VARIABLE title = 'Dropped struct variable -- field access vs qualified 
name';
+-- `session.a` is ambiguous: (a) 2-part qualified variable, or (b) field `a` 
of a 1-part
+-- variable `session`. Variable resolution tries (a) first via longest match, 
falls back to
+-- (b). With `session` declared as a struct, (b) succeeds. After the variable 
is dropped,
+-- both interpretations fail and the SELECT falls through to column 
resolution, which
+-- reports `UNRESOLVED_COLUMN`. Because either interpretation could have been 
intended,
+-- the variable error path (when reached) must dump the full SQL path -- see
+-- `VariableResolution.searchPathEntriesForError`.
+DECLARE OR REPLACE VARIABLE session STRUCT<a INT> = NAMED_STRUCT('a', 1);
+SELECT session.a;
+DROP TEMPORARY VARIABLE session;
+SELECT session.a;
+
 SET VARIABLE title = 'Test qualifiers - fail';
 DECLARE OR REPLACE VARIABLE builtin.var1 INT;
 DECLARE OR REPLACE VARIABLE system.sesion.var1 INT;
diff --git 
a/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out 
b/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out
index de8d6743fc76..3357f2e52630 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out
@@ -561,6 +561,60 @@ struct<scalarsubquery(title):bigint>
 1
 
 
+-- !query
+SET VARIABLE title = 'Dropped struct variable -- field access vs qualified 
name'
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DECLARE OR REPLACE VARIABLE session STRUCT<a INT> = NAMED_STRUCT('a', 1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT session.a
+-- !query schema
+struct<a:int>
+-- !query output
+1
+
+
+-- !query
+DROP TEMPORARY VARIABLE session
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT session.a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`session`.`a`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 16,
+    "fragment" : "session.a"
+  } ]
+}
+
+
 -- !query
 SET VARIABLE title = 'Test qualifiers - fail'
 -- !query schema
@@ -579,9 +633,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
+    "searchPath" : "[`system`.`session`]",
     "variableName" : "`builtin`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 29,
+    "stopIndex" : 40,
+    "fragment" : "builtin.var1"
+  } ]
 }
 
 
@@ -595,9 +656,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
+    "searchPath" : "[`system`.`session`]",
     "variableName" : "`system`.`sesion`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 29,
+    "stopIndex" : 46,
+    "fragment" : "system.sesion.var1"
+  } ]
 }
 
 
@@ -611,9 +679,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`system`.`session`",
+    "searchPath" : "[`system`.`session`]",
     "variableName" : "`sys`.`session`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 29,
+    "stopIndex" : 44,
+    "fragment" : "sys.session.var1"
+  } ]
 }
 
 
@@ -723,9 +798,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`SYSTEM`.`SESSION`",
+    "searchPath" : "[`system`.`builtin`, `system`.`session`, 
`spark_catalog`.`default`]",
     "variableName" : "`ses`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 14,
+    "stopIndex" : 21,
+    "fragment" : "ses.var1"
+  } ]
 }
 
 
@@ -739,9 +821,16 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "UNRESOLVED_VARIABLE",
   "sqlState" : "42883",
   "messageParameters" : {
-    "searchPath" : "`SYSTEM`.`SESSION`",
+    "searchPath" : "[`system`.`builtin`, `system`.`session`, 
`spark_catalog`.`default`]",
     "variableName" : "`builtn`.`session`.`var1`"
-  }
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 14,
+    "stopIndex" : 32,
+    "fragment" : "builtn.session.var1"
+  } ]
 }
 
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
index d33a65fb73c8..ad6bcd414966 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala
@@ -372,22 +372,31 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
-  test("PATH enabled: unqualified session variable lookup follows PATH") {
+  test("PATH enabled: unqualified SET VAR follows PATH; DDL on variables 
ignores PATH") {
     withPathEnabled {
       sql("DECLARE VARIABLE system.session.path_var_gate = 7")
       try {
         sql("SET PATH = spark_catalog.default")
         checkError(
           exception = intercept[AnalysisException] {
-            sql("DROP TEMPORARY VARIABLE path_var_gate")
+            sql("SET VAR path_var_gate = 8")
           },
           condition = "UNRESOLVED_VARIABLE",
           sqlState = "42883",
           parameters = Map(
             "variableName" -> "`path_var_gate`",
-            "searchPath" -> "`SYSTEM`.`SESSION`"))
+            "searchPath" -> "[`spark_catalog`.`default`]"),
+          context = ExpectedContext("path_var_gate", 8, 20))
 
+        sql("SET VAR system.session.path_var_gate = 9")
+        checkAnswer(sql("SELECT system.session.path_var_gate"), Row(9))
+
+        sql("DROP TEMPORARY VARIABLE path_var_gate")
+
+        sql("DECLARE VARIABLE system.session.path_var_gate = 7")
         sql("SET PATH = spark_catalog.default, system.session")
+        sql("SET VAR path_var_gate = 11")
+        checkAnswer(sql("SELECT path_var_gate"), Row(11))
         sql("DROP TEMPORARY VARIABLE path_var_gate")
       } finally {
         sql("DROP TEMPORARY VARIABLE IF EXISTS system.session.path_var_gate")
@@ -395,6 +404,70 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("PATH enabled: unqualified FETCH ... INTO follows PATH") {
+    withSQLConf(
+      SQLConf.PATH_ENABLED.key -> "true",
+      SQLConf.SQL_SCRIPTING_CURSOR_ENABLED.key -> "true") {
+      sql("DECLARE OR REPLACE VARIABLE path_fetch_target INT")
+      try {
+        // Sanity: FETCH INTO works under the default path (system.session is 
on it).
+        val ok = sql(
+          """
+            |BEGIN
+            |  DECLARE cur CURSOR FOR SELECT 42 AS val;
+            |  OPEN cur;
+            |  FETCH cur INTO path_fetch_target;
+            |  CLOSE cur;
+            |END;
+            |""".stripMargin)
+        checkAnswer(ok, Seq.empty[Row])
+        checkAnswer(sql("SELECT path_fetch_target"), Row(42))
+
+        // Set PATH to exclude system.session: unqualified FETCH INTO target 
now fails
+        // with the actual SQL path rendered as a bracketed list.
+        sql("SET PATH = spark_catalog.default")
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(
+              """
+                |BEGIN
+                |  DECLARE cur CURSOR FOR SELECT 99 AS val;
+                |  OPEN cur;
+                |  FETCH cur INTO path_fetch_target;
+                |  CLOSE cur;
+                |END;
+                |""".stripMargin)
+          },
+          condition = "UNRESOLVED_VARIABLE",
+          sqlState = "42883",
+          parameters = Map(
+            "variableName" -> "`path_fetch_target`",
+            "searchPath" -> "[`spark_catalog`.`default`]"),
+          context = ExpectedContext("path_fetch_target", -1, -1))
+      } finally {
+        sql("SET PATH = DEFAULT_PATH")
+        sql("DROP TEMPORARY VARIABLE IF EXISTS path_fetch_target")
+      }
+    }
+  }
+
+  test("PATH enabled: DECLARE / SET VAR / DROP cycle under non-default PATH") {
+    withPathEnabled {
+      sql("CREATE SCHEMA IF NOT EXISTS path_var_cycle")
+      try {
+        sql("SET PATH = spark_catalog.path_var_cycle, system.session")
+        sql("DECLARE OR REPLACE VARIABLE cycle_var = 1")
+        sql("SET VAR system.session.cycle_var = 2")
+        sql("SET VAR cycle_var = 3")
+        checkAnswer(sql("SELECT cycle_var"), Row(3))
+        sql("DROP TEMPORARY VARIABLE cycle_var")
+      } finally {
+        sql("DROP TEMPORARY VARIABLE IF EXISTS system.session.cycle_var")
+        sql("DROP SCHEMA IF EXISTS path_var_cycle")
+      }
+    }
+  }
+
   test("PATH enabled: current_path does not accept arguments") {
     withPathEnabled {
       // Ensure built-in function lookup succeeds so this assertion targets 
arg-count semantics.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
index 1a6dc178b6e5..c88ebb0d69ee 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
@@ -195,6 +195,9 @@ abstract class AlignAssignmentsSuiteBase extends 
AnalysisTest {
       .thenReturn(defaultPath)
     when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]]))
       .thenReturn(defaultPath)
+    when(manager.resolutionPathEntriesForAnalysis(
+      any[Option[Seq[Seq[String]]]], any[Seq[String]]))
+      .thenReturn(defaultPath)
     manager
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 907aa895a562..cd917a817f7f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -243,6 +243,9 @@ class PlanResolutionSuite extends SharedSparkSession with 
AnalysisTest {
       .thenReturn(defaultPath)
     when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]]))
       .thenReturn(defaultPath)
+    when(manager.resolutionPathEntriesForAnalysis(
+      any[Option[Seq[Seq[String]]]], any[Seq[String]]))
+      .thenReturn(defaultPath)
     manager
   }
 
@@ -269,6 +272,9 @@ class PlanResolutionSuite extends SharedSparkSession with 
AnalysisTest {
       .thenReturn(defaultPath2)
     when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]]))
       .thenReturn(defaultPath2)
+    when(manager.resolutionPathEntriesForAnalysis(
+      any[Option[Seq[Seq[String]]]], any[Seq[String]]))
+      .thenReturn(defaultPath2)
     manager
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
index 9a691d4430ef..9e9991774992 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala
@@ -3691,7 +3691,9 @@ class SqlScriptingExecutionSuite extends 
SharedSparkSession {
       sqlState = "42883",
       parameters = Map(
         "variableName" -> toSQLId("LOCALVAR"),
-        "searchPath" -> toSQLId("SYSTEM.SESSION"))
+        "searchPath" ->
+          "[`system`.`builtin`, `system`.`session`, 
`spark_catalog`.`default`]"),
+      context = ExpectedContext("LOCALVAR", 54, 61)
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to