This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cc4c90  [SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies security context
6cc4c90 is described below

commit 6cc4c90cbc09a7729f9c40f440fcdda83e3d8648
Author: Danny Guinther <dguint...@seismic.com>
AuthorDate: Fri Dec 24 10:07:16 2021 +0900

    [SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies security context
    
    Augments the JdbcConnectionProvider API so that a provider can indicate
    whether it needs to modify the global security configuration when
    establishing a connection, and therefore whether access to the global
    security configuration should be synchronized to prevent races.
    
    ### What changes were proposed in this pull request?
    As suggested by gaborgsomogyi
    [here](https://github.com/apache/spark/pull/29024/files#r755788709),
    augments the `JdbcConnectionProvider` API to include a
    `modifiesSecurityContext` method that can be used by `ConnectionProvider`
    to determine when `SecurityConfigurationLock.synchronized` is required to
    avoid race conditions when establishing a JDBC connection.
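    
    With that in place, the lock is only taken when the selected provider
    reports that it needs it. An abridged sketch of the logic this change adds
    to `ConnectionProviderBase.create` (see the full diff to
    `ConnectionProvider.scala` below):
    ```scala
    // Take SecurityConfigurationLock only when the selected provider says it
    // will modify the JVM-wide security configuration.
    if (selectedProvider.modifiesSecurityContext(driver, options)) {
      SecurityConfigurationLock.synchronized {
        // Safe to capture the parent config here; the lock keeps it untouched.
        val parent = Configuration.getConfiguration
        try {
          selectedProvider.getConnection(driver, options)
        } finally {
          // Restore whatever security configuration was in place before.
          Configuration.setConfiguration(parent)
        }
      }
    } else {
      // No global security changes needed, so skip the lock entirely.
      selectedProvider.getConnection(driver, options)
    }
    ```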
    
    ### Why are the changes needed?
    Provides a path forward for working around a significant bottleneck
    introduced by synchronizing on `SecurityConfigurationLock` every time a
    connection is established. The synchronization isn't always needed, and it
    should be at the discretion of the `JdbcConnectionProvider` to determine
    when locking is necessary. See
    [SPARK-37391](https://issues.apache.org/jira/browse/SPARK-37391) or
    [this thread](https://github.com/apache/spark/pull/29024/files#r754441783).
    
    ### Does this PR introduce _any_ user-facing change?
    Any existing implementations of `JdbcConnectionProvider` will need to add
    a definition of `modifiesSecurityContext`. I'm also open to adding a
    default implementation, but it seemed to me that requiring an explicit
    implementation of the method was preferable.
    
    A drop-in implementation that would continue the existing behavior is:
    ```scala
    override def modifiesSecurityContext(
      driver: Driver,
      options: Map[String, String]
    ): Boolean = true
    ```
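    
    Conversely, a provider can opt out of locking whenever its options do not
    require touching the JVM-wide security configuration. A minimal,
    hypothetical sketch (the `keytab` and `principal` option keys below are
    illustrative only and not part of this change):
    ```scala
    override def modifiesSecurityContext(
      driver: Driver,
      options: Map[String, String]
    ): Boolean = {
      // Hypothetical example: only request synchronization when keytab-based
      // authentication would modify the global JAAS configuration.
      options.contains("keytab") && options.contains("principal")
    }
    ```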
    
    ### How was this patch tested?
    Unit tests, but I also plan to run a real workflow once I get the initial
    thumbs up on this implementation.
    
    Closes #34745 from tdg5/SPARK-37391-opt-in-security-configuration-sync.
    
    Authored-by: Danny Guinther <dguint...@seismic.com>
    Signed-off-by: Kousuke Saruta <saru...@oss.nttdata.com>
---
 .../sql/jdbc/ExampleJdbcConnectionProvider.scala   |  5 ++
 project/MimaExcludes.scala                         |  5 +-
 .../jdbc/connection/BasicConnectionProvider.scala  |  8 ++++
 .../jdbc/connection/ConnectionProvider.scala       | 22 +++++----
 .../spark/sql/jdbc/JdbcConnectionProvider.scala    | 19 +++++++-
 .../main/scala/org/apache/spark/sql/jdbc/README.md |  5 +-
 .../jdbc/connection/ConnectionProviderSuite.scala  | 55 ++++++++++++++++++++++
 .../IntentionallyFaultyConnectionProvider.scala    |  4 ++
 8 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala b/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala
index 6d275d4..c63467d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala
@@ -30,4 +30,9 @@ class ExampleJdbcConnectionProvider extends JdbcConnectionProvider with Logging
   override def canHandle(driver: Driver, options: Map[String, String]): Boolean = false
 
   override def getConnection(driver: Driver, options: Map[String, String]): Connection = null
+
+  override def modifiesSecurityContext(
+    driver: Driver,
+    options: Map[String, String]
+  ): Boolean = false
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 75fa001..6cf639f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -40,7 +40,10 @@ object MimaExcludes {
     // The followings are necessary for Scala 2.13.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend#Arguments.*"),
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend#Arguments.*"),
-    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend$Arguments$")
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend$Arguments$"),
+
+    // [SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies security context
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcConnectionProvider.modifiesSecurityContext")
   )
 
   // Exclude rules for 3.2.x from 3.1.1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala
index 66854f2..aff91c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala
@@ -48,4 +48,12 @@ private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with
     logDebug(s"JDBC connection initiated with URL: ${jdbcOptions.url} and properties: $properties")
     driver.connect(jdbcOptions.url, properties)
   }
+
+  override def modifiesSecurityContext(
+    driver: Driver,
+    options: Map[String, String]
+  ): Boolean = {
+    // BasicConnectionProvider is the default unsecure connection provider, so just return false
+    false
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
index e3d8275..84a6269 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala
@@ -86,16 +86,20 @@ protected abstract class ConnectionProviderBase extends Logging {
         filteredProviders.head
     }
 
-    SecurityConfigurationLock.synchronized {
-      // Inside getConnection it's safe to get parent again because SecurityConfigurationLock
-      // makes sure it's untouched
-      val parent = Configuration.getConfiguration
-      try {
-        selectedProvider.getConnection(driver, options)
-      } finally {
-        logDebug("Restoring original security configuration")
-        Configuration.setConfiguration(parent)
+    if (selectedProvider.modifiesSecurityContext(driver, options)) {
+      SecurityConfigurationLock.synchronized {
+        // Inside getConnection it's safe to get parent again because SecurityConfigurationLock
+        // makes sure it's untouched
+        val parent = Configuration.getConfiguration
+        try {
+          selectedProvider.getConnection(driver, options)
+        } finally {
+          logDebug("Restoring original security configuration")
+          Configuration.setConfiguration(parent)
+        }
       }
+    } else {
+      selectedProvider.getConnection(driver, options)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
index 1e8abca..10eebf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala
@@ -53,12 +53,27 @@ abstract class JdbcConnectionProvider {
   def canHandle(driver: Driver, options: Map[String, String]): Boolean
 
   /**
-   * Opens connection toward the database. Since global JVM security configuration change may needed
-   * this API is called synchronized by `SecurityConfigurationLock` to avoid race.
+   * Opens connection to the database. Since global JVM security configuration change may be
+   * needed this API is called synchronized by `SecurityConfigurationLock` to avoid race when
+   * `modifiesSecurityContext` returns true for the given driver with the given options.
    *
    * @param driver  Java driver which initiates the connection
    * @param options Driver options which initiates the connection
    * @return a `Connection` object that represents a connection to the URL
    */
   def getConnection(driver: Driver, options: Map[String, String]): Connection
+
+  /**
+   * Checks if this connection provider instance needs to modify global security configuration to
+   * handle authentication and thus should synchronize access to the security configuration while
+   * the given driver is initiating a connection with the given options.
+   *
+   * @param driver  Java driver which initiates the connection
+   * @param options Driver options which initiates the connection
+   * @return True if the connection provider will need to modify the security configuration when
+   * initiating a connection with the given driver with the given options.
+   *
+   * @since 3.1.3
+   */
+  def modifiesSecurityContext(driver: Driver, options: Map[String, String]): Boolean
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md
index 72196be..a09f9a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md
@@ -86,5 +86,6 @@ Implementation considerations:
 * CPs are running in heavy multi-threaded environment and adding a state into a CP is not advised.
   If any state added then it must be synchronized properly. It could cause quite some headache to
   hunt down such issues.
-* Some of the CPs are modifying the JVM global security context so `getConnection` method is
-  synchronized by `org.apache.spark.security.SecurityConfigurationLock` to avoid race.
+* Some of the CPs are modifying the JVM global security context so if the CP's
+  `modifiesSecurityContext` method returns `true` then the CP's `getConnection` method will
+  be called synchronized by `org.apache.spark.security.SecurityConfigurationLock` to avoid race.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala
index 6674483..0d7b133 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala
@@ -23,6 +23,7 @@ import javax.security.auth.login.Configuration
 import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.SparkConf
+import org.apache.spark.security.SecurityConfigurationLock
 import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.jdbc.JdbcConnectionProvider
 import org.apache.spark.sql.test.SharedSparkSession
@@ -68,12 +69,20 @@ class ConnectionProviderSuite
       override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
       override def getConnection(driver: Driver, options: Map[String, String]): Connection =
         throw new RuntimeException()
+      override def modifiesSecurityContext(
+        driver: Driver,
+        options: Map[String, String]
+      ): Boolean = false
     }
     val provider2 = new JdbcConnectionProvider() {
       override val name: String = "test2"
       override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
       override def getConnection(driver: Driver, options: Map[String, String]): Connection =
         throw new RuntimeException()
+      override def modifiesSecurityContext(
+        driver: Driver,
+        options: Map[String, String]
+      ): Boolean = false
     }
 
     val providerBase = new ConnectionProviderBase() {
@@ -92,12 +101,20 @@ class ConnectionProviderSuite
       override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
       override def getConnection(driver: Driver, options: Map[String, String]): Connection =
         throw new RuntimeException()
+      override def modifiesSecurityContext(
+        driver: Driver,
+        options: Map[String, String]
+      ): Boolean = false
     }
     val provider2 = new JdbcConnectionProvider() {
       override val name: String = "test2"
       override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
       override def getConnection(driver: Driver, options: Map[String, String]): Connection =
         mock[Connection]
+      override def modifiesSecurityContext(
+        driver: Driver,
+        options: Map[String, String]
+      ): Boolean = false
     }
 
     val providerBase = new ConnectionProviderBase() {
@@ -107,12 +124,50 @@ class ConnectionProviderSuite
     assert(providerBase.create(mock[Driver], Map.empty, Some("test2")).isInstanceOf[Connection])
   }
 
+  test("Synchronize on SecurityConfigurationLock when the specified connection 
provider needs") {
+    val provider1 = new JdbcConnectionProvider() {
+      override val name: String = "test1"
+      override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
+      override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
+        assert(Thread.holdsLock(SecurityConfigurationLock))
+        mock[Connection]
+      }
+      override def modifiesSecurityContext(
+        driver: Driver,
+        options: Map[String, String]
+      ): Boolean = true
+    }
+    val provider2 = new JdbcConnectionProvider() {
+      override val name: String = "test2"
+      override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
+      override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
+        assert(!Thread.holdsLock(SecurityConfigurationLock))
+        mock[Connection]
+      }
+      override def modifiesSecurityContext(
+        driver: Driver,
+        options: Map[String, String]
+      ): Boolean = false
+    }
+
+    val providerBase = new ConnectionProviderBase() {
+      override val providers = Seq(provider1, provider2)
+    }
+    // We don't expect any exceptions or null here
+    assert(providerBase.create(mock[Driver], Map.empty, Some("test1")).isInstanceOf[Connection])
+    assert(providerBase.create(mock[Driver], Map.empty, Some("test2")).isInstanceOf[Connection])
+  }
+
   test("Throw an error when user specified provider that does not exist") {
     val provider = new JdbcConnectionProvider() {
       override val name: String = "provider"
       override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
       override def getConnection(driver: Driver, options: Map[String, String]): Connection =
         throw new RuntimeException()
+      override def modifiesSecurityContext(
+        driver: Driver,
+        options: Map[String, String]
+      ): Boolean = false
     }
 
     val providerBase = new ConnectionProviderBase() {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala
index 329d79c..f5d6d34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala
@@ -27,6 +27,10 @@ private class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvid
   override val name: String = "IntentionallyFaultyConnectionProvider"
   override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true
   override def getConnection(driver: Driver, options: Map[String, String]): Connection = null
+  override def modifiesSecurityContext(
+    driver: Driver,
+    options: Map[String, String]
+  ): Boolean = false
 }
 
 private object IntentionallyFaultyConnectionProvider {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
