This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 9562629  [SPARK-34027][SQL][3.1] Refresh cache in `ALTER TABLE .. RECOVER PARTITIONS`
9562629 is described below

commit 95626295d43150d9d90a08f352f89761e60a9443
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Jan 19 03:32:23 2021 +0000

    [SPARK-34027][SQL][3.1] Refresh cache in `ALTER TABLE .. RECOVER PARTITIONS`
    
    ### What changes were proposed in this pull request?
    Invoke `refreshTable()` from `CatalogImpl`, which refreshes the table's cache, in the v1 implementation of `ALTER TABLE .. RECOVER PARTITIONS`.
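
    A minimal sketch of the effect of the change, expressed through the public Catalog API rather than the patched command itself (the session setup and the `default.tbl` table name are illustrative):
    ```scala
    import org.apache.spark.sql.SparkSession

    // Illustrative only: assumes a local session and an already cached,
    // partitioned table `tbl` in the default database.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Before this patch the command refreshed only the session catalog's
    // metadata; `CatalogImpl.refreshTable` also re-caches the table's data
    // when it is cached, so newly discovered partitions become visible:
    spark.catalog.refreshTable("default.tbl")
    ```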
    
    ### Why are the changes needed?
    This fixes the issue demonstrated by the following example:
    ```sql
    spark-sql> create table tbl (col int, part int) using parquet partitioned by (part);
    spark-sql> insert into tbl partition (part=0) select 0;
    spark-sql> cache table tbl;
    spark-sql> select * from tbl;
    0   0
    spark-sql> show table extended like 'tbl' partition(part=0);
    default     tbl     false   Partition Values: [part=0]
    Location: file:/Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=0
    ...
    ```
    Create a new partition by copying the existing one:
    ```
    $ cp -r /Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=0 \
            /Users/maximgekk/proj/recover-partitions-refresh-cache/spark-warehouse/tbl/part=1
    ```
    ```sql
    spark-sql> alter table tbl recover partitions;
    spark-sql> select * from tbl;
    0   0
    ```
    
    The last query should also return `0       1`, since that partition has been recovered by `ALTER TABLE .. RECOVER PARTITIONS`, but the stale table cache hides it.
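
    This can be checked programmatically, mirroring the regression test added in this patch (a sketch, reusing the `tbl` table from the example above):
    ```scala
    // After recovering partitions, the table should stay cached and the
    // cached data should include the newly discovered partition.
    spark.sql("ALTER TABLE tbl RECOVER PARTITIONS")
    assert(spark.catalog.isCached("tbl"))
    spark.sql("SELECT * FROM tbl").show()  // with the fix: rows (0, 0) and (0, 1)
    ```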
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the example above returns:
    ```sql
    ...
    spark-sql> alter table tbl recover partitions;
    spark-sql> select * from tbl;
    0   0
    0   1
    ```
    
    ### How was this patch tested?
    By running the affected test suite:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
    ```
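
    The Hive-side suites touched by this change can presumably be run the same way, for example (sbt project scoping assumed):
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "hive/test:testOnly *HiveSchemaInferenceSuite"
    ```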
    
    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Wenchen Fan <wenchen@databricks.com>
    (cherry picked from commit dee596e3efe54651aa1e7c467b4f987f662e60b0)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    
    Closes #31235 from MaxGekk/recover-partitions-refresh-cache-3.1.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/execution/command/ddl.scala     |  2 +-
 .../scala/org/apache/spark/sql/CachedTableSuite.scala    | 16 ++++++++++++++--
 .../apache/spark/sql/hive/HiveSchemaInferenceSuite.scala | 11 ++++++-----
 .../spark/sql/hive/PartitionedTablePerfStatsSuite.scala  |  4 ++--
 4 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index f657f42..3380d5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -676,7 +676,7 @@ case class AlterTableRecoverPartitionsCommand(
     // This is always the case for Hive format tables, but is not true for Datasource tables created
     // before Spark 2.1 unless they are converted via `msck repair table`.
     spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
-    catalog.refreshTable(tableName)
+    spark.catalog.refreshTable(tableIdentWithDB)
     logInfo(s"Recovered all partitions ($total).")
     Seq.empty[Row]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 2b4871a..a3f4e9e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1338,7 +1338,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
     }
   }
 
-  test("SPARK-34055: refresh cache in partition adding") {
+  private def testCacheRefreshing(cmd: String => DataFrame): Unit = {
     withTable("t") {
       sql("CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY 
(part)")
       sql("INSERT INTO t PARTITION (part=0) SELECT 0")
@@ -1359,9 +1359,21 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       val part1Loc = part0Loc.replace("part=0", "part=1")
       FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
 
-      sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$part1Loc'")
+      cmd(part1Loc)
       assert(spark.catalog.isCached("t"))
       checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
     }
   }
+
+  test("SPARK-34055: refresh cache in partition adding") {
+    testCacheRefreshing { location =>
+      sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$location'")
+    }
+  }
+
+  test("SPARK-34027: refresh cache in partitions recovering") {
+    testCacheRefreshing { _ =>
+      sql("ALTER TABLE t RECOVER PARTITIONS")
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index ce82756..ebf0228 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -118,11 +118,6 @@ class HiveSchemaInferenceSuite
         properties = Map.empty),
       true)
 
-    // Add partition records (if specified)
-    if (!partitionCols.isEmpty) {
-      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
-    }
-
     // Check that the table returned by HiveExternalCatalog has schemaPreservesCase set to false
     // and that the raw table returned by the Hive client doesn't have any Spark SQL properties
     // set (table needs to be obtained from client since HiveExternalCatalog filters these
@@ -130,6 +125,12 @@ class HiveSchemaInferenceSuite
     assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
     val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
     assert(rawTable.properties.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
+
+    // Add partition records (if specified)
+    if (!partitionCols.isEmpty) {
+      spark.catalog.recoverPartitions(TEST_TABLE_NAME)
+    }
+
     schema
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 49e2661..dbed98c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -405,8 +405,8 @@ class PartitionedTablePerfStatsSuite
           })
           executorPool.shutdown()
           executorPool.awaitTermination(30, TimeUnit.SECONDS)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 50)
-          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 100)
+          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
         }
       }
     }

