This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new ace3c69ef18 [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec ace3c69ef18 is described below commit ace3c69ef18d648bb15855c40c8b7e44987dede4 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Tue Nov 22 10:07:17 2022 +0800 [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec backport https://github.com/apache/spark/pull/38687 for branch-3.3 ### What changes were proposed in this pull request? Add TimeTravelSpec to the key of the relation cache in AnalysisContext. ### Why are the changes needed? Fix relation resolution when the same table is referenced with different TimeTravelSpecs in one query; previously both references resolved to the same cached relation. ### Does this PR introduce _any_ user-facing change? Yes, this is a bug fix. ### How was this patch tested? Added a new test. Closes #38741 from ulysses-you/time-travel-spec-3.3. Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 11 ++++++----- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2a2fe6f2957..0c68dd8839d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -112,9 +112,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog { * @param nestedViewDepth The nested depth in the view resolution, this enables us to limit the * depth of nested views. * @param maxNestedViewDepth The maximum allowed depth of nested view resolution. - * @param relationCache A mapping from qualified table names to resolved relations.
This can ensure - * that the table is resolved only once if a table is used multiple times - * in a query. + * @param relationCache A mapping from qualified table names and time travel spec to resolved + * relations. This can ensure that the table is resolved only once if a table + * is used multiple times in a query. * @param referredTempViewNames All the temp view names referred by the current view we are * resolving. It's used to make sure the relation resolution is * consistent between view creation and view resolution. For example, @@ -128,7 +128,8 @@ case class AnalysisContext( catalogAndNamespace: Seq[String] = Nil, nestedViewDepth: Int = 0, maxNestedViewDepth: Int = -1, - relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty, + relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), LogicalPlan] = + mutable.Map.empty, referredTempViewNames: Seq[Seq[String]] = Seq.empty, // 1. If we are resolving a view, this field will be restored from the view metadata, // by calling `AnalysisContext.withAnalysisContext(viewDesc)`. 
@@ -1188,7 +1189,7 @@ class Analyzer(override val catalogManager: CatalogManager) lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse { expandIdentifier(u.multipartIdentifier) match { case CatalogAndIdentifier(catalog, ident) => - val key = catalog.name +: ident.namespace :+ ident.name + val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec) AnalysisContext.get.relationCache.get(key).map(_.transform { case multi: MultiInstanceRelation => val newRelation = multi.newInstance() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 44f97f55713..7470911c9e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2576,6 +2576,23 @@ class DataSourceV2SQLSuite } } + test("SPARK-41154: Incorrect relation caching for queries with time travel spec") { + sql("use testcat") + val t1 = "testcat.t1" + val t2 = "testcat.t2" + withTable(t1, t2) { + sql(s"CREATE TABLE $t1 USING foo AS SELECT 1 as c") + sql(s"CREATE TABLE $t2 USING foo AS SELECT 2 as c") + assert( + sql(""" + |SELECT * FROM t VERSION AS OF '1' + |UNION ALL + |SELECT * FROM t VERSION AS OF '2' + |""".stripMargin + ).collect() === Array(Row(1), Row(2))) + } + } + private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = { val e = intercept[AnalysisException] { sql(s"$sqlCommand $sqlParams") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org