This is an automated email from the ASF dual-hosted git repository. sarutak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6cc4c90 [SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies security context 6cc4c90 is described below commit 6cc4c90cbc09a7729f9c40f440fcdda83e3d8648 Author: Danny Guinther <dguint...@seismic.com> AuthorDate: Fri Dec 24 10:07:16 2021 +0900 [SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies security context Augments the JdbcConnectionProvider API so that a provider can indicate whether it will need to modify the global security configuration when establishing a connection and, therefore, whether access to the global security configuration should be synchronized to prevent races. ### What changes were proposed in this pull request? As suggested by gaborgsomogyi [here](https://github.com/apache/spark/pull/29024/files#r755788709), augments the `JdbcConnectionProvider` API to include a `modifiesSecurityContext` method that can be used by `ConnectionProvider` to determine when `SecurityConfigurationLock.synchronized` is required to avoid race conditions when establishing a JDBC connection. ### Why are the changes needed? Provides a path forward for working around a significant bottleneck introduced by synchronizing `SecurityConfigurationLock` every time a connection is established. The synchronization isn't always needed and it should be at the discretion of the `JdbcConnectionProvider` to determine when locking is necessary. See [SPARK-37391](https://issues.apache.org/jira/browse/SPARK-37391) or [this thread](https://github.com/apache/spark/pull/29024/files#r754441783). ### Does this PR introduce _any_ user-facing change? Any existing implementations of `JdbcConnectionProvider` will need to add a definition of `modifiesSecurityContext`. I'm also open to adding a default implementation, but it seemed to me that requiring an explicit implementation of the method was preferable. 
A drop-in implementation that would continue the existing behavior is: ```scala override def modifiesSecurityContext( driver: Driver, options: Map[String, String] ): Boolean = true ``` ### How was this patch tested? Unit tests, but I also plan to run a real workflow once I get the initial thumbs up on this implementation. Closes #34745 from tdg5/SPARK-37391-opt-in-security-configuration-sync. Authored-by: Danny Guinther <dguint...@seismic.com> Signed-off-by: Kousuke Saruta <saru...@oss.nttdata.com> --- .../sql/jdbc/ExampleJdbcConnectionProvider.scala | 5 ++ project/MimaExcludes.scala | 5 +- .../jdbc/connection/BasicConnectionProvider.scala | 8 ++++ .../jdbc/connection/ConnectionProvider.scala | 22 +++++---- .../spark/sql/jdbc/JdbcConnectionProvider.scala | 19 +++++++- .../main/scala/org/apache/spark/sql/jdbc/README.md | 5 +- .../jdbc/connection/ConnectionProviderSuite.scala | 55 ++++++++++++++++++++++ .../IntentionallyFaultyConnectionProvider.scala | 4 ++ 8 files changed, 109 insertions(+), 14 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala b/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala index 6d275d4..c63467d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/jdbc/ExampleJdbcConnectionProvider.scala @@ -30,4 +30,9 @@ class ExampleJdbcConnectionProvider extends JdbcConnectionProvider with Logging override def canHandle(driver: Driver, options: Map[String, String]): Boolean = false override def getConnection(driver: Driver, options: Map[String, String]): Connection = null + + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = false } diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 75fa001..6cf639f 100644 --- a/project/MimaExcludes.scala +++ 
b/project/MimaExcludes.scala @@ -40,7 +40,10 @@ object MimaExcludes { // The followings are necessary for Scala 2.13. ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend#Arguments.*"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend#Arguments.*"), - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend$Arguments$") + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.executor.CoarseGrainedExecutorBackend$Arguments$"), + + // [SPARK-37391][SQL] JdbcConnectionProvider tells if it modifies security context + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcConnectionProvider.modifiesSecurityContext") ) // Exclude rules for 3.2.x from 3.1.1 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala index 66854f2..aff91c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/BasicConnectionProvider.scala @@ -48,4 +48,12 @@ private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with logDebug(s"JDBC connection initiated with URL: ${jdbcOptions.url} and properties: $properties") driver.connect(jdbcOptions.url, properties) } + + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = { + // BasicConnectionProvider is the default unsecure connection provider, so just return false + false + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala index e3d8275..84a6269 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProvider.scala @@ -86,16 +86,20 @@ protected abstract class ConnectionProviderBase extends Logging { filteredProviders.head } - SecurityConfigurationLock.synchronized { - // Inside getConnection it's safe to get parent again because SecurityConfigurationLock - // makes sure it's untouched - val parent = Configuration.getConfiguration - try { - selectedProvider.getConnection(driver, options) - } finally { - logDebug("Restoring original security configuration") - Configuration.setConfiguration(parent) + if (selectedProvider.modifiesSecurityContext(driver, options)) { + SecurityConfigurationLock.synchronized { + // Inside getConnection it's safe to get parent again because SecurityConfigurationLock + // makes sure it's untouched + val parent = Configuration.getConfiguration + try { + selectedProvider.getConnection(driver, options) + } finally { + logDebug("Restoring original security configuration") + Configuration.setConfiguration(parent) + } } + } else { + selectedProvider.getConnection(driver, options) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala index 1e8abca..10eebf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcConnectionProvider.scala @@ -53,12 +53,27 @@ abstract class JdbcConnectionProvider { def canHandle(driver: Driver, options: Map[String, String]): Boolean /** - * Opens connection toward the database. 
Since global JVM security configuration change may needed - * this API is called synchronized by `SecurityConfigurationLock` to avoid race. + * Opens connection to the database. Since global JVM security configuration change may be + * needed this API is called synchronized by `SecurityConfigurationLock` to avoid race when + * `modifiesSecurityContext` returns true for the given driver with the given options. * * @param driver Java driver which initiates the connection * @param options Driver options which initiates the connection * @return a `Connection` object that represents a connection to the URL */ def getConnection(driver: Driver, options: Map[String, String]): Connection + + /** + * Checks if this connection provider instance needs to modify global security configuration to + * handle authentication and thus should synchronize access to the security configuration while + * the given driver is initiating a connection with the given options. + * + * @param driver Java driver which initiates the connection + * @param options Driver options which initiates the connection + * @return True if the connection provider will need to modify the security configuration when + * initiating a connection with the given driver with the given options. + * + * @since 3.1.3 + */ + def modifiesSecurityContext(driver: Driver, options: Map[String, String]): Boolean } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md index 72196be..a09f9a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md @@ -86,5 +86,6 @@ Implementation considerations: * CPs are running in heavy multi-threaded environment and adding a state into a CP is not advised. If any state added then it must be synchronized properly. It could cause quite some headache to hunt down such issues. 
-* Some of the CPs are modifying the JVM global security context so `getConnection` method is - synchronized by `org.apache.spark.security.SecurityConfigurationLock` to avoid race. +* Some of the CPs are modifying the JVM global security context so if the CP's + `modifiesSecurityContext` method returns `true` then the CP's `getConnection` method will + be called synchronized by `org.apache.spark.security.SecurityConfigurationLock` to avoid race. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala index 6674483..0d7b133 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/ConnectionProviderSuite.scala @@ -23,6 +23,7 @@ import javax.security.auth.login.Configuration import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.SparkConf +import org.apache.spark.security.SecurityConfigurationLock import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.jdbc.JdbcConnectionProvider import org.apache.spark.sql.test.SharedSparkSession @@ -68,12 +69,20 @@ class ConnectionProviderSuite override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true override def getConnection(driver: Driver, options: Map[String, String]): Connection = throw new RuntimeException() + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = false } val provider2 = new JdbcConnectionProvider() { override val name: String = "test2" override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true override def getConnection(driver: Driver, options: Map[String, String]): Connection = throw new RuntimeException() + override def 
modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = false } val providerBase = new ConnectionProviderBase() { @@ -92,12 +101,20 @@ class ConnectionProviderSuite override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true override def getConnection(driver: Driver, options: Map[String, String]): Connection = throw new RuntimeException() + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = false } val provider2 = new JdbcConnectionProvider() { override val name: String = "test2" override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true override def getConnection(driver: Driver, options: Map[String, String]): Connection = mock[Connection] + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = false } val providerBase = new ConnectionProviderBase() { @@ -107,12 +124,50 @@ class ConnectionProviderSuite assert(providerBase.create(mock[Driver], Map.empty, Some("test2")).isInstanceOf[Connection]) } + test("Synchronize on SecurityConfigurationLock when the specified connection provider needs") { + val provider1 = new JdbcConnectionProvider() { + override val name: String = "test1" + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + assert(Thread.holdsLock(SecurityConfigurationLock)) + mock[Connection] + } + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = true + } + val provider2 = new JdbcConnectionProvider() { + override val name: String = "test2" + override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true + override def getConnection(driver: Driver, options: Map[String, String]): Connection = { + assert(!Thread.holdsLock(SecurityConfigurationLock)) + mock[Connection] + } + override 
def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = false + } + + val providerBase = new ConnectionProviderBase() { + override val providers = Seq(provider1, provider2) + } + // We don't expect any exceptions or null here + assert(providerBase.create(mock[Driver], Map.empty, Some("test1")).isInstanceOf[Connection]) + assert(providerBase.create(mock[Driver], Map.empty, Some("test2")).isInstanceOf[Connection]) + } + test("Throw an error when user specified provider that does not exist") { val provider = new JdbcConnectionProvider() { override val name: String = "provider" override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true override def getConnection(driver: Driver, options: Map[String, String]): Connection = throw new RuntimeException() + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = false } val providerBase = new ConnectionProviderBase() { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala index 329d79c..f5d6d34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/connection/IntentionallyFaultyConnectionProvider.scala @@ -27,6 +27,10 @@ private class IntentionallyFaultyConnectionProvider extends JdbcConnectionProvid override val name: String = "IntentionallyFaultyConnectionProvider" override def canHandle(driver: Driver, options: Map[String, String]): Boolean = true override def getConnection(driver: Driver, options: Map[String, String]): Connection = null + override def modifiesSecurityContext( + driver: Driver, + options: Map[String, String] + ): Boolean = 
false } private object IntentionallyFaultyConnectionProvider { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org