This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new dd986e8 [SPARK-36588] Migrate SHOW TABLES to use V2 command by default dd986e8 is described below commit dd986e87fc0069102d64c68826f30cfb2e103d08 Author: Terry Kim <yumin...@gmail.com> AuthorDate: Mon Oct 11 19:28:32 2021 +0800 [SPARK-36588] Migrate SHOW TABLES to use V2 command by default ### What changes were proposed in this pull request? This PR proposes to use V2 commands by default, as outlined in [SPARK-36588](https://issues.apache.org/jira/browse/SPARK-36588), and migrates `SHOW TABLES` to use the v2 command by default. ### Why are the changes needed? It's been a while since we introduced the v2 commands, and it seems reasonable to use v2 commands by default even for the session catalog, with a legacy config to fall back to the v1 commands. ### Does this PR introduce _any_ user-facing change? No. Users can still fall back to the v1 command by setting `spark.sql.legacy.useV1Command` to `true`. ### How was this patch tested? Added unit tests. Closes #34137 from imback82/SPARK-36588-show-tables. 
Authored-by: Terry Kim <yumin...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 2 ++ .../sql/catalyst/analysis/KeepLegacyOutputs.scala} | 26 +++++++++++++------- .../org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++ .../catalyst/analysis/ResolveSessionCatalog.scala | 2 +- .../execution/datasources/v2/ShowTablesExec.scala | 11 +++++++-- .../datasources/v2/V2SessionCatalog.scala | 6 ++++- .../spark/sql/execution/QueryExecutionSuite.scala | 10 +++----- .../execution/command/ShowTablesSuiteBase.scala | 20 +++++++++------- .../sql/execution/command/v1/ShowTablesSuite.scala | 28 ++++++++++++++++++++-- .../org/apache/spark/sql/test/SQLTestUtils.scala | 11 +++++++++ .../hive/execution/command/ShowTablesSuite.scala | 2 ++ 11 files changed, 97 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 0f90159..3ec50b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -262,6 +262,8 @@ class Analyzer(override val catalogManager: CatalogManager) ResolveHints.ResolveCoalesceHints), Batch("Simple Sanity Check", Once, LookupFunctions), + Batch("Keep Legacy Outputs", Once, + KeepLegacyOutputs), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions(v1SessionCatalog) :: ResolveNamespace(catalogManager) :: diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/KeepLegacyOutputs.scala similarity index 51% copy from sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala copy to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/KeepLegacyOutputs.scala 
index b6db9a3..baee2bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/KeepLegacyOutputs.scala @@ -15,19 +15,27 @@ * limitations under the License. */ -package org.apache.spark.sql.hive.execution.command +package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.execution.command.v1 +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTables} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND +import org.apache.spark.sql.internal.SQLConf /** - * The class contains tests for the `SHOW TABLES` command to check V1 Hive external table catalog. + * A rule for keeping the SQL command's legacy outputs. */ -class ShowTablesSuite extends v1.ShowTablesSuiteBase with CommandSuiteBase { - test("hive client calls") { - withNamespaceAndTable("ns", "tbl") { t => - sql(s"CREATE TABLE $t (id int) $defaultUsing") - checkHiveClientCalls(expected = 3) { - sql(s"SHOW TABLES IN $catalog.ns") +object KeepLegacyOutputs extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + if (!conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) { + plan + } else { + plan.resolveOperatorsUpWithPruning( + _.containsPattern(COMMAND)) { + case s: ShowTables => + assert(s.output.length == 3) + val newOutput = s.output.head.withName("database") +: s.output.tail + s.copy(output = newOutput) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 98aad1c..1dc4a8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3394,6 +3394,14 @@ object SQLConf { .booleanConf .createWithDefault(false) + val LEGACY_USE_V1_COMMAND = + 
buildConf("spark.sql.legacy.useV1Command") + .internal() + .doc("When true, Spark will use legacy V1 SQL commands.") + .version("3.3.0") + .booleanConf + .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * @@ -4115,6 +4123,8 @@ class SQLConf extends Serializable with Logging { def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT) + def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 5dacc16..e17a879 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -269,7 +269,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) => DropDatabaseCommand(db, d.ifExists, d.cascade) - case ShowTables(DatabaseInSessionCatalog(db), pattern, output) => + case ShowTables(DatabaseInSessionCatalog(db), pattern, output) if conf.useV1Command => val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) { assert(output.length == 3) output.head.withName("database") +: output.tail diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala index b624e62..cde3dfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowTablesExec.scala @@ -22,8 +22,8 @@ import scala.collection.mutable.ArrayBuffer 
import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.util.StringUtils +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper -import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.execution.LeafExecNode /** @@ -40,10 +40,17 @@ case class ShowTablesExec( val tables = catalog.listTables(namespace.toArray) tables.map { table => if (pattern.map(StringUtils.filterPattern(Seq(table.name()), _).nonEmpty).getOrElse(true)) { - rows += toCatalystRow(table.namespace().quoted, table.name(), false) + rows += toCatalystRow(table.namespace().quoted, table.name(), isTempView(table)) } } rows.toSeq } + + private def isTempView(ident: Identifier): Boolean = { + catalog match { + case s: V2SessionCatalog => s.isTempView(ident) + case _ => false + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 33b8f22..1202498 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -53,7 +53,7 @@ class V2SessionCatalog(catalog: SessionCatalog) case Array(db) => catalog .listTables(db) - .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table)) + .map(ident => Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table)) .toArray case _ => throw QueryCompilationErrors.noSuchNamespaceError(namespace) @@ -277,6 +277,10 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.noSuchNamespaceError(namespace) } + def isTempView(ident: Identifier): Boolean = { + catalog.isTempView(ident.namespace() :+ ident.name()) 
+ } + override def toString: String = s"V2SessionCatalog($name)" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 82bc22f3..86261b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan, OneRowRelation, Project, ShowTables, SubqueryAlias} import org.apache.spark.sql.catalyst.trees.TreeNodeTag -import org.apache.spark.sql.execution.command.{ExecutedCommandExec, ShowTablesCommand} +import org.apache.spark.sql.execution.datasources.v2.ShowTablesExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.Utils @@ -247,9 +247,7 @@ class QueryExecutionSuite extends SharedSparkSession { assert(showTablesQe.commandExecuted.isInstanceOf[CommandResult]) assert(showTablesQe.executedPlan.isInstanceOf[CommandResultExec]) val showTablesResultExec = showTablesQe.executedPlan.asInstanceOf[CommandResultExec] - assert(showTablesResultExec.commandPhysicalPlan.isInstanceOf[ExecutedCommandExec]) - assert(showTablesResultExec.commandPhysicalPlan.asInstanceOf[ExecutedCommandExec] - .cmd.isInstanceOf[ShowTablesCommand]) + assert(showTablesResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec]) val project = Project(showTables.output, SubqueryAlias("s", showTables)) val projectQe = qe(project) @@ -260,8 +258,6 @@ class QueryExecutionSuite extends SharedSparkSession { assert(projectQe.commandExecuted.children(0).children(0).isInstanceOf[CommandResult]) assert(projectQe.executedPlan.isInstanceOf[CommandResultExec]) val cmdResultExec = 
projectQe.executedPlan.asInstanceOf[CommandResultExec] - assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ExecutedCommandExec]) - assert(cmdResultExec.commandPhysicalPlan.asInstanceOf[ExecutedCommandExec] - .cmd.isInstanceOf[ShowTablesCommand]) + assert(cmdResultExec.commandPhysicalPlan.isInstanceOf[ShowTablesExec]) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala index a01adb8..5f56b91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowTablesSuiteBase.scala @@ -111,17 +111,19 @@ trait ShowTablesSuiteBase extends QueryTest with DDLCommandTestUtils { } test("change current catalog and namespace with USE statements") { - withNamespaceAndTable("ns", "table") { t => - sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing") + withCurrentCatalogAndNamespace { + withNamespaceAndTable("ns", "table") { t => + sql(s"CREATE TABLE $t (name STRING, id INT) $defaultUsing") - sql(s"USE $catalog") - // No table is matched since the current namespace is not ["ns"] - assert(defaultNamespace != Seq("ns")) - runShowTablesSql("SHOW TABLES", Seq()) + sql(s"USE $catalog") + // No table is matched since the current namespace is not ["ns"] + assert(defaultNamespace != Seq("ns")) + runShowTablesSql("SHOW TABLES", Seq()) - // Update the current namespace to match "ns.tbl". - sql(s"USE $catalog.ns") - runShowTablesSql("SHOW TABLES", Seq(Row("ns", "table", false))) + // Update the current namespace to match "ns.tbl". 
+ sql(s"USE $catalog.ns") + runShowTablesSql("SHOW TABLES", Seq(Row("ns", "table", false))) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala index f47493b..23b9b54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowTablesSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.command.v1 +import org.scalactic.source.Position +import org.scalatest.Tag + import org.apache.spark.sql.{AnalysisException, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.sql.execution.command @@ -32,6 +35,26 @@ import org.apache.spark.sql.internal.SQLConf */ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase { override def defaultNamespace: Seq[String] = Seq("default") + var _version: String = "" + override def version: String = _version + + // Tests using V1 catalogs will run with `spark.sql.legacy.useV1Command` on and off + // to test both V1 and V2 commands. 
+ override def test(testName: String, testTags: Tag*)(testFun: => Any) + (implicit pos: Position): Unit = { + Seq(true, false).foreach { useV1Command => + _version = if (useV1Command) { + "using V1 catalog with V1 command" + } else { + "using V1 catalog with V2 command" + } + super.test(testName, testTags: _*) { + withSQLConf(SQLConf.LEGACY_USE_V1_COMMAND.key -> useV1Command.toString) { + testFun + } + } + } + } private def withSourceViews(f: => Unit): Unit = { withTable("source", "source2") { @@ -84,7 +107,7 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase { false -> "PARTITION(YEAR = 2015, Month = 1)" ).foreach { case (caseSensitive, partitionSpec) => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { - val df = sql(s"SHOW TABLE EXTENDED LIKE 'part_table' $partitionSpec") + val df = sql(s"SHOW TABLE EXTENDED IN ns LIKE 'part_table' $partitionSpec") val information = df.select("information").first().getString(0) assert(information.contains("Partition Values: [year=2015, month=1]")) } @@ -129,7 +152,6 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase { } } - test("show table in a not existing namespace") { val msg = intercept[NoSuchDatabaseException] { runShowTablesSql(s"SHOW TABLES IN $catalog.unknown", Seq()) @@ -143,6 +165,8 @@ trait ShowTablesSuiteBase extends command.ShowTablesSuiteBase { * The class contains tests for the `SHOW TABLES` command to check V1 In-Memory table catalog. 
*/ class ShowTablesSuite extends ShowTablesSuiteBase with CommandSuiteBase { + override def version: String = super[ShowTablesSuiteBase].version + test("SPARK-33670: show partitions from a datasource table") { import testImplicits._ withNamespace(s"$catalog.ns") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index ba0b599..ae42541 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -393,6 +393,17 @@ private[sql] trait SQLTestUtilsBase } /** + * Restores the current catalog/database after calling `f`. + */ + protected def withCurrentCatalogAndNamespace(f: => Unit): Unit = { + val curCatalog = sql("select current_catalog()").head().getString(0) + val curDatabase = sql("select current_database()").head().getString(0) + Utils.tryWithSafeFinally(f) { + spark.sql(s"USE $curCatalog.$curDatabase") + } + } + + /** * Enables Locale `language` before executing `f`, then switches back to the default locale of JVM * after `f` returns. */ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala index b6db9a3..6050618 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowTablesSuite.scala @@ -23,6 +23,8 @@ import org.apache.spark.sql.execution.command.v1 * The class contains tests for the `SHOW TABLES` command to check V1 Hive external table catalog. 
*/ class ShowTablesSuite extends v1.ShowTablesSuiteBase with CommandSuiteBase { + override def version: String = super[ShowTablesSuiteBase].version + test("hive client calls") { withNamespaceAndTable("ns", "tbl") { t => sql(s"CREATE TABLE $t (id int) $defaultUsing") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org