This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2f7cdc9cdaf4 [SPARK-47776][SS] Disallow binary inequality collation be used in key schema of stateful operator 2f7cdc9cdaf4 is described below commit 2f7cdc9cdaf4122ccd41e9f9b3296f4b190fee05 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Wed Apr 10 13:38:07 2024 +0900 [SPARK-47776][SS] Disallow binary inequality collation be used in key schema of stateful operator ### What changes were proposed in this pull request? This PR proposes to disallow using a column with a binary-inequality collation in the key schema of a stateful operator. It is worth noting that changing the collation of the same string column across a query restart was already disallowed when collation was introduced. ### Why are the changes needed? The state store API relies heavily on the fact that the provider implementation performs O(1)-like get and put operations. While the actual implementation depends on the state store provider, it is reasonable to assume that these providers look up keys based only on their binary format (implying binary equality). That said, even if the column spec is case insensitive, the state store API wouldn't take this into consideration and could produce wrong results, e.g. determining 'a' and 'A' to be different keys even though the column is case insensitive. ### Does this PR introduce _any_ user-facing change? No, as this hasn't been released yet. ### How was this patch tested? New UTs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45951 from HeartSaVioR/SPARK-47776. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-classes.json | 6 ++++ docs/sql-error-conditions.md | 6 ++++ .../sql/execution/streaming/state/StateStore.scala | 22 +++++++++++- .../StateSchemaCompatibilityCheckerSuite.scala | 24 +++++++++++++ .../spark/sql/streaming/StreamingQuerySuite.scala | 41 +++++++++++++++++++++- 5 files changed, 97 insertions(+), 2 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index c3a01e9dcd90..45a1ec5e1e84 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -3652,6 +3652,12 @@ ], "sqlState" : "XXKST" }, + "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY" : { + "message" : [ + "Binary inequality column is not supported with state store. Provided schema: <schema>." + ], + "sqlState" : "XXKST" + }, "STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY" : { "message" : [ "State store operation=<operationType> not supported on missing column family=<colFamilyName>." diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 1887af2e814b..bb25a4c7f9f0 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -2256,6 +2256,12 @@ Null type ordering column with name=`<fieldName>` at index=`<index>` is not supp `<operationType>` operation not supported with `<entity>` +### STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY + +[SQLSTATE: XXKST](sql-error-conditions-sqlstates.html#class-XX-internal-error) + +Binary inequality column is not supported with state store. Provided schema: `<schema>`. 
+ ### STATE_STORE_UNSUPPORTED_OPERATION_ON_MISSING_COLUMN_FAMILY [SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 959cbbaef8b0..69c9e0ed85be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -29,7 +29,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.{SparkContext, SparkEnv, SparkUnsupportedOperationException} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.UnsafeRowUtils @@ -635,6 +635,15 @@ object StateStore extends Logging { storeProvider.getStore(version) } + private def disallowBinaryInequalityColumn(schema: StructType): Unit = { + if (!UnsafeRowUtils.isBinaryStable(schema)) { + throw new SparkUnsupportedOperationException( + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + messageParameters = Map("schema" -> schema.json) + ) + } + } + private def getStateStoreProvider( storeProviderId: StateStoreProviderId, keySchema: StructType, @@ -649,6 +658,17 @@ object StateStore extends Logging { if (storeProviderId.storeId.partitionId == PARTITION_ID_TO_CHECK_SCHEMA) { val result = schemaValidated.getOrElseUpdate(storeProviderId, { + // SPARK-47776: collation introduces the concept of binary (in)equality, which means + // in some collation we no longer be able to just compare the binary format of two + // UnsafeRows to determine equality. 
For example, 'aaa' and 'AAA' can be "semantically" + // same in case insensitive collation. + // State store is basically key-value storage, and the most provider implementations + // rely on the fact that all the columns in the key schema support binary equality. + // We need to disallow using binary inequality column in the key schema, before we + // could support this in majority of state store providers (or high-level of state + // store.) + disallowBinaryInequalityColumn(keySchema) + val checker = new StateSchemaCompatibilityChecker(storeProviderId, hadoopConf) // regardless of configuration, we check compatibility to at least write schema file // if necessary diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala index 7ba18a814044..a089a05469f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala @@ -63,6 +63,16 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { private val valueSchema65535Bytes = new StructType() .add(StructField("v" * (65535 - 87), IntegerType, nullable = true)) + private val keySchemaWithCollation = new StructType() + .add(StructField("key1", IntegerType, nullable = true)) + .add(StructField("key2", StringType("UTF8_BINARY_LCASE"), nullable = true)) + .add(StructField("key3", structSchema, nullable = true)) + + private val valueSchemaWithCollation = new StructType() + .add(StructField("value1", IntegerType, nullable = true)) + .add(StructField("value2", StringType("UTF8_BINARY_LCASE"), nullable = true)) + .add(StructField("value3", structSchema, nullable = true)) + // Checks on adding/removing (nested) field. 
test("adding field to key should fail") { @@ -241,6 +251,20 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { ignoreValueSchema = true) } + test("SPARK-47776: checking for compatibility with collation change in key") { + verifyException(keySchema, valueSchema, keySchemaWithCollation, valueSchema, + ignoreValueSchema = false) + verifyException(keySchemaWithCollation, valueSchema, keySchema, valueSchema, + ignoreValueSchema = false) + } + + test("SPARK-47776: checking for compatibility with collation change in value") { + verifyException(keySchema, valueSchema, keySchema, valueSchemaWithCollation, + ignoreValueSchema = false) + verifyException(keySchema, valueSchemaWithCollation, keySchema, valueSchema, + ignoreValueSchema = false) + } + private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = { applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 7b3d89979470..504c0b334e42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatestplus.mockito.MockitoSugar -import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.{SparkException, SparkUnsupportedOperationException, TestUtils} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow @@ -1364,6 +1364,45 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + test("SPARK-47776: streaming aggregation having binary inequality column in the 
grouping " + + "key must be disallowed") { + val tableName = "parquet_dummy_tbl" + val collationName = "UTF8_BINARY_LCASE" + + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName (c1 STRING COLLATE $collationName) + |USING PARQUET + |""".stripMargin) + + sql(s"INSERT INTO $tableName VALUES ('aaa')") + sql(s"INSERT INTO $tableName VALUES ('AAA')") + + val df = spark.readStream.table(tableName) + .groupBy("c1") + .count() + + val query = df.writeStream + .format("memory") + .queryName("output") + .outputMode("update") + .start() + + val ex = intercept[StreamingQueryException] { + query.processAllAvailable() + } + checkError( + ex.getCause.asInstanceOf[SparkUnsupportedOperationException], + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + parameters = Map( + "schema" -> ".+\"type\":\"string collate UTF8_BINARY_LCASE\".+" + ), + matchPVals = true + ) + } + } + private def checkExceptionMessage(df: DataFrame): Unit = { withTempDir { outputDir => withTempDir { checkpointDir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org