This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e882c90 [SPARK-33950][SQL][3.1][3.0] Refresh cache in v1 `ALTER TABLE .. DROP PARTITION` e882c90 is described below commit e882c9058fa010f133cd794c114c67cfd4541186 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Mon Jan 4 13:33:41 2021 -0800 [SPARK-33950][SQL][3.1][3.0] Refresh cache in v1 `ALTER TABLE .. DROP PARTITION` ### What changes were proposed in this pull request? Invoke `refreshTable()` from `AlterTableDropPartitionCommand.run()` after partitions dropping. In particular, this re-creates the cache associated with the modified table. ### Why are the changes needed? This fixes the issues portrayed by the example: ```sql spark-sql> CREATE TABLE tbl1 (col0 int, part0 int) USING parquet PARTITIONED BY (part0); spark-sql> INSERT INTO tbl1 PARTITION (part0=0) SELECT 0; spark-sql> INSERT INTO tbl1 PARTITION (part0=1) SELECT 1; spark-sql> CACHE TABLE tbl1; spark-sql> SELECT * FROM tbl1; 0 0 1 1 spark-sql> ALTER TABLE tbl1 DROP PARTITION (part0=0); spark-sql> SELECT * FROM tbl1; 0 0 1 1 ``` The last query must not return `0 0` since it was deleted by previous command. ### Does this PR introduce _any_ user-facing change? Yes. After the changes for the example above: ```sql ... spark-sql> ALTER TABLE tbl1 DROP PARTITION (part0=0); spark-sql> SELECT * FROM tbl1; 1 1 ``` ### How was this patch tested? By running the affected test suites: ``` $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite" ``` Authored-by: Max Gekk <max.gekk@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 67195d0d977caa5a458e8a609c434205f9b54d1b) Signed-off-by: Max Gekk <max.gekk@gmail.com> Closes #31006 from MaxGekk/drop-partition-refresh-cache-3.1. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit eef0e4c0389c815e8df25fd2c23a73fd85e20029) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/sql/execution/command/ddl.scala | 1 + .../scala/org/apache/spark/sql/CachedTableSuite.scala | 15 +++++++++++++++ .../org/apache/spark/sql/hive/CachedTableSuite.scala | 17 ++++++++++++++++- 3 files changed, 32 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f41c4ec..748bb1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -580,6 +580,7 @@ case class AlterTableDropPartitionCommand( table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, retainData = retainData) + sparkSession.catalog.refreshTable(table.identifier.quotedString) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index e14c72e..0e8122e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -1270,4 +1270,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils } } } + + test("SPARK-33950: refresh cache after partition dropping") { + withTable("t") { + sql(s"CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)") + sql("INSERT INTO t PARTITION (part=0) SELECT 0") + sql("INSERT INTO t PARTITION (part=1) SELECT 1") + assert(!spark.catalog.isCached("t")) + sql("CACHE TABLE t") + assert(spark.catalog.isCached("t")) + checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(1, 1))) + sql("ALTER TABLE t DROP PARTITION (part=0)") + assert(spark.catalog.isCached("t")) + checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 1))) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 6c0ab2f..ed3ccb4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode} +import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation} @@ -439,4 +439,19 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto assert(spark.catalog.isCached(t)) } } + + test("SPARK-33950: refresh cache after partition dropping") { + withTable("t") { + sql(s"CREATE TABLE t (id int, part int) USING hive PARTITIONED BY (part)") + sql("INSERT INTO t PARTITION (part=0) SELECT 0") + sql("INSERT INTO t PARTITION (part=1) SELECT 1") + assert(!spark.catalog.isCached("t")) + sql("CACHE TABLE t") + assert(spark.catalog.isCached("t")) + checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(1, 1))) + sql("ALTER TABLE t DROP PARTITION (part=0)") + assert(spark.catalog.isCached("t")) + checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 1))) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org