Repository: spark
Updated Branches:
  refs/heads/master d5aefa83a -> ee13f3e3d

[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable

## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is no longer valid. Consider the example below.

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table is present in the catalog relation cache. Step 4 correctly updates the metadata inside the catalog but does NOT invalidate the cache, so step 5 still sees the stale statistics. Note that running ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also avoid the problem.

## How was this patch tested?

Existing and additional unit tests.

Author: aokolnychyi <anton.okolnyc...@sap.com>

Closes #19252 from aokolnychyi/spark-21969.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee13f3e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee13f3e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee13f3e3

Branch: refs/heads/master
Commit: ee13f3e3dc3faa5152cefa91c22f8aaa8e425bb4
Parents: d5aefa8
Author: aokolnychyi <anton.okolnyc...@sap.com>
Authored: Tue Sep 19 14:19:13 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Sep 19 14:19:13 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +
 .../command/AnalyzeColumnCommand.scala          |  3 -
 .../execution/command/AnalyzeTableCommand.scala |  2 -
 .../spark/sql/StatisticsCollectionSuite.scala   | 73 ++++++++++++++++++++
 .../sql/StatisticsCollectionTestBase.scala      | 14 +++-
 5 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0908d68..9407b72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -377,6 +377,8 @@ class SessionCatalog(
     requireDbExists(db)
     requireTableExists(tableIdentifier)
     externalCatalog.alterTableStats(db, table, newStats)
+    // Invalidate the table relation cache
+    refreshTable(identifier)
   }
 
   /**
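For readers skimming the diff: the fix moves cache invalidation into `SessionCatalog.alterTableStats` itself, so every statistics write is paired with a `refreshTable` call at a single choke point and individual commands can no longer forget it. Below is a minimal sketch of that update-then-invalidate pattern; the classes are simplified stand-ins for illustration, not Spark's actual implementation.

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-ins: Spark's real relation cache lives inside
// SessionCatalog and is keyed by QualifiedTableName.
case class QualifiedTableName(database: String, table: String)
case class CatalogStatistics(sizeInBytes: BigInt, rowCount: Option[BigInt])

class CatalogSketch {
  private val relationCache = new ConcurrentHashMap[QualifiedTableName, AnyRef]()
  private val externalStats = new ConcurrentHashMap[QualifiedTableName, CatalogStatistics]()

  // Update-then-invalidate: persisting the new statistics and dropping the
  // cached relation happen in the same method, so the next lookup is forced
  // to re-resolve the table with fresh metadata.
  def alterTableStats(key: QualifiedTableName, newStats: CatalogStatistics): Unit = {
    externalStats.put(key, newStats) // write to the (stand-in) external catalog
    relationCache.remove(key)        // invalidate the stale cached entry
  }
}
```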

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 6588993..caf12ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -56,9 +56,6 @@ case class AnalyzeColumnCommand(
 
     sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
 
-    // Refresh the cached data source table in the catalog.
-    sessionState.catalog.refreshTable(tableIdentWithDB)
-
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 04715bd..58b53e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -48,8 +48,6 @@ case class AnalyzeTableCommand(
     val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
     if (newStats.isDefined) {
       sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
-      // Refresh the cached data source table in the catalog.
-      sessionState.catalog.refreshTable(tableIdentWithDB)
     }
 
     Seq.empty[Row]
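Taken together, the two command changes above remove the per-command refresh calls, while the SessionCatalog change re-adds the refresh at the single shared entry point. With the fix, the repro from the description behaves as expected; a quick manual check in spark-shell, mirroring steps 1-5 above:

```scala
spark.range(100).write.saveAsTable("tab1")
spark.sql("ANALYZE TABLE tab1 COMPUTE STATISTICS")
// Step 3: this lookup caches the resolved relation with its statistics.
spark.sql("EXPLAIN COST SELECT DISTINCT * FROM tab1").show(false)
spark.range(100).write.mode("append").saveAsTable("tab1")
// Step 5: before the patch this reused the stale cached statistics; now the
// append goes through alterTableStats, which refreshes the cache, so the
// reported sizeInBytes reflects both writes.
spark.sql("EXPLAIN COST SELECT DISTINCT * FROM tab1").show(false)
```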

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 9e459ed..2fc92f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -261,6 +261,10 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
         assert(fetched1.get.sizeInBytes == 0)
         assert(fetched1.get.colStats.size == 2)
 
+        // table lookup will make the table cached
+        spark.table(table)
+        assert(isTableInCatalogCache(table))
+
         // insert into command
         sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
         if (autoUpdate) {
@@ -270,9 +274,78 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
         } else {
           checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
         }
+
+        // check that tableRelationCache inside the catalog was invalidated after insert
+        assert(!isTableInCatalogCache(table))
+      }
+    }
+  }
+
+  test("invalidation of tableRelationCache after inserts") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).write.saveAsTable(table)
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+          spark.table(table)
+          val initialSizeInBytes = getTableFromCatalogCache(table).stats.sizeInBytes
+          spark.range(100).write.mode(SaveMode.Append).saveAsTable(table)
+          spark.table(table)
+          assert(getTableFromCatalogCache(table).stats.sizeInBytes == 2 * initialSizeInBytes)
+        }
+      }
+    }
+  }
+
+  test("invalidation of tableRelationCache after table truncation") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).write.saveAsTable(table)
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+          spark.table(table)
+          sql(s"TRUNCATE TABLE $table")
+          spark.table(table)
+          assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
         }
       }
     }
   }
 
+  test("invalidation of tableRelationCache after alter table add partition") {
+    val table = "invalidate_catalog_cache_table"
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTempDir { dir =>
+          withTable(table) {
+            val path = dir.getCanonicalPath
+            sql(s"""
+              |CREATE TABLE $table (col1 int, col2 int)
+              |USING PARQUET
+              |PARTITIONED BY (col2)
+              |LOCATION '${dir.toURI}'""".stripMargin)
+            sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+            spark.table(table)
+            assert(getTableFromCatalogCache(table).stats.sizeInBytes == 0)
+            spark.catalog.recoverPartitions(table)
+            val df = Seq((1, 2), (1, 2)).toDF("col2", "col1")
+            df.write.parquet(s"$path/col2=1")
+            sql(s"ALTER TABLE $table ADD PARTITION (col2=1) LOCATION '${dir.toURI}'")
+            spark.table(table)
+            val cachedTable = getTableFromCatalogCache(table)
+            val cachedTableSizeInBytes = cachedTable.stats.sizeInBytes
+            val defaultSizeInBytes = conf.defaultSizeInBytes
+            if (autoUpdate) {
+              assert(cachedTableSizeInBytes != defaultSizeInBytes && cachedTableSizeInBytes > 0)
+            } else {
+              assert(cachedTableSizeInBytes == defaultSizeInBytes)
+            }
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 5916cd7..a2f63ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -23,9 +23,9 @@ import java.sql.{Date, Timestamp}
 import scala.collection.mutable
 import scala.util.Random
 
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -85,6 +85,16 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
   }
 
+  def getTableFromCatalogCache(tableName: String): LogicalPlan = {
+    val catalog = spark.sessionState.catalog
+    val qualifiedTableName = QualifiedTableName(catalog.getCurrentDatabase, tableName)
+    catalog.getCachedTable(qualifiedTableName)
+  }
+
+  def isTableInCatalogCache(tableName: String): Boolean = {
+    getTableFromCatalogCache(tableName) != null
+  }
+
   def getCatalogStatistics(tableName: String): CatalogStatistics = {
     getCatalogTable(tableName).stats.get
   }
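As a usage sketch for the two helpers above (assuming a suite that mixes in StatisticsCollectionTestBase; the table name is illustrative), the cache checks from the updated insert test boil down to:

```scala
withTable("cache_probe") {
  spark.range(10).write.saveAsTable("cache_probe")
  sql("ANALYZE TABLE cache_probe COMPUTE STATISTICS")
  // The first lookup resolves the relation and puts it into the cache.
  spark.table("cache_probe")
  assert(isTableInCatalogCache("cache_probe"))
  // The insert updates the statistics via alterTableStats, which now also
  // calls refreshTable, so the cached entry is dropped.
  sql("INSERT INTO TABLE cache_probe SELECT 42")
  assert(!isTableInCatalogCache("cache_probe"))
}
```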