Repository: spark Updated Branches: refs/heads/branch-2.1 d489e1dc7 -> 94272a960
[SPARK-19028][SQL] Fixed non-thread-safe functions used in SessionCatalog ### What changes were proposed in this pull request? Fixed non-thread-safe functions used in SessionCatalog: - refreshTable - lookupRelation ### How was this patch tested? N/A Author: gatorsmile <gatorsm...@gmail.com> Closes #16437 from gatorsmile/addSyncToLookUpTable. (cherry picked from commit 35e974076dcbc5afde8d4259ce88cb5f29d94920) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94272a96 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94272a96 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94272a96 Branch: refs/heads/branch-2.1 Commit: 94272a9600405442bfe485b17e55a84b85c25da3 Parents: d489e1d Author: gatorsmile <gatorsm...@gmail.com> Authored: Sat Dec 31 19:40:28 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Jan 3 10:23:47 2017 +0800 ---------------------------------------------------------------------- .../sql/catalyst/catalog/SessionCatalog.scala | 2 +- .../spark/sql/hive/HiveSessionCatalog.scala | 36 +++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/94272a96/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 7a3d209..dd8e46d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -632,7 +632,7 @@ class SessionCatalog( /** * Refresh the cache entry for a metastore table, if any. */ - def refreshTable(name: TableIdentifier): Unit = { + def refreshTable(name: TableIdentifier): Unit = synchronized { // Go through temporary tables and invalidate them. // If the database is defined, this is definitely not a temp table. // If the database is not defined, there is a good chance this is a temp table. http://git-wip-us.apache.org/repos/asf/spark/blob/94272a96/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 08bf1cd..462b3c2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -56,23 +56,25 @@ private[sql] class HiveSessionCatalog( hadoopConf) { override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = { - val table = formatTableName(name.table) - val db = formatDatabaseName(name.database.getOrElse(currentDb)) - if (db == globalTempViewManager.database) { - val relationAlias = alias.getOrElse(table) - globalTempViewManager.get(table).map { viewDef => - SubqueryAlias(relationAlias, viewDef, Some(name)) - }.getOrElse(throw new NoSuchTableException(db, table)) - } else if (name.database.isDefined || !tempTables.contains(table)) { - val database = name.database.map(formatDatabaseName) - val newName = name.copy(database = database, table = table) - metastoreCatalog.lookupRelation(newName, alias) - } else { - val relation = tempTables(table) - val tableWithQualifiers = SubqueryAlias(table, relation, None) - // If an alias was specified by the lookup, wrap the plan in a subquery so that - // attributes are properly qualified with this alias. - alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers) + synchronized { + val table = formatTableName(name.table) + val db = formatDatabaseName(name.database.getOrElse(currentDb)) + if (db == globalTempViewManager.database) { + val relationAlias = alias.getOrElse(table) + globalTempViewManager.get(table).map { viewDef => + SubqueryAlias(relationAlias, viewDef, Some(name)) + }.getOrElse(throw new NoSuchTableException(db, table)) + } else if (name.database.isDefined || !tempTables.contains(table)) { + val database = name.database.map(formatDatabaseName) + val newName = name.copy(database = database, table = table) + metastoreCatalog.lookupRelation(newName, alias) + } else { + val relation = tempTables(table) + val tableWithQualifiers = SubqueryAlias(table, relation, None) + // If an alias was specified by the lookup, wrap the plan in a subquery so that + // attributes are properly qualified with this alias. + alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org