This is an automated email from the ASF dual-hosted git repository. cloud-fan pushed a commit to branch branch-4.x in repository https://gitbox.apache.org/repos/asf/spark.git
commit b99151889ce28f58b20a7ab6d2e45ba24ef99994
Author: Serge Rielau <[email protected]>
AuthorDate: Mon May 4 10:32:50 2026 -0700

[SPARK-56681][SQL] PATH cleanup

### What changes were proposed in this pull request?

Address the open follow-ups from [SPARK-56681](https://issues.apache.org/jira/browse/SPARK-56681) (umbrella for PATH / SPARK-56605 cleanup) in a single cleanup PR. Items #1 and #2 were already wired by SPARK-56639; this PR covers the remainder.

| # | Item | Resolution |
|---|---|---|
| #1 | `FunctionResolution.resolveProcedure` was dead code | Already wired by SPARK-56639 (no action). |
| #2 | Frozen view / SQL-function PATH wiring unfinished | Already done by SPARK-56639 (no action). |
| #3 | `AnalysisContext.resolutionPathEntries` threadlocal | Audit only: confirmed `withNewAnalysisContext` / `reset()` correctly clear it. Full removal needs a coordinated refactor to plumb the path through `RelationResolution` / `FunctionResolution` method calls; flagged as a follow-up. |
| #4 | `Analyzer.executeAndCheck` clobbers outer `SQLConf.withExistingConf` | Extracted `runWithSessionConf` helper, added `SQLConf.getExistingConfIfSet`. `executeAndCheck` and `executeSameContext` now share one path that yields to any outer scope. |
| #5 | `VariableResolution.allowUnqualifiedSessionTempVariableLookup` force-loads default catalog | Replaced the hot-path catalog read with `CatalogManager.isSystemSessionOnPath`, which inspects stored session-path entries directly. No catalog load on column resolution. |
| #6 | `DROP VARIABLE` PATH gate asymmetric with `DECLARE` / `CREATE` | Removed the gate. DDL on session variables (`DECLARE` / `CREATE` / `DROP`) always targets `system.session` directly; only DML (`SET VAR`, `SELECT x`) goes through PATH. |
| #7 | `lookupFunctionType` exception swallow too broad | Narrowed from `NonFatal` to the explicit not-found list (`NoSuchFunctionException`, `NoSuchNamespaceException`, `CatalogNotFoundException`, `FORBIDDEN_OPERATION`). Other exceptions propagate. |
| #8 | `lookupFunctionType` fan-out had wasteful `system.*` candidates | Filtered them out: `system.session`, `system.builtin`, `system.ai` are already resolved earlier in the same method. |
| #9 | Three near-duplicate path-resolution helpers | Lifted into `CatalogManager.resolutionPathEntriesForAnalysis(pinnedEntries, viewCatalogAndNamespace)`. Relation, routine, and procedure resolution all route through it. |
| #10 | Tests for the new error paths and gates | Added a DECLARE / SET VAR / DROP cycle test under non-default PATH and a struct-variable field-vs-qualified ambiguity test in `sql-session-variables.sql`. |
| #11 | `ProtoToParsedPlanTestSuite.analyzerIsolationConf` was a bare `SQLConf` | Clone `spark.sessionState.conf` and only override `PATH_ENABLED=false`, so all `sparkConf` overrides (ANSI, alias config, ...) propagate automatically. |
| Bonus | `ResolveSetVariable` hardcoded `SYSTEM.SESSION` regardless of actual PATH | `unresolvedVariableError` now takes `Seq[Seq[String]]` path entries with **required** `Origin` (no overloads). DML lookup failures (`SET VAR`, `FETCH ... INTO`) report the full SQL path as a bracketed list, byte-for-byte consistent with `UNRESOLVED_ROUTINE` and `TABLE_OR_VIEW_NOT_FOUND`. DDL name validation in `ResolveCatalogs` continues to report `[system.session]` since PATH does not apply there. O [...]

### Why are the changes needed?

These are the cleanup items called out on SPARK-56681 from the post-merge source review of SPARK-56605. They eliminate dead code paths, plug user-visible bugs (force-loading a misconfigured default catalog on column resolution; clobbering pinned session configs; swallowing real catalog errors as `UNRESOLVED_ROUTINE`), remove the asymmetry between DDL and DML on session variables, and make `UNRESOLVED_VARIABLE` self-consistent with the other "not found" errors.

### Does this PR introduce _any_ user-facing change?

Yes.
- **`UNRESOLVED_VARIABLE.searchPath`** is now rendered as a bracketed list. For DML lookups (`SET VAR`, `FETCH ... INTO`), the list reflects the actual SQL PATH that was consulted instead of a hardcoded `SYSTEM.SESSION`. For DDL name validation (`DECLARE` / `DROP` with a non-session namespace), the list is `[`` `system`.`session` ``]` since PATH does not apply.
- **`UNRESOLVED_VARIABLE`** now always carries a `queryContext` that highlights just the offending variable identifier (e.g. `"builtin.var1"`, `"ses.var1"`), not the whole `DECLARE` / `SET VAR` statement.
- **`DROP TEMPORARY VARIABLE`** no longer raises `UNRESOLVED_VARIABLE` when the SQL PATH does not contain `system.session`. DDL on session variables ignores PATH, matching the existing behaviour of `DECLARE OR REPLACE VARIABLE`.
- **`lookupFunctionType`** no longer swallows non-`NotFound` errors. A catalog reporting `PERMISSION_DENIED` (or similar) for a function lookup now propagates instead of silently producing `UNRESOLVED_ROUTINE`.

### How was this patch tested?

- Added `sql-session-variables.sql` regression test for the struct-variable field-vs-qualified ambiguity (`DECLARE VARIABLE session STRUCT<a INT>` → `SELECT session.a` succeeds → `DROP` → `SELECT session.a` falls through to `UNRESOLVED_COLUMN`).
- Updated `SetPathSuite`: DECLARE / SET VAR / DROP cycle under a non-default PATH; bonus test asserts the actual rendered search path and the variable-identifier `queryContext`.
- Updated `SqlScriptingExecutionSuite` for the new bracketed `searchPath` and identifier-pinned `queryContext`.
- Regenerated `sql-session-variables.sql.out` for the new error shape.
- Added `resolutionPathEntriesForAnalysis` stubs to mocked `CatalogManager` instances in `PlanResolutionSuite`, `AlignAssignmentsSuiteBase`, and `TableLookupCacheSuite`.
- Ran focused suites locally; all pass:
  - `build/sbt 'sql/testOnly *SetPathSuite *SqlScriptingExecutionSuite *ExecuteImmediateEndToEndSuite'`
  - `build/sbt 'sql/testOnly *SimpleSQLViewSuite *SQLFunctionSuite'`
  - `build/sbt 'sql/testOnly *PlanResolutionSuite *UpdateTableAlignAssignmentsSuite *MergeIntoTableAlignAssignmentsSuite'`
  - `build/sbt 'catalyst/testOnly *TableLookupCacheSuite *AnalysisSuite *AnalysisErrorSuite *LookupFunctionsSuite'`
  - `build/sbt 'sql/testOnly *FunctionQualificationSuite *RelationQualificationSuite *DataSourceV2FunctionSuite'`
  - `build/sbt 'sql/testOnly *SQLQuerySuite'`
  - `build/sbt 'connect/testOnly *ProtoToParsedPlanTestSuite'`
  - `build/sbt 'sql/testOnly *SQLQueryTestSuite -- -z sql-session-variables.sql'`
- Full `org.apache.spark.sql.catalyst.analysis.*`, `org.apache.spark.sql.catalyst.parser.*`, and `org.apache.spark.sql.analysis.resolver.*` suites.
- `scalastyle` and `scalafmt` clean across catalyst, sql, and connect modules.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor Claude Opus 4.7

Closes #55647 from srielau/SPARK-56681-patch-clean-up.
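
The yield-to-outer-scope behaviour described in item #4 can be sketched in isolation. This is a minimal, self-contained illustration only: `Conf`, `ConfScopeSketch`, and `activeConfName` are hypothetical stand-ins, not Spark's `SQLConf` or `Analyzer`, and the thread-local handling is an assumption modelled on the `withExistingConf` context visible in the diff.

```scala
// Hypothetical stand-ins for illustration only; this is not Spark's SQLConf or Analyzer.
object ConfScopeSketch {
  final class Conf(val name: String)

  // Thread-local holding the conf installed by the innermost withExistingConf scope.
  private val existingConf = new ThreadLocal[Conf]

  // Peeks at the thread-local: None means no enclosing scope installed a conf.
  def getExistingConfIfSet: Option[Conf] = Option(existingConf.get())

  // Installs `conf` for the duration of `f`, then restores the previous state.
  def withExistingConf[T](conf: Conf)(f: => T): T = {
    val old = existingConf.get()
    existingConf.set(conf)
    try f
    finally {
      if (old != null) existingConf.set(old) else existingConf.remove()
    }
  }

  // Same decision shape as the patch: install the session conf unless an outer
  // scope already installed a different one, in which case leave it alone.
  def runWithSessionConf[T](sessionConf: Option[Conf])(thunk: => T): T =
    sessionConf match {
      case None => thunk
      case Some(c) =>
        getExistingConfIfSet match {
          case Some(outer) if outer ne c => thunk
          case _ => withExistingConf(c)(thunk)
        }
    }

  // Name of the conf visible to running code, or "<none>" if no scope is active.
  def activeConfName: String = getExistingConfIfSet.map(_.name).getOrElse("<none>")

  def main(args: Array[String]): Unit = {
    val session = new Conf("session")
    val pinned = new Conf("pinned-by-view")

    // No outer scope: the session conf is installed. Prints "session".
    println(runWithSessionConf(Some(session))(activeConfName))

    // An outer scope pinned a different conf: the helper yields to it.
    // Prints "pinned-by-view".
    println(withExistingConf(pinned) {
      runWithSessionConf(Some(session))(activeConfName)
    })
  }
}
```

The point of peeking at the thread-local through an `Option`, rather than comparing against a default-returning getter, is that "no outer scope" and "outer scope installed the same conf" stay distinguishable, which is the distinction the `getExistingConfIfSet` doc comment in the diff calls out.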
Authored-by: Serge Rielau <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 35 ++++---
 .../sql/catalyst/analysis/FunctionResolution.scala | 45 +++++----
 .../sql/catalyst/analysis/RelationResolution.scala | 22 +----
 .../sql/catalyst/analysis/ResolveCatalogs.scala | 24 +++--
 .../sql/catalyst/analysis/ResolveFetchCursor.scala | 3 +-
 .../sql/catalyst/analysis/ResolveSetVariable.scala | 6 +-
 .../sql/catalyst/analysis/VariableResolution.scala | 23 ++++-
 .../sql/catalyst/catalog/VariableManager.scala | 13 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 19 ++--
 .../sql/connector/catalog/CatalogManager.scala | 63 ++++++++++--
 .../spark/sql/errors/QueryCompilationErrors.scala | 12 +--
 .../org/apache/spark/sql/internal/SQLConf.scala | 7 ++
 .../catalyst/analysis/TableLookupCacheSuite.scala | 3 +
 .../sql/connect/ProtoToParsedPlanTestSuite.scala | 11 +--
 .../sql/execution/command/v2/FetchCursorExec.scala | 5 +-
 .../sql/execution/command/v2/SetVariableExec.scala | 5 +-
 .../command/v2/VariableAssignmentUtils.scala | 5 +-
 .../SqlScriptingLocalVariableManager.scala | 12 ++-
 .../analyzer-results/sql-session-variables.sql.out | 106 +++++++++++++++++---
 .../sql-tests/inputs/sql-session-variables.sql | 13 +++
 .../results/sql-session-variables.sql.out | 109 +++++++++++++++++++--
 .../scala/org/apache/spark/sql/SetPathSuite.scala | 79 ++++++++++++++-
 .../command/AlignAssignmentsSuiteBase.scala | 3 +
 .../execution/command/PlanResolutionSuite.scala | 6 ++
 .../sql/scripting/SqlScriptingExecutionSuite.scala | 4 +-
 25 files changed, 492 insertions(+), 141 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index faa78e030636..f3ab3b59dd32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -338,10 +338,7 @@ class Analyzer(
         AnalysisContext.reset()
         try {
           AnalysisHelper.markInAnalyzer {
-            sessionConf match {
-              case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
-              case None => runAnalysis()
-            }
+            runWithSessionConf(runAnalysis())
           }
         } finally {
           AnalysisContext.reset()
@@ -349,16 +346,29 @@ class Analyzer(
       } else {
         AnalysisContext.withNewAnalysisContext {
           AnalysisHelper.markInAnalyzer {
-            sessionConf match {
-              case Some(c) => SQLConf.withExistingConf(c) { runAnalysis() }
-              case None => runAnalysis()
-            }
+            runWithSessionConf(runAnalysis())
           }
         }
       }
     }
   }
 
+  /**
+   * Runs `thunk` under the analyzer's [[sessionConf]] for analyzer isolation, but yields to any
+   * outer [[SQLConf.withExistingConf]] scope (e.g. a SQL UDF / view body that pinned the
+   * creation-time configs). Falls through unchanged when [[sessionConf]] is unset, or when the
+   * outer scope already installed a different conf -- otherwise the outer scope's conf would be
+   * silently clobbered.
+   */
+  private def runWithSessionConf[T](thunk: => T): T = sessionConf match {
+    case None => thunk
+    case Some(c) =>
+      SQLConf.getExistingConfIfSet match {
+        case Some(outer) if outer ne c => thunk
+        case _ => SQLConf.withExistingConf(c) { thunk }
+      }
+  }
+
   /**
    * Returns a copy of this analyzer that uses the given [[CatalogManager]] for all catalog
    * lookups. All other configuration (extended rules, checks, etc.) is preserved. Used by
@@ -392,13 +402,8 @@ class Analyzer(
     }
   }
 
-  private def executeSameContext(plan: LogicalPlan): LogicalPlan = sessionConf match {
-    // Respect explicit nested SQLConf overrides (e.g. persisted SQL UDF/view configs).
-    // Otherwise, run analysis with the captured session conf for analyzer isolation.
-    case Some(c) if SQLConf.get ne c => super.execute(plan)
-    case Some(c) => SQLConf.withExistingConf(c) { super.execute(plan) }
-    case None => super.execute(plan)
-  }
+  private def executeSameContext(plan: LogicalPlan): LogicalPlan =
+    runWithSessionConf(super.execute(plan))
 
   def resolver: Resolver = conf.resolver
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
index 8f8c77f38fea..4721425d2cb5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala
@@ -71,13 +71,6 @@ class FunctionResolution(
 
   private val trimWarningEnabled = new AtomicBoolean(true)
 
-  /** Returns the current catalog path, preferring the view's context if resolving a view. */
-  private def currentCatalogPath: Seq[String] = {
-    val ctx = AnalysisContext.get.catalogAndNamespace
-    if (ctx.nonEmpty) ctx
-    else (Seq(catalogManager.currentCatalog.name) ++ catalogManager.currentNamespace).toSeq
-  }
-
   /** True if nameParts is 3-part and the first part is the system catalog name. */
   private def isSystemCatalogQualified(nameParts: Seq[String]): Boolean =
     nameParts.length == 3 &&
@@ -101,18 +94,10 @@ class FunctionResolution(
    * directly, matching [[RelationResolution.relationResolutionEntries]] so routine order stays
    * aligned with relation order.
    */
-  private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] = {
-    AnalysisContext.get.resolutionPathEntries match {
-      case Some(entries) if conf.pathEnabled => entries
-      case _ =>
-        val pathDefault = currentCatalogPath
-        catalogManager.sqlResolutionPathEntries(
-          pathDefault.head,
-          pathDefault.tail.toSeq,
-          catalogManager.currentCatalog.name,
-          catalogManager.currentNamespace.toSeq)
-    }
-  }
+  private[analysis] def sqlResolutionPathEntriesForAnalysis: Seq[Seq[String]] =
+    catalogManager.resolutionPathEntriesForAnalysis(
+      AnalysisContext.get.resolutionPathEntries,
+      AnalysisContext.get.catalogAndNamespace)
 
   private def resolutionCandidates(nameParts: Seq[String]): Seq[Seq[String]] = {
     if (nameParts.size == 1) {
@@ -370,7 +355,20 @@ class FunctionResolution(
     if (nameParts.length == 1) {
       // Must match [[resolutionCandidates]] / [[resolveFunction]]: single-part names use PATH +
       // session order, not only the current namespace (LookupCatalog single-part rule).
-      for (candidate <- resolutionCandidates(nameParts)) {
+      // `system.session.<name>` and `system.builtin.<name>` candidates were already resolved by
+      // [[lookupBuiltinOrTempFunction]] / [[lookupBuiltinOrTempTableFunction]] above (they
+      // route through `identifierFromSystemNameParts`, which only accepts those two
+      // namespaces); skip them here to avoid redundant catalog calls. Other `system.<x>`
+      // namespaces -- if any are ever added -- still go through persistent lookup.
+      val persistentCandidates = resolutionCandidates(nameParts).filterNot { c =>
+        c.length >= 2 &&
+          c.head.equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) && {
+            val ns = c(1)
+            ns.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) ||
+              ns.equalsIgnoreCase(CatalogManager.BUILTIN_NAMESPACE)
+          }
+      }
+      for (candidate <- persistentCandidates) {
         try {
           candidate match {
             case CatalogAndIdentifier(catalog, ident) =>
@@ -380,7 +378,12 @@ class FunctionResolution(
             case _ =>
           }
         } catch {
-          case NonFatal(_) =>
+          // Only treat explicit "not found" / "forbidden" signals as a miss. Any other failure
+          // (e.g. permission denied, transient catalog error) propagates.
+          case _: NoSuchFunctionException
+               | _: NoSuchNamespaceException
+               | _: CatalogNotFoundException =>
+          case e: AnalysisException if e.getCondition == "FORBIDDEN_OPERATION" =>
         }
       }
       return FunctionType.NotFound
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index ef5862547574..2a3ed248aa6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -130,25 +130,9 @@ class RelationResolution(
    * When PATH is disabled, legacy resolution rules apply.
    */
   private def relationResolutionEntries: Seq[Seq[String]] = {
-    val pinned = AnalysisContext.get.resolutionPathEntries
-    if (pinned.isDefined && conf.pathEnabled) {
-      pinned.get
-    } else {
-      val expandCatalog = catalogManager.currentCatalog.name
-      val expandNamespace = catalogManager.currentNamespace.toSeq
-      val (pathCatalog, pathNamespace) =
-        if (isResolvingView) {
-          val p = AnalysisContext.get.catalogAndNamespace
-          (p.head, p.tail.toSeq)
-        } else {
-          (expandCatalog, expandNamespace)
-        }
-      catalogManager.sqlResolutionPathEntries(
-        pathCatalog,
-        pathNamespace,
-        expandCatalog,
-        expandNamespace)
-    }
+    catalogManager.resolutionPathEntriesForAnalysis(
+      AnalysisContext.get.resolutionPathEntries,
+      AnalysisContext.get.catalogAndNamespace)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index f7319e9b03e8..185a5503b110 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -44,7 +44,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case c @ CreateVariable(identifiers, _, _) =>
       // We resolve only UnresolvedIdentifiers, and pass on the other nodes
       val resolved = identifiers.map {
-        case UnresolvedIdentifier(nameParts, _) =>
+        case u @ UnresolvedIdentifier(nameParts, _) =>
           if (withinLocalVariableScope) {
             if (c.replace) {
               throw new AnalysisException(
@@ -67,26 +67,22 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
             val resolvedIdentifier =
               catalogManager.tempVariableManager.qualify(nameParts.last)
-            assertValidSessionVariableNameParts(nameParts, resolvedIdentifier)
+            assertValidSessionVariableNameParts(nameParts, resolvedIdentifier, u.origin)
             resolvedIdentifier
           }
         case plan => plan
       }
 
       c.copy(names = resolved)
 
-    case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) =>
+    case d @ DropVariable(u @ UnresolvedIdentifier(nameParts, _), _) =>
       if (withinLocalVariableScope) {
         throw new AnalysisException(
           "UNSUPPORTED_FEATURE.SQL_SCRIPTING_DROP_TEMPORARY_VARIABLE", Map.empty)
       }
-      if (nameParts.length == 1 &&
-        !catalogManager.sessionScopeUnqualifiedAllowed(
-          catalogManager.currentCatalog.name(),
-          catalogManager.currentNamespace.toSeq)) {
-        throw QueryCompilationErrors.unresolvedVariableError(nameParts, Seq("SYSTEM", "SESSION"))
-      }
+      // DDL on session variables targets `system.session` directly; the SQL path only applies
+      // to DML (see [[VariableResolution.allowUnqualifiedSessionTempVariableLookup]]).
       val resolved = catalogManager.tempVariableManager.qualify(nameParts.last)
-      assertValidSessionVariableNameParts(nameParts, resolved)
+      assertValidSessionVariableNameParts(nameParts, resolved, u.origin)
       d.copy(name = resolved)
 
     case CreateFunction(UnresolvedIdentifier(nameParts, _), _, _, _, _)
@@ -221,13 +217,15 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
   private def assertValidSessionVariableNameParts(
       nameParts: Seq[String],
-      resolvedIdentifier: ResolvedIdentifier): Unit = {
+      resolvedIdentifier: ResolvedIdentifier,
+      origin: Origin): Unit = {
     if (!validSessionVariableName(nameParts)) {
       throw QueryCompilationErrors.unresolvedVariableError(
         nameParts,
-        Seq(
+        Seq(Seq(
           resolvedIdentifier.catalog.name(),
-          resolvedIdentifier.identifier.namespace().head)
+          resolvedIdentifier.identifier.namespace().head)),
+        origin
       )
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala
index b47332ace2b8..34942fcf08bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveFetchCursor.scala
@@ -64,7 +64,8 @@ class ResolveFetchCursor(val catalogManager: CatalogManager) extends Rule[Logica
               nameParts = u.nameParts
             ) match {
               case Some(variable) => variable.copy(canFold = false)
-              case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", "SESSION"))
+              case _ => throw unresolvedVariableError(
+                u.nameParts, variableResolution.searchPathEntriesForError, u.origin)
             }
 
           case other => throw SparkException.internalError(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
index 6ecbc87d3553..ab80fc829cf4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSetVariable.scala
@@ -62,7 +62,11 @@ class ResolveSetVariable(val catalogManager: CatalogManager) extends Rule[Logica
               nameParts = u.nameParts
             ) match {
               case Some(variable) => variable.copy(canFold = false)
-              case _ => throw unresolvedVariableError(u.nameParts, Seq("SYSTEM", "SESSION"))
+              case _ =>
+                throw unresolvedVariableError(
+                  u.nameParts,
+                  variableResolution.searchPathEntriesForError,
+                  u.origin)
             }
 
           case other => throw SparkException.internalError(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
index f8cce0d6f821..bc85ccfee34c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/VariableResolution.scala
@@ -43,10 +43,25 @@ class VariableResolution(
    * (PATH enabled and explicitly set).
    */
   private def allowUnqualifiedSessionTempVariableLookup(nameParts: Seq[String]): Boolean = {
-    if (nameParts.length != 1) return true
-    catalogManager.sessionScopeUnqualifiedAllowed(
-      catalogManager.currentCatalog.name(),
-      catalogManager.currentNamespace.toSeq)
+    nameParts.length != 1 || catalogManager.isSystemSessionOnPath
+  }
+
+  /**
+   * Search-path entries to report in `UNRESOLVED_VARIABLE` for DML lookups (`SET VAR`,
+   * `FETCH ... INTO`). The full SQL path is reported regardless of how the name was
+   * qualified, matching the convention used by `TABLE_OR_VIEW_NOT_FOUND` and
+   * `UNRESOLVED_ROUTINE`. Keeping the rendering qualification-independent also avoids
+   * re-shaping the error if Spark ever grows struct-field assignment, where 2-part forms
+   * become genuinely ambiguous.
+   *
+   * DDL paths (`DECLARE` / `DROP` name validation in
+   * [[org.apache.spark.sql.catalyst.analysis.ResolveCatalogs]]) do not consult the SQL path
+   * and report `[system.session]` directly at their throw site.
+   */
+  def searchPathEntriesForError: Seq[Seq[String]] = {
+    catalogManager.resolutionPathEntriesForAnalysis(
+      AnalysisContext.get.resolutionPathEntries,
+      AnalysisContext.get.catalogAndNamespace)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
index 4c7d8db6604b..e9edd45fae51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{FakeSystemCatalog, ResolvedIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier}
 import org.apache.spark.sql.connector.catalog.CatalogManager.{SESSION_NAMESPACE, SYSTEM_CATALOG_NAME}
 import org.apache.spark.sql.errors.DataTypeErrorsBase
@@ -49,8 +50,11 @@ trait VariableManager {
    *
    * @param nameParts Name parts of the variable.
    * @param varDef The new VariableDefinition of the variable.
+   * @param origin Origin of the SET reference, used in
+   *               [[org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError]]
+   *               if the variable is unexpectedly absent at execution time.
    */
-  def set(nameParts: Seq[String], varDef: VariableDefinition): Unit
+  def set(nameParts: Seq[String], varDef: VariableDefinition, origin: Origin): Unit
 
   /**
    * Get an existing variable.
@@ -130,11 +134,14 @@ class TempVariableManager extends VariableManager with DataTypeErrorsBase {
     variables.put(name, varDef)
   }
 
-  override def set(nameParts: Seq[String], varDef: VariableDefinition): Unit = synchronized {
+  override def set(
+      nameParts: Seq[String],
+      varDef: VariableDefinition,
+      origin: Origin): Unit = synchronized {
     val name = nameParts.last
     // Sanity check as this is already checked in ResolveSetVariable.
     if (!variables.contains(name)) {
-      throw unresolvedVariableError(nameParts, Seq("SYSTEM", "SESSION"))
+      throw unresolvedVariableError(nameParts, Seq(Seq("SYSTEM", "SESSION")), origin)
     }
     variables.put(name, varDef)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2b282467b305..a9bbd34a72d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -7140,11 +7140,13 @@ class AstBuilder extends DataTypeAstBuilder
       dataTypeOpt.map { dt => default.copy(child = Cast(default.child, dt)) }.getOrElse(default)
     }
     CreateVariable(
-      ctx.identifierReferences.asScala.map (
-        identifierReference => {
-          withIdentClause(identifierReference, UnresolvedIdentifier(_))
-        }
-      ).toSeq,
+      ctx.identifierReferences.asScala.map { identifierReference =>
+        // Give each `UnresolvedIdentifier` its own origin pointing at the variable name
+        // fragment so analyzer-time errors (e.g. UNRESOLVED_VARIABLE) can highlight just
+        // that identifier rather than the whole `DECLARE ...` statement.
+        withIdentClause(identifierReference, parts =>
+          withOrigin(identifierReference) { UnresolvedIdentifier(parts) })
+      }.toSeq,
       defaultExpression,
       ctx.REPLACE() != null
     )
@@ -7160,7 +7162,8 @@ class AstBuilder extends DataTypeAstBuilder
    */
   override def visitDropVariable(ctx: DropVariableContext): LogicalPlan = withOrigin(ctx) {
     DropVariable(
-      withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
+      withIdentClause(ctx.identifierReference(), parts =>
+        withOrigin(ctx.identifierReference()) { UnresolvedIdentifier(parts) }),
       ctx.EXISTS() != null
     )
   }
@@ -7285,7 +7288,7 @@ class AstBuilder extends DataTypeAstBuilder
       // The SET variable source is a query
       val variables = multipartIdentifierList.multipartIdentifier.asScala.map { variableIdent =>
         val varName = visitMultipartIdentifier(variableIdent)
-        UnresolvedAttribute(varName)
+        withOrigin(variableIdent) { UnresolvedAttribute(varName) }
       }.toSeq
       SetVariable(variables, visitQuery(query))
     } else {
@@ -7297,7 +7300,7 @@ class AstBuilder extends DataTypeAstBuilder
           case n: NamedExpression => n
           case e => Alias(e, varIdent.last)()
         }
-        (UnresolvedAttribute(varIdent), varNamedExpr)
+        (withOrigin(assign.key) { UnresolvedAttribute(varIdent) }, varNamedExpr)
       }.toSeq.unzip
       SetVariable(variables, Project(values, OneRowRelation()))
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 0a2ad28051dd..f73b0e68cecc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -202,12 +202,63 @@ class CatalogManager(
       currentCatalog, currentNamespace,
       currentCatalog, currentNamespace)
 
-  /** True if [[sqlResolutionPathEntries]] includes `system.session`. */
-  def sessionScopeUnqualifiedAllowed(
-      currentCatalog: String,
-      currentNamespace: Seq[String]): Boolean =
-    sqlResolutionPathEntries(currentCatalog, currentNamespace)
-      .exists(CatalogManager.isSystemSessionPathEntry)
+  /**
+   * True if `system.session` is on the SQL path. Only literal path entries can match: the
+   * [[CurrentSchemaEntry]] marker expands to `currentCatalog.name() +: currentNamespace`, and
+   * `system` is not a registered catalog (it is a synthetic namespace served via
+   * [[org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog]] / `lookupBuiltinOrTempFunction`,
+   * not loadable via [[catalog]]), so `currentCatalog.name()` cannot be `"system"`. If that
+   * invariant ever changes, this short-circuit must be revisited.
+   * Inspecting stored entries directly avoids loading the configured default catalog.
+   */
+  def isSystemSessionOnPath: Boolean = synchronized {
+    if (!conf.pathEnabled) return true
+    _sessionPath match {
+      case None => true
+      case Some(entries) => entries.exists {
+        case CatalogManager.LiteralPathEntry(parts) =>
+          CatalogManager.isSystemSessionPathEntry(parts)
+        case _ => false
+      }
+    }
+  }
+
+  /**
+   * Single source of truth for analysis-time resolution path entries used by relation, routine,
+   * and procedure resolution. When `pinnedEntries` are set (a view or SQL function body's
+   * persisted frozen path) and PATH is enabled, returns them as-is so unqualified lookups follow
+   * the creation-time path. Otherwise falls back to [[sqlResolutionPathEntries]] using the view's
+   * catalog/namespace as the path default (so unqualified names inside a view body see the view's
+   * home schema first), while always expanding markers like CURRENT_SCHEMA against the live
+   * session catalog/namespace.
+   *
+   * @param pinnedEntries persisted frozen path entries from view / SQL function metadata
+   *                      (typically `AnalysisContext.resolutionPathEntries`).
+ * @param viewCatalogAndNamespace the view's catalog and namespace + * (typically `AnalysisContext.catalogAndNamespace`); empty when + * not resolving a view body. + */ + def resolutionPathEntriesForAnalysis( + pinnedEntries: Option[Seq[Seq[String]]], + viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] = { + pinnedEntries match { + case Some(entries) if conf.pathEnabled => entries + case _ => + val expandCatalog = currentCatalog.name() + val expandNamespace = currentNamespace.toSeq + val (pathCatalog, pathNamespace) = + if (viewCatalogAndNamespace.nonEmpty) { + (viewCatalogAndNamespace.head, viewCatalogAndNamespace.tail.toSeq) + } else { + (expandCatalog, expandNamespace) + } + sqlResolutionPathEntries( + pathCatalog, + pathNamespace, + expandCatalog, + expandNamespace) + } + } private var _currentCatalogName: Option[String] = None diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 5cfdbc66e3f4..9b899867a9e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -909,23 +909,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("dt" -> dt.toString)) } - def unresolvedVariableError(name: Seq[String], searchPath: Seq[String]): Throwable = { - new AnalysisException( - errorClass = "UNRESOLVED_VARIABLE", - messageParameters = Map( - "variableName" -> toSQLId(name), - "searchPath" -> toSQLId(searchPath))) - } - def unresolvedVariableError( name: Seq[String], - searchPath: Seq[String], + pathEntries: Seq[Seq[String]], origin: Origin): Throwable = { new AnalysisException( errorClass = "UNRESOLVED_VARIABLE", messageParameters = Map( "variableName" -> toSQLId(name), - "searchPath" -> toSQLId(searchPath)), + "searchPath" -> 
pathEntries.map(toSQLId).mkString("[", ", ", "]")), origin = origin) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 03d6fe96c33b..d03920d539f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -155,6 +155,13 @@ object SQLConf { override def initialValue: SQLConf = null } + /** + * Returns the [[SQLConf]] installed by an outer [[withExistingConf]] scope, or [[None]] if + * there is no such scope. Unlike [[get]], this peeks directly at the threadlocal so callers + * can distinguish "no outer scope" from "outer scope happens to install the same conf". + */ + def getExistingConfIfSet: Option[SQLConf] = Option(existingConf.get()) + def withExistingConf[T](conf: SQLConf)(f: => T): T = { val old = existingConf.get() existingConf.set(conf) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala index 63d5523be072..75846aa49616 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala @@ -83,6 +83,9 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers { .thenReturn(defaultPath) when(catalogManager.sqlResolutionPathEntries(any[String], any[Seq[String]])) .thenReturn(defaultPath) + when(catalogManager.resolutionPathEntriesForAnalysis( + any[Option[Seq[Seq[String]]]], any[Seq[String]])) + .thenReturn(defaultPath) new Analyzer(catalogManager) } diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala index 8f20b277ddeb..8cfa219d370c 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala @@ -132,17 +132,16 @@ class ProtoToParsedPlanTestSuite extends SharedSparkSession with ResourceHelper /** * Isolated from [[SharedSparkSession]] so PATH / session path settings do not affect catalog. + * Cloned from the test session's conf so all sparkConf overrides (ANSI, alias config, etc.) are + * preserved automatically; only the genuine isolation knob is overridden explicitly. */ - private val analyzerIsolationConf: SQLConf = { - val c = new SQLConf() + private lazy val analyzerIsolationConf: SQLConf = { + val c = spark.sessionState.conf.clone() c.setConf(SQLConf.PATH_ENABLED, false) - // Match [[sparkConf]]: a bare SQLConf defaults ANSI_ENABLED to true, which changes - // function signatures in analyzed plans (e.g. make_date) vs golden files. - c.setConf(SQLConf.ANSI_ENABLED, false) c } - private val analyzer = { + private lazy val analyzer = { val inMemoryCatalog = new InMemoryChangelogCatalog // Name must match [[CatalogManager.SESSION_CATALOG_NAME]]: path entries use // [[currentCatalog.name()]], then resolution calls [[catalogManager.catalog]] on that segment. 
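The `getExistingConfIfSet` accessor added to `SQLConf` above lets a caller tell "no outer `withExistingConf` scope" apart from "an outer scope is active". The sketch below shows one way a helper could use that to yield to an outer scope, as item #4 describes. It is a minimal illustration, not the code from this commit: `Conf`, `ScopedConf`, and the body of `runWithSessionConf` are simplified stand-ins, and the restore logic in `withExistingConf` is assumed because the hunk above truncates that method.

```scala
// Simplified stand-in for SQLConf; hypothetical, for illustration only.
final case class Conf(name: String)

object ScopedConf {
  // A null initial value marks "no outer scope", as in the threadlocal shown in the diff.
  private val existing = new ThreadLocal[Conf] {
    override def initialValue: Conf = null
  }

  // Peeks at the threadlocal directly: None means no enclosing scope installed a conf.
  def getExistingConfIfSet: Option[Conf] = Option(existing.get())

  // Installs `conf` for the duration of `f`, then restores the previous state.
  // The restore branch is an assumption; the diff does not show it.
  def withExistingConf[T](conf: Conf)(f: => T): T = {
    val old = existing.get()
    existing.set(conf)
    try f finally {
      if (old != null) existing.set(old) else existing.remove()
    }
  }

  // Hypothetical helper: install the session conf only when no outer scope is active,
  // so an enclosing withExistingConf is never clobbered.
  def runWithSessionConf[T](sessionConf: Conf)(f: => T): T =
    getExistingConfIfSet match {
      case Some(_) => f
      case None => withExistingConf(sessionConf)(f)
    }
}

object ScopedConfDemo {
  def main(args: Array[String]): Unit = {
    val session = Conf("session")
    val outer = Conf("outer")

    // No outer scope: the session conf is installed.
    println(ScopedConf.runWithSessionConf(session)(ScopedConf.getExistingConfIfSet))
    // prints Some(Conf(session))

    // Outer scope present: the helper yields to it.
    println(ScopedConf.withExistingConf(outer) {
      ScopedConf.runWithSessionConf(session)(ScopedConf.getExistingConfIfSet)
    })
    // prints Some(Conf(outer))

    // After both scopes exit, the threadlocal is cleared again.
    println(ScopedConf.getExistingConfIfSet)
    // prints None
  }
}
```

Returning an `Option` rather than falling back to a default is what makes the yield decision possible: a fallback value would hide whether the conf came from an outer scope or from the default lookup.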
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala index ad867e653767..dc28d4a7e7f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/FetchCursorExec.scala @@ -194,7 +194,8 @@ case class FetchCursorExec( case FakeLocalCatalog => scriptingVariableManager.get case FakeSystemCatalog if tempVariableManager.get(namePartsCaseAdjusted).isEmpty => - throw unresolvedVariableError(namePartsCaseAdjusted, Seq("SYSTEM", "SESSION")) + throw unresolvedVariableError( + namePartsCaseAdjusted, Seq(Seq("SYSTEM", "SESSION")), varRef.origin) case FakeSystemCatalog => tempVariableManager @@ -207,7 +208,7 @@ case class FetchCursorExec( Literal(value, varRef.dataType) ) - variableManager.set(namePartsCaseAdjusted, varDef) + variableManager.set(namePartsCaseAdjusted, varDef, varRef.origin) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala index ef8e238832b3..9861bd77616a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/SetVariableExec.scala @@ -80,7 +80,8 @@ case class SetVariableExec(variables: Seq[VariableReference], query: SparkPlan) case FakeLocalCatalog => scriptingVariableManager.get case FakeSystemCatalog if tempVariableManager.get(namePartsCaseAdjusted).isEmpty => - throw unresolvedVariableError(namePartsCaseAdjusted, Seq("SYSTEM", "SESSION")) + throw unresolvedVariableError( + namePartsCaseAdjusted, Seq(Seq("SYSTEM", "SESSION")), variable.origin) case FakeSystemCatalog => tempVariableManager @@ -90,7 +91,7 @@ case class SetVariableExec(variables: 
Seq[VariableReference], query: SparkPlan) val varDef = VariableDefinition( variable.identifier, variable.varDef.defaultValueSQL, Literal(value, variable.dataType)) - variableManager.set(namePartsCaseAdjusted, varDef) + variableManager.set(namePartsCaseAdjusted, varDef, variable.origin) } override def output: Seq[Attribute] = Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala index 3a4d55169d90..d99ddae538a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/v2/VariableAssignmentUtils.scala @@ -68,7 +68,8 @@ object VariableAssignmentUtils { case FakeLocalCatalog => scriptingVariableManager.get case FakeSystemCatalog if tempVariableManager.get(namePartsCaseAdjusted).isEmpty => - throw unresolvedVariableError(namePartsCaseAdjusted, Seq("SYSTEM", "SESSION")) + throw unresolvedVariableError( + namePartsCaseAdjusted, Seq(Seq("SYSTEM", "SESSION")), varRef.origin) case FakeSystemCatalog => tempVariableManager @@ -81,6 +82,6 @@ object VariableAssignmentUtils { Literal(value, varRef.dataType) ) - variableManager.set(namePartsCaseAdjusted, varDef) + variableManager.set(namePartsCaseAdjusted, varDef, varRef.origin) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala index 0ad1974b7d76..c8a893f374f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingLocalVariableManager.scala @@ -21,6 +21,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import 
org.apache.spark.sql.catalyst.analysis.{FakeLocalCatalog, ResolvedIdentifier} import org.apache.spark.sql.catalyst.catalog.{VariableDefinition, VariableManager} +import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.errors.DataTypeErrorsBase import org.apache.spark.sql.errors.QueryCompilationErrors.unresolvedVariableError @@ -47,13 +48,18 @@ class SqlScriptingLocalVariableManager(context: SqlScriptingExecutionContext) context.currentScope.variables.put(name, varDef) } - override def set(nameParts: Seq[String], varDef: VariableDefinition): Unit = { + override def set( + nameParts: Seq[String], + varDef: VariableDefinition, + origin: Origin): Unit = { val scope = findScopeOfVariable(nameParts) .getOrElse( - throw unresolvedVariableError(nameParts, varDef.identifier.namespace().toIndexedSeq)) + throw unresolvedVariableError( + nameParts, Seq(varDef.identifier.namespace().toIndexedSeq), origin)) if (!scope.variables.contains(nameParts.last)) { - throw unresolvedVariableError(nameParts, varDef.identifier.namespace().toIndexedSeq) + throw unresolvedVariableError( + nameParts, Seq(varDef.identifier.namespace().toIndexedSeq), origin) } scope.variables.put(nameParts.last, varDef) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out index fdb7c8adf282..4b6fc5d45014 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/sql-session-variables.sql.out @@ -504,9 +504,58 @@ Project [scalar-subquery#x [title#x] AS scalarsubquery(title)#xL] -- !query -SET VARIABLE title = 'Test qualifiers - fail' +SET VARIABLE title = 'Dropped struct variable -- field access vs qualified name' -- !query analysis SetVariable [variablereference(system.session.title='Test variable in 
aggregate')] ++- Project [Dropped struct variable -- field access vs qualified name AS title#x] + +- OneRowRelation + + +-- !query +DECLARE OR REPLACE VARIABLE session STRUCT<a INT> = NAMED_STRUCT('a', 1) +-- !query analysis +CreateVariable default(cast(named_struct(a, 1) as struct<a:int>), sql='NAMED_STRUCT('a', 1)'), true ++- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.session + + +-- !query +SELECT session.a +-- !query analysis +Project [variablereference(system.session.session=NAMED_STRUCT('a', 1)).a AS a#x] ++- OneRowRelation + + +-- !query +DROP TEMPORARY VARIABLE session +-- !query analysis +DropVariable false ++- ResolvedIdentifier org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.session + + +-- !query +SELECT session.a +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION", + "sqlState" : "42703", + "messageParameters" : { + "objectName" : "`session`.`a`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 16, + "fragment" : "session.a" + } ] +} + + +-- !query +SET VARIABLE title = 'Test qualifiers - fail' +-- !query analysis +SetVariable [variablereference(system.session.title='Dropped struct variable -- field access vs qualified name')] +- Project [Test qualifiers - fail AS title#x] +- OneRowRelation @@ -519,9 +568,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`system`.`session`", + "searchPath" : "[`system`.`session`]", "variableName" : "`builtin`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 29, + "stopIndex" : 40, + "fragment" : "builtin.var1" + } ] } @@ -533,9 +589,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : 
{ - "searchPath" : "`system`.`session`", + "searchPath" : "[`system`.`session`]", "variableName" : "`system`.`sesion`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 29, + "stopIndex" : 46, + "fragment" : "system.sesion.var1" + } ] } @@ -547,9 +610,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`system`.`session`", + "searchPath" : "[`system`.`session`]", "variableName" : "`sys`.`session`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 29, + "stopIndex" : 44, + "fragment" : "sys.session.var1" + } ] } @@ -648,9 +718,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`SYSTEM`.`SESSION`", + "searchPath" : "[`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]", "variableName" : "`ses`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 14, + "stopIndex" : 21, + "fragment" : "ses.var1" + } ] } @@ -662,9 +739,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`SYSTEM`.`SESSION`", + "searchPath" : "[`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]", "variableName" : "`builtn`.`session`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 14, + "stopIndex" : 32, + "fragment" : "builtn.session.var1" + } ] } diff --git a/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql b/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql index 2e4eaa1f8f6c..86cd70cfbf98 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/sql-session-variables.sql @@ -83,6 
+83,19 @@ DROP TEMPORARY VARIABLE var1; SET VARIABLE title = 'Test variable in aggregate'; SELECT (SELECT MAX(id) FROM RANGE(10) WHERE id < title) FROM VALUES 1, 2 AS t(title); +SET VARIABLE title = 'Dropped struct variable -- field access vs qualified name'; +-- `session.a` is ambiguous: (a) 2-part qualified variable, or (b) field `a` of a 1-part +-- variable `session`. Variable resolution tries (a) first via longest match, falls back to +-- (b). With `session` declared as a struct, (b) succeeds. After the variable is dropped, +-- both interpretations fail and the SELECT falls through to column resolution, which +-- reports `UNRESOLVED_COLUMN`. Because either interpretation could have been intended, +-- the variable error path (when reached) must dump the full SQL path -- see +-- `VariableResolution.searchPathEntriesForError`. +DECLARE OR REPLACE VARIABLE session STRUCT<a INT> = NAMED_STRUCT('a', 1); +SELECT session.a; +DROP TEMPORARY VARIABLE session; +SELECT session.a; + SET VARIABLE title = 'Test qualifiers - fail'; DECLARE OR REPLACE VARIABLE builtin.var1 INT; DECLARE OR REPLACE VARIABLE system.sesion.var1 INT; diff --git a/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out b/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out index de8d6743fc76..3357f2e52630 100644 --- a/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/sql-session-variables.sql.out @@ -561,6 +561,60 @@ struct<scalarsubquery(title):bigint> 1 +-- !query +SET VARIABLE title = 'Dropped struct variable -- field access vs qualified name' +-- !query schema +struct<> +-- !query output + + + +-- !query +DECLARE OR REPLACE VARIABLE session STRUCT<a INT> = NAMED_STRUCT('a', 1) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT session.a +-- !query schema +struct<a:int> +-- !query output +1 + + +-- !query +DROP TEMPORARY VARIABLE session +-- !query schema 
+struct<> +-- !query output + + + +-- !query +SELECT session.a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION", + "sqlState" : "42703", + "messageParameters" : { + "objectName" : "`session`.`a`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 16, + "fragment" : "session.a" + } ] +} + + -- !query SET VARIABLE title = 'Test qualifiers - fail' -- !query schema @@ -579,9 +633,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`system`.`session`", + "searchPath" : "[`system`.`session`]", "variableName" : "`builtin`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 29, + "stopIndex" : 40, + "fragment" : "builtin.var1" + } ] } @@ -595,9 +656,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`system`.`session`", + "searchPath" : "[`system`.`session`]", "variableName" : "`system`.`sesion`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 29, + "stopIndex" : 46, + "fragment" : "system.sesion.var1" + } ] } @@ -611,9 +679,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`system`.`session`", + "searchPath" : "[`system`.`session`]", "variableName" : "`sys`.`session`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 29, + "stopIndex" : 44, + "fragment" : "sys.session.var1" + } ] } @@ -723,9 +798,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`SYSTEM`.`SESSION`", + "searchPath" : 
"[`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]", "variableName" : "`ses`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 14, + "stopIndex" : 21, + "fragment" : "ses.var1" + } ] } @@ -739,9 +821,16 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNRESOLVED_VARIABLE", "sqlState" : "42883", "messageParameters" : { - "searchPath" : "`SYSTEM`.`SESSION`", + "searchPath" : "[`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]", "variableName" : "`builtn`.`session`.`var1`" - } + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 14, + "stopIndex" : 32, + "fragment" : "builtn.session.var1" + } ] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala index d33a65fb73c8..ad6bcd414966 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala @@ -372,22 +372,31 @@ class SetPathSuite extends SharedSparkSession { } } - test("PATH enabled: unqualified session variable lookup follows PATH") { + test("PATH enabled: unqualified SET VAR follows PATH; DDL on variables ignores PATH") { withPathEnabled { sql("DECLARE VARIABLE system.session.path_var_gate = 7") try { sql("SET PATH = spark_catalog.default") checkError( exception = intercept[AnalysisException] { - sql("DROP TEMPORARY VARIABLE path_var_gate") + sql("SET VAR path_var_gate = 8") }, condition = "UNRESOLVED_VARIABLE", sqlState = "42883", parameters = Map( "variableName" -> "`path_var_gate`", - "searchPath" -> "`SYSTEM`.`SESSION`")) + "searchPath" -> "[`spark_catalog`.`default`]"), + context = ExpectedContext("path_var_gate", 8, 20)) + sql("SET VAR system.session.path_var_gate = 9") + checkAnswer(sql("SELECT system.session.path_var_gate"), Row(9)) + + sql("DROP TEMPORARY VARIABLE path_var_gate") + + sql("DECLARE VARIABLE 
system.session.path_var_gate = 7") sql("SET PATH = spark_catalog.default, system.session") + sql("SET VAR path_var_gate = 11") + checkAnswer(sql("SELECT path_var_gate"), Row(11)) sql("DROP TEMPORARY VARIABLE path_var_gate") } finally { sql("DROP TEMPORARY VARIABLE IF EXISTS system.session.path_var_gate") @@ -395,6 +404,70 @@ class SetPathSuite extends SharedSparkSession { } } + test("PATH enabled: unqualified FETCH ... INTO follows PATH") { + withSQLConf( + SQLConf.PATH_ENABLED.key -> "true", + SQLConf.SQL_SCRIPTING_CURSOR_ENABLED.key -> "true") { + sql("DECLARE OR REPLACE VARIABLE path_fetch_target INT") + try { + // Sanity: FETCH INTO works under the default path (system.session is on it). + val ok = sql( + """ + |BEGIN + | DECLARE cur CURSOR FOR SELECT 42 AS val; + | OPEN cur; + | FETCH cur INTO path_fetch_target; + | CLOSE cur; + |END; + |""".stripMargin) + checkAnswer(ok, Seq.empty[Row]) + checkAnswer(sql("SELECT path_fetch_target"), Row(42)) + + // Set PATH to exclude system.session: unqualified FETCH INTO target now fails + // with the actual SQL path rendered as a bracketed list. 
+ sql("SET PATH = spark_catalog.default") + checkError( + exception = intercept[AnalysisException] { + sql( + """ + |BEGIN + | DECLARE cur CURSOR FOR SELECT 99 AS val; + | OPEN cur; + | FETCH cur INTO path_fetch_target; + | CLOSE cur; + |END; + |""".stripMargin) + }, + condition = "UNRESOLVED_VARIABLE", + sqlState = "42883", + parameters = Map( + "variableName" -> "`path_fetch_target`", + "searchPath" -> "[`spark_catalog`.`default`]"), + context = ExpectedContext("path_fetch_target", -1, -1)) + } finally { + sql("SET PATH = DEFAULT_PATH") + sql("DROP TEMPORARY VARIABLE IF EXISTS path_fetch_target") + } + } + } + + test("PATH enabled: DECLARE / SET VAR / DROP cycle under non-default PATH") { + withPathEnabled { + sql("CREATE SCHEMA IF NOT EXISTS path_var_cycle") + try { + sql("SET PATH = spark_catalog.path_var_cycle, system.session") + sql("DECLARE OR REPLACE VARIABLE cycle_var = 1") + sql("SET VAR system.session.cycle_var = 2") + sql("SET VAR cycle_var = 3") + checkAnswer(sql("SELECT cycle_var"), Row(3)) + sql("DROP TEMPORARY VARIABLE cycle_var") + } finally { + sql("DROP TEMPORARY VARIABLE IF EXISTS system.session.cycle_var") + sql("DROP SCHEMA IF EXISTS path_var_cycle") + } + } + } + test("PATH enabled: current_path does not accept arguments") { withPathEnabled { // Ensure built-in function lookup succeeds so this assertion targets arg-count semantics. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala index 1a6dc178b6e5..c88ebb0d69ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala @@ -195,6 +195,9 @@ abstract class AlignAssignmentsSuiteBase extends AnalysisTest { .thenReturn(defaultPath) when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]])) .thenReturn(defaultPath) + when(manager.resolutionPathEntriesForAnalysis( + any[Option[Seq[Seq[String]]]], any[Seq[String]])) + .thenReturn(defaultPath) manager } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 907aa895a562..cd917a817f7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -243,6 +243,9 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { .thenReturn(defaultPath) when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]])) .thenReturn(defaultPath) + when(manager.resolutionPathEntriesForAnalysis( + any[Option[Seq[Seq[String]]]], any[Seq[String]])) + .thenReturn(defaultPath) manager } @@ -269,6 +272,9 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { .thenReturn(defaultPath2) when(manager.sqlResolutionPathEntries(any[String], any[Seq[String]])) .thenReturn(defaultPath2) + when(manager.resolutionPathEntriesForAnalysis( + any[Option[Seq[Seq[String]]]], any[Seq[String]])) + .thenReturn(defaultPath2) manager } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala index 9a691d4430ef..9e9991774992 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionSuite.scala @@ -3691,7 +3691,9 @@ class SqlScriptingExecutionSuite extends SharedSparkSession { sqlState = "42883", parameters = Map( "variableName" -> toSQLId("LOCALVAR"), - "searchPath" -> toSQLId("SYSTEM.SESSION")) + "searchPath" -> + "[`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]"), + context = ExpectedContext("LOCALVAR", 54, 61) ) }
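The updated golden files and test expectations show `searchPath` rendered as a bracketed, comma-separated list of quoted path entries instead of a single quoted identifier. The sketch below reproduces that formatting with the same `map(toSQLId).mkString("[", ", ", "]")` expression used in the new `unresolvedVariableError`. The `toSQLId` here is a simplified stand-in (backtick-quote each part, join with dots) and is an assumption, not Spark's implementation; `SearchPathRendering` and `renderSearchPath` are hypothetical names.

```scala
object SearchPathRendering {
  // Simplified stand-in for Spark's toSQLId: quote each name part with backticks
  // (doubling any embedded backtick) and join the parts with dots.
  def toSQLId(parts: Seq[String]): String =
    parts.map(p => "`" + p.replace("`", "``") + "`").mkString(".")

  // Same expression as in the diff: one quoted identifier per path entry,
  // wrapped in brackets and separated by ", ".
  def renderSearchPath(pathEntries: Seq[Seq[String]]): String =
    pathEntries.map(toSQLId).mkString("[", ", ", "]")

  def main(args: Array[String]): Unit = {
    // A single-entry path, as passed by the exec nodes for session variables.
    println(renderSearchPath(Seq(Seq("system", "session"))))
    // prints [`system`.`session`]

    // A three-entry path, matching the list shown in the updated expectations.
    println(renderSearchPath(Seq(
      Seq("system", "builtin"),
      Seq("system", "session"),
      Seq("spark_catalog", "default"))))
    // prints [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`]
  }
}
```

Taking `Seq[Seq[String]]` rather than a single `Seq[String]` is what lets the error message report every entry that was searched, which matters once PATH can hold more than `system.session`.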
