This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 02d3b3b [SPARK-37098][SQL] Alter table properties should invalidate cache 02d3b3b is described below commit 02d3b3b452892779b3f0df7018a9574fde02afee Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Mon Oct 25 16:02:38 2021 +0800 [SPARK-37098][SQL] Alter table properties should invalidate cache ### What changes were proposed in this pull request? Invalidate the table cache after alter table properties (set and unset). ### Why are the changes needed? The table properties can change the behavior of writing. e.g. the parquet table with `parquet.compression`. If you execute the following SQL, we will get the file with snappy compression rather than zstd. ``` CREATE TABLE t (c int) STORED AS PARQUET; // cache table metadata SELECT * FROM t; ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd'); INSERT INTO TABLE t values(1); ``` So we should invalidate the table cache after alter table properties. ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? Add test Closes #34365 from ulysses-you/SPARK-37098. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Kent Yao <y...@apache.org> --- .../apache/spark/sql/execution/command/ddl.scala | 2 ++ .../apache/spark/sql/hive/HiveParquetSuite.scala | 24 ++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 5012b22..36a17a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -276,6 +276,7 @@ case class AlterTableSetPropertiesCommand( properties = table.properties ++ properties, comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment)) catalog.alterTable(newTable) + catalog.invalidateCachedTable(tableName) Seq.empty[Row] } @@ -312,6 +313,7 @@ case class AlterTableUnsetPropertiesCommand( val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) } val newTable = table.copy(properties = newProperties, comment = tableComment) catalog.alterTable(newTable) + catalog.invalidateCachedTable(tableName) Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index e058e6a..76a66cf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -144,4 +144,28 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton .plus(123456, ChronoUnit.MICROS))) } } + + test("SPARK-37098: Alter table properties should invalidate cache") { + // specify the compression in case we change it in future + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + withTempPath { dir => + withTable("t") { + sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION 
'${dir.getCanonicalPath}'") + // cache table metadata + sql("SELECT * FROM t") + sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='zstd')") + sql("INSERT INTO TABLE t values(1)") + val files1 = dir.listFiles().filter(_.getName.endsWith("zstd.parquet")) + assert(files1.length == 1) + + // cache table metadata again + sql("SELECT * FROM t") + sql("ALTER TABLE t UNSET TBLPROPERTIES('parquet.compression')") + sql("INSERT INTO TABLE t values(1)") + val files2 = dir.listFiles().filter(_.getName.endsWith("snappy.parquet")) + assert(files2.length == 1) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org