This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7ec37e47d13e [SPARK-48233][SS][TESTS] Tests for streaming on columns with non-default collations
7ec37e47d13e is described below

commit 7ec37e47d13e7f0ad51e295f99853e9bdee5b7a9
Author: Aleksandar Tomic <aleksandar.to...@databricks.com>
AuthorDate: Wed May 15 14:42:44 2024 +0900

    [SPARK-48233][SS][TESTS] Tests for streaming on columns with non-default collations

    ### What changes were proposed in this pull request?

    This change adds tests for streaming operations over string-type columns that are collated with non-UTF8_BINARY collations.

    The PR introduces the following tests:
    1) Non-stateful streaming over non-binary collated columns. We use the non-binary collation `UTF8_BINARY_LCASE` as the input and assert that streaming propagates the collation and that filtering follows the rules of the given collation.
    2) Stateful streaming over binary collations. We use the `UNICODE` collation as the source and make sure that stateful operations work (deduplication is taken as the example).
    3) Tests that assert that stateful operations combined with non-binary collations throw the proper exception.

    ### Why are the changes needed?

    More information about the collation effort can be found in the document attached to the root JIRA ticket. This PR adds tests for basic streaming operations with collations (e.g. filtering).

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    This PR is test-only.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #46247 from dbatomic/streaming_and_collations.

    Authored-by: Aleksandar Tomic <aleksandar.to...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
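[Editor's note] The collation semantics these tests exercise can be reproduced outside the streaming test harness. Below is a minimal, hypothetical batch sketch (not part of this commit): the `try_cast` to `StringType("UTF8_BINARY_LCASE")` and the `filter("key = 'aaa'")` predicate mirror the test code in the diff that follows, while the local `SparkSession` setup and the object name `CollationFilterSketch` are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

// Hypothetical standalone sketch; mirrors the cast and filter used in the
// new tests, but runs as a plain batch job instead of a streaming query.
object CollationFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("collation-filter-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq("aaa", "AAA", "bbb", "aa").toDF("key")
      // Cast the column to a case-insensitive collation, as the tests do.
      .select(col("key").try_cast(StringType("UTF8_BINARY_LCASE")).as("key"))

    // Under UTF8_BINARY_LCASE the predicate matches both "aaa" and "AAA";
    // under the default UTF8_BINARY collation it would match only "aaa".
    df.filter("key = 'aaa'").show()

    spark.stop()
  }
}
```

The same case-insensitive equality is why `CheckLastBatch` in the streaming filter test below expects both ("aaa", 1) and ("AAA", 2), and why `dropDuplicates` over a `UTF8_BINARY_LCASE` column fails with `STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY`.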
"ä" -> 1), + CheckLastBatch("ä" -> 1), + // scalastyle:on + assertNumStateRows(total = 2, updated = 1, droppedByWatermark = 0) + ) + } + + test("non-binary collation aware deduplication not supported") { + val inputData = MemoryStream[(String)] + val result = inputData.toDF() + .select(col("value") + .try_cast(StringType("UTF8_BINARY_LCASE")).as("str")) + .dropDuplicates("str") + + val ex = intercept[StreamingQueryException] { + testStream(result, Append)( + AddData(inputData, "a"), + CheckLastBatch("a")) + } + + checkError( + ex.getCause.asInstanceOf[SparkUnsupportedOperationException], + errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY", + parameters = Map( + "schema" -> ".+\"type\":\"string collate UTF8_BINARY_LCASE\".+" + ), + matchPVals = true + ) + } } @SlowSQLTest diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 504c0b334e42..227b50509afe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -1364,6 +1364,35 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + test("Collation aware streaming") { + withTable("parquet_streaming_tbl") { + spark.sql( + """ + |CREATE TABLE parquet_streaming_tbl + |( + | key STRING COLLATE UTF8_BINARY_LCASE, + | value_stream INTEGER + |) USING parquet""".stripMargin) + + val streamDf = spark.readStream.table("parquet_streaming_tbl") + val filteredDf = streamDf.filter("key = 'aaa'") + + val clock = new StreamManualClock() + testStream(filteredDf)( + StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)), + Execute { _ => + spark.createDataFrame(Seq("aaa" -> 1, "AAA" -> 2, "bbb" -> 3, "aa" -> 4)) + .toDF("key", "value_stream") + .write.format("parquet").mode(SaveMode.Append) + .saveAsTable("parquet_streaming_tbl") + }, + AdvanceManualClock(150), + waitUntilBatchProcessed(clock), + CheckLastBatch(("aaa", 1), ("AAA", 2)) + ) + } + } + test("SPARK-47776: streaming aggregation having binary inequality column in the grouping " + "key must be disallowed") { val tableName = "parquet_dummy_tbl" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org