This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9562629  [SPARK-34027][SQL][3.1] Refresh cache in `ALTER TABLE .. RECOVER PARTITIONS`
9562629 is described below

commit 95626295d43150d9d90a08f352f89761e60a9443
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Jan 19 03:32:23 2021 +0000

[SPARK-34027][SQL][3.1] Refresh cache in `ALTER TABLE .. RECOVER PARTITIONS`

### What changes were proposed in this pull request?
In the v1 `ALTER TABLE .. RECOVER PARTITIONS` command, invoke `refreshTable()` from `CatalogImpl`, which also refreshes the table's cache.

### Why are the changes needed?
This fixes the issue demonstrated by the example below:
```sql
spark-sql> create table tbl (col int, part int) using parquet partitioned by (part);
spark-sql> insert into tbl partition (part=0) select 0;
spark-sql> cache table tbl;
spark-sql> select * from tbl;
0	0
spark-sql> show table extended like 'tbl' partition(part=0);
default	tbl	false	Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=0
...
```
Create a new partition by copying the existing one:
```
$ cp -r /Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=0 /Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=1
```
```sql
spark-sql> alter table tbl recover partitions;
spark-sql> select * from tbl;
0	0
```
The last query should also return the row `0	1`, since that partition has just been recovered by `ALTER TABLE .. RECOVER PARTITIONS`, but the stale cache hides it.
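For reference, here is the repro condensed into a self-contained Scala sketch (the object name, local-mode session, and warehouse-path lookup are illustrative assumptions, not part of the patch). It returns both rows after this change because `CatalogImpl.refreshTable`, reached via `spark.catalog`, re-caches a cached table, whereas the previously used `SessionCatalog.refreshTable` only invalidates metadata:
```scala
import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession

// Hypothetical standalone repro of SPARK-34027 (not part of the patch).
object RecoverPartitionsCacheRepro extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("SPARK-34027 repro")
    .getOrCreate()

  spark.sql("CREATE TABLE tbl (col INT, part INT) USING parquet PARTITIONED BY (part)")
  spark.sql("INSERT INTO tbl PARTITION (part=0) SELECT 0")
  spark.sql("CACHE TABLE tbl")

  // Create a new partition behind Spark's back by copying the existing one,
  // mirroring the `cp -r` step above.
  val warehouse = spark.conf.get("spark.sql.warehouse.dir").stripPrefix("file:")
  val part0 = new File(warehouse, "tbl/part=0")
  FileUtils.copyDirectory(part0, new File(part0.getParentFile, "part=1"))

  spark.sql("ALTER TABLE tbl RECOVER PARTITIONS")
  assert(spark.catalog.isCached("tbl")) // the table stays cached
  // Without the fix this shows only [0,0]; with it, [0,0] and [0,1].
  spark.sql("SELECT * FROM tbl").show()
}
```
The same flow is exercised by the new SPARK-34027 regression test added to `CachedTableSuite` in the diff below.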
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, for the example above:
```sql
...
spark-sql> alter table tbl recover partitions;
spark-sql> select * from tbl;
0	0
0	1
```

### How was this patch tested?
By running the affected test suite:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
```

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit dee596e3efe54651aa1e7c467b4f987f662e60b0)
Signed-off-by: Max Gekk <max.gekk@gmail.com>

Closes #31235 from MaxGekk/recover-partitions-refresh-cache-3.1.

Authored-by: Max Gekk <max.g...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/execution/command/ddl.scala      |  2 +-
 .../scala/org/apache/spark/sql/CachedTableSuite.scala     | 16 ++++++++++++++--
 .../apache/spark/sql/hive/HiveSchemaInferenceSuite.scala  | 11 ++++++-----
 .../spark/sql/hive/PartitionedTablePerfStatsSuite.scala   |  4 ++--
 4 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f657f42..3380d5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -676,7 +676,7 @@ case class AlterTableRecoverPartitionsCommand(
     // This is always the case for Hive format tables, but is not true for Datasource tables created
     // before Spark 2.1 unless they are converted via `msck repair table`.
     spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
-    catalog.refreshTable(tableName)
+    spark.catalog.refreshTable(tableIdentWithDB)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 2b4871a..a3f4e9e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1338,7 +1338,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
     }
   }

-  test("SPARK-34055: refresh cache in partition adding") {
+  private def testCacheRefreshing(cmd: String => DataFrame): Unit = {
     withTable("t") {
       sql("CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)")
       sql("INSERT INTO t PARTITION (part=0) SELECT 0")
@@ -1359,9 +1359,21 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       val part1Loc = part0Loc.replace("part=0", "part=1")
       FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))

-      sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$part1Loc'")
+      cmd(part1Loc)
       assert(spark.catalog.isCached("t"))
       checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
     }
   }
+
+  test("SPARK-34055: refresh cache in partition adding") {
+    testCacheRefreshing { location =>
+      sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$location'")
+    }
+  }
+
+  test("SPARK-34027: refresh cache in partitions recovering") {
+    testCacheRefreshing { _ =>
+      sql("ALTER TABLE t RECOVER PARTITIONS")
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index ce82756..ebf0228 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -118,11 +118,6 @@ class HiveSchemaInferenceSuite
         properties = Map.empty),
       true)

-    // Add partition records (if specified)
-    if (!partitionCols.isEmpty) {
-      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
-    }
-
     // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
     // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
     // set (table needs to be obtained from client since HiveExternalCatalog filters these
@@ -130,6 +125,12 @@ class HiveSchemaInferenceSuite
     assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
     val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
     assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
+
+    // Add partition records (if specified)
+    if (!partitionCols.isEmpty) {
+      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
+    }
+
     schema
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 49e2661..dbed98c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -405,8 +405,8 @@ class PartitionedTablePerfStatsSuite
       })
       executorPool.shutdown()
       executorPool.awaitTermination(30, TimeUnit.SECONDS)
-      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
-      assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
+      assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
+      assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org