This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 02898338cffc [SPARK-52988][SQL] Fix race conditions at CREATE TABLE and FUNCTION when IF NOT EXISTS is used
02898338cffc is described below

commit 02898338cffc8c7a16d333257cadb817d85aff65
Author: attilapiros <piros.attila.zs...@gmail.com>
AuthorDate: Thu Aug 14 11:25:39 2025 -0700

[SPARK-52988][SQL] Fix race conditions at CREATE TABLE and FUNCTION when IF NOT EXISTS is used

### What changes were proposed in this pull request?

Fix race conditions in `CREATE TABLE` and `CREATE FUNCTION` when `IF NOT EXISTS` is given.

### Why are the changes needed?

Even with `CREATE FUNCTION IF NOT EXISTS`, running the statement in parallel can fail with the following exception:

```
2025-07-25 01:22:21,731 [AA-Rule-ThreadPoolExec-2] ERROR ***** - An error occured : org.apache.spark.sql.AnalysisException: Function default.SparkTestUDF already exists; line 1 pos 6734
    at org.apache.spark.sql.errors.QueryCompilationErrors$.functionAlreadyExistsError(QueryCompilationErrors.scala:654)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1487)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunctionInternal(SessionCatalog.scala:1719)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunction(SessionCatalog.scala:1675)
    at ...
```

The same race exists for `CREATE TABLE`:

```
scala> import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.CollectionConverters._

scala> (1 to 5).toList.par.foreach(_ => spark.sql("create table if not exists spark52988(a int)"))
     | 25/08/11 15:47:18 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/08/11 15:47:18 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore apiros10.96.131.100
25/08/11 15:47:18 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
25/08/11 15:47:19 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/08/11 15:47:19 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `default`.`spark52988` because it already exists. Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects. SQLSTATE: 42P07
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:226)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
  at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:218)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:422)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:123)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)
  ...
```
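The underlying problem in both code paths is the same non-atomic check-then-act: the catalog first checks whether the object exists and only then asks the external catalog to create it, so two concurrent sessions can both pass the check and the slower one fails on the create. The sketch below is a minimal, self-contained illustration of that race and of the catch-and-ignore approach this commit applies; `ExternalStore`, `AlreadyExistsException`, and the method names are stand-ins invented for the example, not Spark classes.

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-ins for the real catalog API; these are NOT Spark classes.
class AlreadyExistsException(name: String) extends Exception(s"$name already exists")

class ExternalStore {
  private val entries = new ConcurrentHashMap[String, String]()
  def exists(name: String): Boolean = entries.containsKey(name)
  def create(name: String, definition: String): Unit = {
    // Atomic: only one caller can win; every later caller gets the exception.
    if (entries.putIfAbsent(name, definition) != null) {
      throw new AlreadyExistsException(name)
    }
  }
}

object IfNotExistsRace {

  // Racy check-then-act: two threads can both see !exists(name) and both call create,
  // so the slower one throws even though IF NOT EXISTS was requested.
  def createRacy(store: ExternalStore, name: String, ignoreIfExists: Boolean): Unit = {
    if (!store.exists(name)) {
      store.create(name, "definition")
    } else if (!ignoreIfExists) {
      throw new AlreadyExistsException(name)
    }
  }

  // The pattern applied by this commit: keep the fast-path existence check, but also
  // tolerate a concurrent creation by swallowing the already-exists error when
  // ignoreIfExists is set.
  def createIdempotent(store: ExternalStore, name: String, ignoreIfExists: Boolean): Unit = {
    if (!store.exists(name)) {
      try {
        store.create(name, "definition")
      } catch {
        case _: AlreadyExistsException if ignoreIfExists => () // lost the race; that is fine
      }
    } else if (!ignoreIfExists) {
      throw new AlreadyExistsException(name)
    }
  }

  def main(args: Array[String]): Unit = {
    val store = new ExternalStore
    val threads = (1 to 8).map { _ =>
      new Thread(() => createIdempotent(store, "f1", ignoreIfExists = true))
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // with createRacy, some runs would die with AlreadyExistsException
  }
}
```

Note that the fix keeps the up-front existence check as the common fast path and only falls back to catching the exception for the losing side of the race.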
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually. `CREATE FUNCTION` after this change:

```
scala> (1 to 100).foreach { j => (1 to 25).toList.par.foreach(_ => spark.sql(s"create function if not exists f$j(i int) returns int return i * i")) }
25/08/12 10:34:20 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/08/12 10:34:20 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/08/12 10:34:20 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51696 from attilapiros/SPARK-52988.

Authored-by: attilapiros <piros.attila.zs...@gmail.com>
Signed-off-by: attilapiros <piros.attila.zs...@gmail.com>
---
 .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 7 ++++++-
 .../spark/sql/execution/command/createDataSourceTables.scala  | 4 +---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0803b7546f76..6cc8d6c0ea4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1479,7 +1479,12 @@ class SessionCatalog(
     requireDbExists(db)
     val newFuncDefinition = funcDefinition.copy(identifier = qualifiedIdent)
     if (!functionExists(qualifiedIdent)) {
-      externalCatalog.createFunction(db, newFuncDefinition)
+      try {
+        externalCatalog.createFunction(db, newFuncDefinition)
+      } catch {
+        case e: FunctionAlreadyExistsException if ignoreIfExists =>
+          // Ignore the exception as ignoreIfExists is set to true
+      }
     } else if (!ignoreIfExists) {
       throw new FunctionAlreadyExistsException(Seq(db, qualifiedIdent.funcName))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 785dff243f7e..5ef19b832f5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -118,9 +118,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     }
 
-    // We will return Nil or throw exception at the beginning if the table already exists, so when
-    // we reach here, the table should not exist and we should set `ignoreIfExists` to false.
-    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+    sessionState.catalog.createTable(newTable, ignoreIfExists)
 
     Seq.empty[Row]
   }
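For completeness, a standalone concurrency check in the spirit of the manual test above could look like the sketch below. `ConcurrentIfNotExistsCheck` and its settings are hypothetical and not part of this patch; it simply fires the same `IF NOT EXISTS` statements from several threads and expects none of them to surface an already-exists error once this fix is in place. It assumes a Spark build where SQL scalar functions (`CREATE FUNCTION ... RETURNS ... RETURN`) are available, as used in the manual test.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession

// Hypothetical reproducer, not part of this commit: run the same IF NOT EXISTS
// statements from many threads and fail loudly if any of them throws.
object ConcurrentIfNotExistsCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[8]")
      .appName("SPARK-52988-check")
      .enableHiveSupport() // assumes a Hive-enabled build; drop for the in-memory catalog
      .getOrCreate()

    implicit val ec: ExecutionContext = ExecutionContext.global

    val attempts = (1 to 25).map { _ =>
      Future {
        spark.sql("CREATE TABLE IF NOT EXISTS spark52988(a INT)")
        spark.sql("CREATE FUNCTION IF NOT EXISTS f1(i INT) RETURNS INT RETURN i * i")
      }
    }

    // Before this fix, some of these futures could fail with TableAlreadyExistsException
    // or FunctionAlreadyExistsException; now all of them should complete successfully.
    Await.result(Future.sequence(attempts), 5.minutes)
    spark.stop()
  }
}
```

Before this change, runs of such a check could intermittently fail with the already-exists errors shown in the logs above, even though `IF NOT EXISTS` was requested.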