This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6b12b317e42d [SPARK-48447][SS] Check state store provider class before invoking the constructor 6b12b317e42d is described below commit 6b12b317e42d0ffd426ef0809c77225298b535de Author: Yuchen Liu <yuchen....@databricks.com> AuthorDate: Mon Jun 3 12:57:15 2024 +0900 [SPARK-48447][SS] Check state store provider class before invoking the constructor ### What changes were proposed in this pull request? This PR adds a validity check to the StateStoreProvider class before calling its constructor. A new runtime exception was created to report this issue. ### Why are the changes needed? This is a security improvement to a user-facing API. Users are only allowed to use a class that extends `org.apache.spark.sql.execution.streaming.state.StateStoreProvider` as StateStoreProvider. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This PR comes with a new test to verify that an invalid class cannot pass the security check. The fact that other tests in StreamingQuerySuite can pass implies that a valid class can pass the check. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46791 from eason-yuchen-liu/check-statestoreprovider. 
Authored-by: Yuchen Liu <yuchen....@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-conditions.json | 6 ++++++ .../sql/execution/streaming/state/StateStore.scala | 8 +++++++- .../spark/sql/streaming/StreamingQuerySuite.scala | 17 +++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 3dd7a6d65d7f..69965e58fb79 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3718,6 +3718,12 @@ ], "sqlState" : "42802" }, + "STATE_STORE_INVALID_PROVIDER" : { + "message" : [ + "The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.StateStoreProvider." + ], + "sqlState" : "42K06" + }, "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED" : { "message" : [ "Null type ordering column with name=<fieldName> at index=<index> is not supported for range scan encoder." 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 8c2170abe311..b59fe65fb14a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -29,7 +29,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException} +import org.apache.spark.{SparkContext, SparkEnv, SparkException, SparkUnsupportedOperationException} import org.apache.spark.internal.{Logging, LogKeys, MDC} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils @@ -396,6 +396,12 @@ object StateStoreProvider { */ def create(providerClassName: String): StateStoreProvider = { val providerClass = Utils.classForName(providerClassName) + if (!classOf[StateStoreProvider].isAssignableFrom(providerClass)) { + throw new SparkException( + errorClass = "STATE_STORE_INVALID_PROVIDER", + messageParameters = Map("inputClass" -> providerClassName), + cause = null) + } providerClass.getConstructor().newInstance().asInstanceOf[StateStoreProvider] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 8b761c24b604..fb118adbe221 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -1432,6 +1432,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("SPARK-48447: check state store provider class before invoking the constructor") { + 
withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[Object].getCanonicalName) { + val input = MemoryStream[Int] + input.addData(1) + val query = input.toDF().limit(2).writeStream + .trigger(Trigger.AvailableNow()) + .format("console") + .start() + val ex = intercept[StreamingQueryException] { + query.processAllAvailable() + } + assert(ex.getMessage.contains( + s"The given State Store Provider ${classOf[Object].getCanonicalName} does not " + + "extend org.apache.spark.sql.execution.streaming.state.StateStoreProvider.")) + } + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org