This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 02d3b3b  [SPARK-37098][SQL] Alter table properties should invalidate cache
02d3b3b is described below

commit 02d3b3b452892779b3f0df7018a9574fde02afee
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Mon Oct 25 16:02:38 2021 +0800

    [SPARK-37098][SQL] Alter table properties should invalidate cache
    
    ### What changes were proposed in this pull request?
    
    Invalidate the table cache after alter table properties (set and unset).
    
    ### Why are the changes needed?
    
    Table properties can change write behavior, e.g. a Parquet table with `parquet.compression`.
    
    If you execute the following SQL, the file is written with snappy compression rather than zstd:
    ```
    CREATE TABLE t (c int) STORED AS PARQUET;
    -- cache table metadata
    SELECT * FROM t;
    ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd');
    INSERT INTO TABLE t values(1);
    ```
    So we should invalidate the table cache after altering table properties.
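    
    Before this fix, a workaround (a minimal sketch, assuming a `SparkSession` named `spark`) is to refresh the table manually after changing its properties, so that the next write picks up the new value:
    
    ```scala
    // Workaround sketch (pre-fix): explicitly refresh the cached metadata
    // after ALTER TABLE so the next INSERT sees the new property value.
    spark.sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd')")
    spark.catalog.refreshTable("t") // invalidates cached data and metadata for `t`
    spark.sql("INSERT INTO TABLE t VALUES (1)") // now written with zstd compression
    ```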
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, bug fix.
    
    ### How was this patch tested?
    
    Add test
    
    Closes #34365 from ulysses-you/SPARK-37098.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../apache/spark/sql/execution/command/ddl.scala   |  2 ++
 .../apache/spark/sql/hive/HiveParquetSuite.scala   | 24 ++++++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 5012b22..36a17a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -276,6 +276,7 @@ case class AlterTableSetPropertiesCommand(
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
@@ -312,6 +313,7 @@ case class AlterTableUnsetPropertiesCommand(
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index e058e6a..76a66cf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -144,4 +144,28 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
             .plus(123456, ChronoUnit.MICROS)))
     }
   }
+
+  test("SPARK-37098: Alter table properties should invalidate cache") {
+    // specify the compression in case we change it in future
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      withTempPath { dir =>
+        withTable("t") {
+          sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION 
'${dir.getCanonicalPath}'")
+          // cache table metadata
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd')")
+          sql("INSERT INTO TABLE t values(1)")
+          val files1 = dir.listFiles().filter(_.getName.endsWith("zstd.parquet"))
+          assert(files1.length == 1)
+
+          // cache table metadata again
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t UNSET TBLPROPERTIES('parquet.compression')")
+          sql("INSERT INTO TABLE t values(1)")
+          val files2 = dir.listFiles().filter(_.getName.endsWith("snappy.parquet"))
+          assert(files2.length == 1)
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
