This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f7cdc9cdaf4 [SPARK-47776][SS] Disallow binary inequality collation be used in key schema of stateful operator
2f7cdc9cdaf4 is described below

commit 2f7cdc9cdaf4122ccd41e9f9b3296f4b190fee05
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Apr 10 13:38:07 2024 +0900

    [SPARK-47776][SS] Disallow binary inequality collation be used in key schema of stateful operator
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to disallow using a binary-inequality collation column in the
    key schema of a stateful operator. It is worth noting that changing the
    collation of the same string column across a query restart was already
    disallowed when collation support was introduced.
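
    As a condensed sketch of what is now rejected (it mirrors the new test added to
    StreamingQuerySuite in this patch; the table name `tbl` and the query settings
    are illustrative), a streaming aggregation grouping on a case-insensitive
    collated column fails at the state store level:

    ```scala
    // Illustrative only: grouping a stream on a column whose collation does not
    // support binary equality is what this patch now disallows.
    spark.sql("CREATE TABLE tbl (c1 STRING COLLATE UTF8_BINARY_LCASE) USING PARQUET")
    val query = spark.readStream.table("tbl")
      .groupBy("c1")
      .count()
      .writeStream
      .format("memory")
      .queryName("output")
      .outputMode("update")
      .start()
    // query.processAllAvailable() is expected to fail with the new error class
    // STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY.
    ```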
    
    ### Why are the changes needed?
    
    The state store API relies heavily on provider implementations performing
    O(1)-like get and put operations. While the actual behavior depends on the
    state store provider, it is reasonable to assume that these providers look up
    keys based only on their binary format (implying binary equality).
    
    That said, even if a column is declared with a case-insensitive collation, the
    state store API would not take this into consideration and could produce wrong
    results, e.g. treating 'a' and 'A' as different keys even though the column is
    case insensitive.
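
    A minimal, self-contained illustration (plain REPL-style Scala, not Spark's
    actual provider code) of why this matters: a store that keys entries on the
    raw bytes of the key honors only binary equality, so two values that are
    semantically equal under a case-insensitive collation still end up as
    separate entries.

    ```scala
    import java.nio.ByteBuffer
    import scala.collection.mutable

    // A toy byte-keyed store: put compares keys purely by their bytes.
    val store = mutable.HashMap.empty[ByteBuffer, Long]
    def put(key: Array[Byte], value: Long): Unit = store(ByteBuffer.wrap(key)) = value

    val k1 = "aaa".getBytes("UTF-8")
    val k2 = "AAA".getBytes("UTF-8")
    assert("aaa".equalsIgnoreCase("AAA"))     // semantically equal under a lowercase collation
    assert(!java.util.Arrays.equals(k1, k2))  // but binary-unequal
    put(k1, 1L)
    put(k2, 1L)
    assert(store.size == 2)                   // two state entries instead of one merged count
    ```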
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, as collation support has not been released yet.
    
    ### How was this patch tested?
    
    New UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45951 from HeartSaVioR/SPARK-47776.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  6 ++++
 docs/sql-error-conditions.md                       |  6 ++++
 .../sql/execution/streaming/state/StateStore.scala | 22 +++++++++++-
 .../StateSchemaCompatibilityCheckerSuite.scala     | 24 +++++++++++++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 41 +++++++++++++++++++++-
 5 files changed, 97 insertions(+), 2 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index c3a01e9dcd90..45a1ec5e1e84 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3652,6 +3652,12 @@
     ],
     "sqlState" : "XXKST"
   },
+  "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY" : {
+    "message" : [
+      "Binary inequality column is not supported with state store. Provided 
schema: <schema>."
+    ],
+    "sqlState" : "XXKST"
+  },
   "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : {
     "message" : [
       "State store operation=<operationType> not supported on missing column 
family=<colFamilyName>."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 1887af2e814b..bb25a4c7f9f0 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2256,6 +2256,12 @@ Null type ordering column with name=`<fieldName>` at index=`<index>` is not supp
 
 `<operationType>` operation not supported with `<entity>`
 
+### STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY
+
+[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error)
+
+Binary inequality column is not supported with state store. Provided schema: `<schema>`.
+
 ### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY
 
 [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 959cbbaef8b0..69c9e0ed85be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
@@ -635,6 +635,15 @@ object StateStore extends Logging {
     storeProvider.getStore(version)
   }
 
+  private def disallowBinaryInequalityColumn(schema: StructType): Unit = {
+    if (!UnsafeRowUtils.isBinaryStable(schema)) {
+      throw new SparkUnsupportedOperationException(
+        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
+        messageParameters = Map("schema" -> schema.json)
+      )
+    }
+  }
+
   private def getStateStoreProvider(
       storeProviderId: StateStoreProviderId,
       keySchema: StructType,
@@ -649,6 +658,17 @@ object StateStore extends Logging {
 
       if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) {
         val result = schemaValidated.getOrElseUpdate(storeProviderId, {
+          // SPARK-47776: collation introduces the concept of binary (in)equality, which means
+          // in some collation we no longer be able to just compare the binary format of two
+          // UnsafeRows to determine equality. For example, 'aaa' and 'AAA' can be "semantically"
+          // same in case insensitive collation.
+          // State store is basically key-value storage, and the most provider implementations
+          // rely on the fact that all the columns in the key schema support binary equality.
+          // We need to disallow using binary inequality column in the key schema, before we
+          // could support this in majority of state store providers (or high-level of state
+          // store.)
+          disallowBinaryInequalityColumn(keySchema)
+
           val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf)
           // regardless of configuration, we check compatibility to at least write schema file
           // if necessary
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index 7ba18a814044..a089a05469f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -63,6 +63,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
   private val valueSchema65535Bytes = new StructType()
     .add(StructField("v" * (65535 - 87), IntegerType, nullable = true))
 
+  private val keySchemaWithCollation = new StructType()
+    .add(StructField("key1", IntegerType, nullable = true))
+    .add(StructField("key2", StringType("UTF8_BINARY_LCASE"), nullable = true))
+    .add(StructField("key3", structSchema, nullable = true))
+
+  private val valueSchemaWithCollation = new StructType()
+    .add(StructField("value1", IntegerType, nullable = true))
+    .add(StructField("value2", StringType("UTF8_BINARY_LCASE"), nullable = 
true))
+    .add(StructField("value3", structSchema, nullable = true))
+
   // Checks on adding/removing (nested) field.
 
   test("adding field to key should fail") {
@@ -241,6 +251,20 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
       ignoreValueSchema = true)
   }
 
+  test("SPARK-47776: checking for compatibility with collation change in key") 
{
+    verifyException(keySchema, valueSchema, keySchemaWithCollation, 
valueSchema,
+      ignoreValueSchema = false)
+    verifyException(keySchemaWithCollation, valueSchema, keySchema, 
valueSchema,
+      ignoreValueSchema = false)
+  }
+
+  test("SPARK-47776: checking for compatibility with collation change in 
value") {
+    verifyException(keySchema, valueSchema, keySchema, 
valueSchemaWithCollation,
+      ignoreValueSchema = false)
+    verifyException(keySchema, valueSchemaWithCollation, keySchema, 
valueSchema,
+      ignoreValueSchema = false)
+  }
+
   private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = {
     applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 7b3d89979470..504c0b334e42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatestplus.mockito.MockitoSugar
 
-import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException, TestUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -1364,6 +1364,45 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     )
   }
 
+  test("SPARK-47776: streaming aggregation having binary inequality column in 
the grouping " +
+    "key must be disallowed") {
+    val tableName = "parquet_dummy_tbl"
+    val collationName = "UTF8_BINARY_LCASE"
+
+    withTable(tableName) {
+      sql(
+        s"""
+           |CREATE TABLE $tableName (c1 STRING COLLATE $collationName)
+           |USING PARQUET
+           |""".stripMargin)
+
+      sql(s"INSERT INTO $tableName VALUES ('aaa')")
+      sql(s"INSERT INTO $tableName VALUES ('AAA')")
+
+      val df = spark.readStream.table(tableName)
+        .groupBy("c1")
+        .count()
+
+      val query = df.writeStream
+        .format("memory")
+        .queryName("output")
+        .outputMode("update")
+        .start()
+
+      val ex = intercept[StreamingQueryException] {
+        query.processAllAvailable()
+      }
+      checkError(
+        ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+        errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
+        parameters = Map(
+          "schema" -> ".+\"type\":\"string collate UTF8_BINARY_LCASE\".+"
+        ),
+        matchPVals = true
+      )
+    }
+  }
+
   private def checkExceptionMessage(df: DataFrame): Unit = {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>

