This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 02898338cffc [SPARK-52988][SQL] Fix race conditions at CREATE TABLE 
and FUNCTION when IF NOT EXISTS is used
02898338cffc is described below

commit 02898338cffc8c7a16d333257cadb817d85aff65
Author: attilapiros <piros.attila.zs...@gmail.com>
AuthorDate: Thu Aug 14 11:25:39 2025 -0700

    [SPARK-52988][SQL] Fix race conditions at CREATE TABLE and FUNCTION when IF 
NOT EXISTS is used
    
    ### What changes were proposed in this pull request?
    
    Fixing race conditions at CREATE TABLE and CREATE FUNCTION when IF NOT 
EXISTS is given.
    
    ### Why are the changes needed?
    
    Even when "CREATE FUNCTION IF NOT EXISTS" is used, running it in parallel can 
fail with the following exception:
    
    ```
    2025-07-25 01:22:21,731 [AA-Rule-ThreadPoolExec-2] ERROR ***** - An error 
occured :
    org.apache.spark.sql.AnalysisException: Function default.SparkTestUDF 
already exists; line 1 pos 6734
            at 
org.apache.spark.sql.errors.QueryCompilationErrors$.functionAlreadyExistsError(QueryCompilationErrors.scala:654)
            at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1487)
            at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunctionInternal(SessionCatalog.scala:1719)
            at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunction(SessionCatalog.scala:1675)
            at ...
    ```
    
    Regarding `CREATE TABLE`:
    
    ```
    scala> import scala.collection.parallel.CollectionConverters._
    import scala.collection.parallel.CollectionConverters._
    
    scala> (1 to 5).toList.par.foreach(_ => spark.sql("create table if not 
exists spark52988(a int)"))
         |
    25/08/11 15:47:18 WARN ObjectStore: Version information not found in 
metastore. hive.metastore.schema.verification is not enabled so recording the 
schema version 2.3.0
    25/08/11 15:47:18 WARN ObjectStore: setMetaStoreSchemaVersion called but 
recording version is disabled: version = 2.3.0, comment = Set by MetaStore 
apiros10.96.131.100
    25/08/11 15:47:18 WARN ObjectStore: Failed to get database default, 
returning NoSuchObjectException
    25/08/11 15:47:19 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, 
since hive.security.authorization.manager is set to instance of 
HiveAuthorizerFactory.
    25/08/11 15:47:19 WARN HiveConf: HiveConf of name 
hive.internal.ss.authz.settings.applied.marker does not exist
    org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: 
[TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view 
`default`.`spark52988` because it already exists.
    Choose a different name, drop or replace the existing object, or add the IF 
NOT EXISTS clause to tolerate pre-existing objects. SQLSTATE: 42P07
      at 
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:226)
      at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
      at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
      at 
org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:218)
      at 
org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
      at 
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:422)
      at 
org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:123)
      at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
      at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)....
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
    
    `CREATE FUNCTION` after this change:
    ```
    scala> (1 to 100).foreach { j => (1 to 25).toList.par.foreach(_ => 
spark.sql(s"create function if not exists f$j(i int) returns int return i * 
i")) }
    25/08/12 10:34:20 WARN HiveConf: HiveConf of name 
hive.internal.ss.authz.settings.applied.marker does not exist
    25/08/12 10:34:20 WARN HiveConf: HiveConf of name 
hive.internal.ss.authz.settings.applied.marker does not exist
    25/08/12 10:34:20 WARN HiveConf: HiveConf of name 
hive.internal.ss.authz.settings.applied.marker does not exist
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51696 from attilapiros/SPARK-52988.
    
    Authored-by: attilapiros <piros.attila.zs...@gmail.com>
    Signed-off-by: attilapiros <piros.attila.zs...@gmail.com>
---
 .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala     | 7 ++++++-
 .../spark/sql/execution/command/createDataSourceTables.scala       | 4 +---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0803b7546f76..6cc8d6c0ea4a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1479,7 +1479,12 @@ class SessionCatalog(
     requireDbExists(db)
     val newFuncDefinition = funcDefinition.copy(identifier = qualifiedIdent)
     if (!functionExists(qualifiedIdent)) {
-      externalCatalog.createFunction(db, newFuncDefinition)
+      try {
+        externalCatalog.createFunction(db, newFuncDefinition)
+      } catch {
+        case e: FunctionAlreadyExistsException if ignoreIfExists =>
+          // Ignore the exception as ignoreIfNotExists is set to true
+      }
     } else if (!ignoreIfExists) {
       throw new FunctionAlreadyExistsException(Seq(db, 
qualifiedIdent.funcName))
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 785dff243f7e..5ef19b832f5b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -118,9 +118,7 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
 
     }
 
-    // We will return Nil or throw exception at the beginning if the table 
already exists, so when
-    // we reach here, the table should not exist and we should set 
`ignoreIfExists` to false.
-    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+    sessionState.catalog.createTable(newTable, ignoreIfExists)
 
     Seq.empty[Row]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to