srielau commented on code in PR #55977:
URL: https://github.com/apache/spark/pull/55977#discussion_r3267795313


##########
sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala:
##########
@@ -977,6 +977,83 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock") {
+    // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]]
+    // held the manager's intrinsic lock while calling into
+    // [[SessionCatalog.setCurrentDatabaseWithNameCheck]] (which takes the catalog's
+    // intrinsic lock), while concurrent unqualified function resolution acquired the
+    // catalog's intrinsic lock and then reached back into the manager via
+    // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the
+    // session whenever a `USE SCHEMA` raced with any unqualified function reference.
+    //
+    // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the
+    // resolution-order setting, so this test exercises the default configuration.
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1")
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2")
+    try {
+      val budget = 200
+      val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+      val barrier = new java.util.concurrent.CyclicBarrier(2)
+      val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+      val useThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2")

Review Comment:
   Added a third thread (ecf913ab) that toggles `USE spark_56939_testcat` <-> 
`USE spark_catalog`, exercising the `setCurrentCatalog` arm symmetrically with 
the existing `USE SCHEMA` thread. The test setup now registers an 
`InMemoryCatalog` under `spark_56939_testcat`. The schema thread now tolerates 
`NoSuchNamespaceException`, because the catalog thread can switch to the v2 
testcat (which doesn't have those schemas); that is an expected interleaving 
outcome, not a deadlock symptom. The test completes in ~5s on the default config.
   
   _Aside on the syntax: `USE CATALOG <name>` isn't a real Spark SQL form; for 
switching catalogs the grammar has just `USE identifierReference`, which 
resolves a single identifier to a catalog if one is registered. So the test 
uses `USE $v2Catalog` / `USE spark_catalog` (parser-accepted) while still 
describing the operation as "USE CATALOG" in the test name/comments, since 
that is the conceptual operation._
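A minimal, Spark-free sketch of the three-thread shape described above, for readers who want to see the tolerated interleaving in isolation. Everything in it is hypothetical and is not the code from ecf913ab: `MissingNamespace` stands in for `NoSuchNamespaceException`, and `currentCatalog` / `useSchema` stand in for session state and `USE SCHEMA`.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CyclicBarrier}
import java.util.concurrent.atomic.AtomicReference

object ToleratedInterleavingSketch {
  // Hypothetical stand-in for NoSuchNamespaceException.
  final class MissingNamespace(msg: String) extends RuntimeException(msg)

  // Hypothetical stand-in for the session's current catalog.
  private val currentCatalog = new AtomicReference[String]("spark_catalog")

  // Stand-in for `USE SCHEMA`: fails when the current catalog lacks the schema.
  private def useSchema(name: String): Unit = {
    if (currentCatalog.get() != "spark_catalog") {
      throw new MissingNamespace(s"schema $name not found")
    }
  }

  // Builds a worker that waits on the barrier, then runs `body` up to `budget`
  // times, stopping early once any thread has recorded an error.
  private def worker(name: String, barrier: CyclicBarrier, budget: Int,
      errors: ConcurrentLinkedQueue[Throwable])(body: Int => Unit): Thread = {
    new Thread(() => {
      try {
        barrier.await()
        var i = 0
        while (i < budget && errors.isEmpty) {
          body(i)
          i += 1
        }
      } catch {
        case t: Throwable => errors.add(t)
      }
    }, name)
  }

  // Returns (all threads finished within the join window, number of errors).
  def run(budget: Int = 200, joinMillis: Long = 30000L): (Boolean, Int) = {
    val errors = new ConcurrentLinkedQueue[Throwable]()
    val barrier = new CyclicBarrier(3)
    val threads = Seq(
      worker("use-schema", barrier, budget, errors) { i =>
        // A missing schema is an expected interleaving outcome, so swallow it.
        try useSchema(if (i % 2 == 0) "s1" else "s2")
        catch { case _: MissingNamespace => () }
      },
      worker("use-catalog", barrier, budget, errors) { i =>
        currentCatalog.set(if (i % 2 == 0) "testcat" else "spark_catalog")
      },
      worker("lookup", barrier, budget, errors) { _ =>
        currentCatalog.get()
        ()
      })
    threads.foreach(_.setDaemon(true))
    threads.foreach(_.start())
    threads.foreach(_.join(joinMillis))
    (threads.forall(!_.isAlive), errors.size)
  }
}
```

Only the one exception type that the interleaving can legitimately produce is swallowed; anything else still lands in `errors` and would fail a test built this way.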



##########
sql/core/src/test/scala/org/apache/spark/sql/SetPathSuite.scala:
##########
@@ -977,6 +977,83 @@ class SetPathSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-56939: concurrent USE SCHEMA and unqualified function lookups do not deadlock") {
+    // Regression for SPARK-56939. Prior to the fix, [[CatalogManager.setCurrentNamespace]]
+    // held the manager's intrinsic lock while calling into
+    // [[SessionCatalog.setCurrentDatabaseWithNameCheck]] (which takes the catalog's
+    // intrinsic lock), while concurrent unqualified function resolution acquired the
+    // catalog's intrinsic lock and then reached back into the manager via
+    // [[CatalogManager.sqlResolutionPathEntries]]. That lock-order inversion deadlocked the
+    // session whenever a `USE SCHEMA` raced with any unqualified function reference.
+    //
+    // The hazard is independent of [[SQLConf.PATH_ENABLED]] and the
+    // resolution-order setting, so this test exercises the default configuration.
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s1")
+    sql("CREATE SCHEMA IF NOT EXISTS spark_56939_s2")
+    try {
+      val budget = 200
+      val iterations = new java.util.concurrent.atomic.AtomicInteger(0)
+      val barrier = new java.util.concurrent.CyclicBarrier(2)
+      val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+
+      val useThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            sql(if ((i % 2) == 0) "USE SCHEMA spark_56939_s1" else "USE SCHEMA spark_56939_s2")
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-use-schema")
+
+      val lookupThread = new Thread(() => {
+        try {
+          barrier.await()
+          var i = 0
+          while (i < budget && errors.isEmpty) {
+            // Unqualified `count(*)` exercises the kinds-order provider that resolves
+            // against the live PATH via [[CatalogManager]] -- the side of the cycle
+            // that previously acquired the catalog lock first and then the manager lock.
+            val n = sql("SELECT count(*) FROM VALUES (1), (2), (3) AS t(a)")
+              .head().getLong(0)
+            assert(n == 3L, s"unexpected count: $n at iteration $i")
+            iterations.incrementAndGet()
+            i += 1
+          }
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      }, "SPARK-56939-lookup")
+
+      useThread.start()
+      lookupThread.start()
+
+      // Generous join: 30s is plenty for 200 cheap queries on either side and gives a
+      // clear failure signal if the implementation regresses into a deadlock.
+      val joinMillis = 30000L
+      useThread.join(joinMillis)
+      lookupThread.join(joinMillis)
+
+      assert(!useThread.isAlive,
+        "USE SCHEMA thread did not finish; lock-order inversion between SessionCatalog and " +
+          "CatalogManager likely returned (SPARK-56939).")

Review Comment:
   Applied (ecf913ab) at both call sites.
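The lock-order inversion described in the test comment of the hunk above can be illustrated without Spark. The sketch below shows one common way to remove such an inversion: neither side calls into the other while holding its own intrinsic lock. It is an assumption-laden illustration, not the fix in this PR; `Manager`, `Catalog`, and all their methods are hypothetical stand-ins for `CatalogManager` and `SessionCatalog`.

```scala
object LockOrderSketch {
  // Hypothetical stand-in for SessionCatalog.
  final class Catalog {
    private var db = "default"
    @volatile var manager: Manager = null

    def setCurrentDatabase(name: String): Unit = synchronized { db = name }

    // Reads manager state BEFORE taking the catalog lock, so this thread never
    // holds the catalog lock while waiting for the manager lock.
    def resolveFunction(fn: String): String = {
      val ns = manager.currentNamespace
      synchronized { s"$ns.$fn (db=$db)" }
    }
  }

  // Hypothetical stand-in for CatalogManager.
  final class Manager(catalog: Catalog) {
    private var namespace = "default"

    def currentNamespace: String = synchronized { namespace }

    def setCurrentNamespace(name: String): Unit = {
      synchronized { namespace = name } // manager lock is released here
      catalog.setCurrentDatabase(name)  // catalog lock is taken on its own
    }
  }

  // Races a namespace switcher against a resolver and reports whether both
  // threads finished within the bounded join window.
  def run(budget: Int = 200, joinMillis: Long = 30000L): Boolean = {
    val catalog = new Catalog
    val manager = new Manager(catalog)
    catalog.manager = manager

    val switcher = new Thread(() => {
      var i = 0
      while (i < budget) {
        manager.setCurrentNamespace(if (i % 2 == 0) "s1" else "s2")
        i += 1
      }
    }, "switcher")

    val resolver = new Thread(() => {
      var i = 0
      while (i < budget) {
        catalog.resolveFunction("count")
        i += 1
      }
    }, "resolver")

    val threads = Seq(switcher, resolver)
    threads.foreach(_.setDaemon(true)) // a hung thread must not block JVM exit
    threads.foreach(_.start())
    threads.foreach(_.join(joinMillis))
    threads.forall(!_.isAlive)
  }
}
```

The trade-off in this sketch is that the namespace and the database are briefly out of sync between the two steps of `setCurrentNamespace`; whether that window is acceptable depends on the invariants the real classes need to keep.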



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

