This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0620b56764f [SPARK-42928][SQL] Make resolvePersistentFunction synchronized
0620b56764f is described below

commit 0620b56764fab4e4d57af10907a471a2ee0970ce
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Tue Mar 28 16:43:02 2023 +0800

    [SPARK-42928][SQL] Make resolvePersistentFunction synchronized

    ### What changes were proposed in this pull request?

    This PR makes the function `resolvePersistentFunctionInternal` synchronized.

    ### Why are the changes needed?

    To make function resolution thread-safe.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing UTs.

    Closes #40557 from allisonwang-db/SPARK-42928-sync-func.

    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit f62ffde045771b3275acf3dfb24573804e7daf93)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/catalog/SessionCatalog.scala      | 56 ++++++++++++----------
 1 file changed, 31 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4f7196e2dc9..cd4b4cfaf6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1720,31 +1720,37 @@ class SessionCatalog(
       arguments: Seq[Expression],
       registry: FunctionRegistryBase[T],
       createFunctionBuilder: CatalogFunction => FunctionRegistryBase[T]#FunctionBuilder): T = {
-    val qualifiedIdent = qualifyIdentifier(name)
-    val db = qualifiedIdent.database.get
-    val funcName = qualifiedIdent.funcName
-    if (registry.functionExists(qualifiedIdent)) {
-      // This function has been already loaded into the function registry.
-      registry.lookupFunction(qualifiedIdent, arguments)
-    } else {
-      // The function has not been loaded to the function registry, which means
-      // that the function is a persistent function (if it actually has been registered
-      // in the metastore). We need to first put the function in the function registry.
-      val catalogFunction = externalCatalog.getFunction(db, funcName)
-      loadFunctionResources(catalogFunction.resources)
-      // Please note that qualifiedName is provided by the user. However,
-      // catalogFunction.identifier.unquotedString is returned by the underlying
-      // catalog. So, it is possible that qualifiedName is not exactly the same as
-      // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
-      // At here, we preserve the input from the user.
-      val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
-      registerFunction(
-        funcMetadata,
-        overrideIfExists = false,
-        registry = registry,
-        functionBuilder = createFunctionBuilder(funcMetadata))
-      // Now, we need to create the Expression.
-      registry.lookupFunction(qualifiedIdent, arguments)
+    // `synchronized` is used to prevent multiple threads from concurrently resolving the
+    // same function that has not yet been loaded into the function registry. This is needed
+    // because calling `registerFunction` twice with `overrideIfExists = false` can lead to
+    // a FunctionAlreadyExistsException.
+    synchronized {
+      val qualifiedIdent = qualifyIdentifier(name)
+      val db = qualifiedIdent.database.get
+      val funcName = qualifiedIdent.funcName
+      if (registry.functionExists(qualifiedIdent)) {
+        // This function has been already loaded into the function registry.
+        registry.lookupFunction(qualifiedIdent, arguments)
+      } else {
+        // The function has not been loaded to the function registry, which means
+        // that the function is a persistent function (if it actually has been registered
+        // in the metastore). We need to first put the function in the function registry.
+        val catalogFunction = externalCatalog.getFunction(db, funcName)
+        loadFunctionResources(catalogFunction.resources)
+        // Please note that qualifiedName is provided by the user. However,
+        // catalogFunction.identifier.unquotedString is returned by the underlying
+        // catalog. So, it is possible that qualifiedName is not exactly the same as
+        // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
+        // At here, we preserve the input from the user.
+        val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
+        registerFunction(
+          funcMetadata,
+          overrideIfExists = false,
+          registry = registry,
+          functionBuilder = createFunctionBuilder(funcMetadata))
+        // Now, we need to create the Expression.
+        registry.lookupFunction(qualifiedIdent, arguments)
+      }
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org