This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b12b317e42d [SPARK-48447][SS] Check state store provider class before invoking the constructor
6b12b317e42d is described below

commit 6b12b317e42d0ffd426ef0809c77225298b535de
Author: Yuchen Liu <yuchen....@databricks.com>
AuthorDate: Mon Jun 3 12:57:15 2024 +0900

    [SPARK-48447][SS] Check state store provider class before invoking the constructor
    
    ### What changes were proposed in this pull request?
    This PR adds a validity check to the StateStoreProvider class before calling its constructor. A new runtime exception was created to report this issue.
    
    ### Why are the changes needed?
    This is a security improvement to a user-facing API. Users are only allowed to use class that extends `org.apache.spark.sql.execution.streaming.state.StateStoreProvider` as StateStoreProvider.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    This PR comes with a new test to test that invalid class cannot pass the security check. The fact that other tests in StreamingQuerySuite can pass implies that valid class can pass the check.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46791 from eason-yuchen-liu/check-statestoreprovider.
    
    Authored-by: Yuchen Liu <yuchen....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-conditions.json      |  6 ++++++
 .../sql/execution/streaming/state/StateStore.scala      |  8 +++++++-
 .../spark/sql/streaming/StreamingQuerySuite.scala       | 17 +++++++++++++++++
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 3dd7a6d65d7f..69965e58fb79 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3718,6 +3718,12 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_INVALID_PROVIDER" : {
+    "message" : [
+      "The given State Store Provider <inputClass> does not extend org.apache.spark.sql.execution.streaming.state.StateStoreProvider."
+    ],
+    "sqlState" : "42K06"
+  },
   "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED" : {
     "message" : [
      "Null type ordering column with name=<fieldName> at index=<index> is not supported for range scan encoder."
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 8c2170abe311..b59fe65fb14a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
@@ -396,6 +396,12 @@ object StateStoreProvider {
    */
   def create(providerClassName: String): StateStoreProvider = {
     val providerClass = Utils.classForName(providerClassName)
+    if (!classOf[StateStoreProvider].isAssignableFrom(providerClass)) {
+      throw new SparkException(
+        errorClass = "STATE_STORE_INVALID_PROVIDER",
+        messageParameters = Map("inputClass" -> providerClassName),
+        cause = null)
+    }
     
providerClass.getConstructor().newInstance().asInstanceOf[StateStoreProvider]
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 8b761c24b604..fb118adbe221 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -1432,6 +1432,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("SPARK-48447: check state store provider class before invoking the constructor") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[Object].getCanonicalName) {
+      val input = MemoryStream[Int]
+      input.addData(1)
+      val query = input.toDF().limit(2).writeStream
+        .trigger(Trigger.AvailableNow())
+        .format("console")
+        .start()
+      val ex = intercept[StreamingQueryException] {
+        query.processAllAvailable()
+      }
+      assert(ex.getMessage.contains(
+        s"The given State Store Provider ${classOf[Object].getCanonicalName} does not " +
+          "extend org.apache.spark.sql.execution.streaming.state.StateStoreProvider."))
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to