This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec37e47d13e [SPARK-48233][SS][TESTS] Tests for streaming on columns 
with non-default collations
7ec37e47d13e is described below

commit 7ec37e47d13e7f0ad51e295f99853e9bdee5b7a9
Author: Aleksandar Tomic <aleksandar.to...@databricks.com>
AuthorDate: Wed May 15 14:42:44 2024 +0900

    [SPARK-48233][SS][TESTS] Tests for streaming on columns with non-default 
collations
    
    ### What changes were proposed in this pull request?
    
    This change covers tests for streaming operations under columns of string 
type that are collated with non-utf8-binary collations. PR introduces following 
tests:
    1) Non-stateful streaming for non-binary collated columns. We use 
`UTF8_BINARY_LCASE` non-binary collation as the input and assert that streaming 
propagates collation and that filtering behaves under rules of given collation.
    2) Stateful streaming for binary collations. We use `UNICODE` collation as 
source and make sure that stateful operations (deduplication as taken as the 
example) work.
    3) More tests that assert that stateful operations in combination with 
non-binary collations throw proper exception.
    
    ### Why are the changes needed?
    
    You can find more information about collation effort in document attached 
to root jira ticket.
    
    This PR adds tests for basic non-stateful streaming operations with 
collations (e.g. filtering).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    PR is test only.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46247 from dbatomic/streaming_and_collations.
    
    Authored-by: Aleksandar Tomic <aleksandar.to...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/StreamingDeduplicationSuite.scala    | 48 ++++++++++++++++++++++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 29 +++++++++++++
 2 files changed, 77 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 5c3d8d877f39..5c816c5cddc7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -21,11 +21,13 @@ import java.io.File
 
 import org.apache.commons.io.FileUtils
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.Utils
 
@@ -484,6 +486,52 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
       CheckLastBatch(("c", 9, "c"))
     )
   }
+
+  test("collation aware deduplication") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDF()
+      .select(col("_1")
+        .try_cast(StringType("UNICODE")).as("str"),
+        col("_2").as("int"))
+      .dropDuplicates("str")
+
+    testStream(result, Append)(
+      AddData(inputData, "a" -> 1),
+      CheckLastBatch("a" -> 1),
+      assertNumStateRows(total = 1, updated = 1, droppedByWatermark = 0),
+      AddData(inputData, "a" -> 2), // Dropped
+      CheckLastBatch(),
+      assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 0),
+      // scalastyle:off
+      AddData(inputData, "ä" -> 1),
+      CheckLastBatch("ä" -> 1),
+      // scalastyle:on
+      assertNumStateRows(total = 2, updated = 1, droppedByWatermark = 0)
+    )
+  }
+
+  test("non-binary collation aware deduplication not supported") {
+    val inputData = MemoryStream[(String)]
+    val result = inputData.toDF()
+      .select(col("value")
+        .try_cast(StringType("UTF8_BINARY_LCASE")).as("str"))
+      .dropDuplicates("str")
+
+    val ex = intercept[StreamingQueryException] {
+      testStream(result, Append)(
+        AddData(inputData, "a"),
+        CheckLastBatch("a"))
+    }
+
+    checkError(
+      ex.getCause.asInstanceOf[SparkUnsupportedOperationException],
+      errorClass = "STATE_STORE_UNSUPPORTED_OPERATION_BINARY_INEQUALITY",
+      parameters = Map(
+        "schema" -> ".+\"type\":\"string collate UTF8_BINARY_LCASE\".+"
+      ),
+      matchPVals = true
+    )
+  }
 }
 
 @SlowSQLTest
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 504c0b334e42..227b50509afe 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -1364,6 +1364,35 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     )
   }
 
+  test("Collation aware streaming") {
+    withTable("parquet_streaming_tbl") {
+      spark.sql(
+        """
+          |CREATE TABLE parquet_streaming_tbl
+          |(
+          |  key STRING COLLATE UTF8_BINARY_LCASE,
+          |  value_stream INTEGER
+          |) USING parquet""".stripMargin)
+
+      val streamDf = spark.readStream.table("parquet_streaming_tbl")
+      val filteredDf = streamDf.filter("key = 'aaa'")
+
+      val clock = new StreamManualClock()
+      testStream(filteredDf)(
+        StartStream(triggerClock = clock, trigger = 
Trigger.ProcessingTime(100)),
+        Execute { _ =>
+          spark.createDataFrame(Seq("aaa" -> 1, "AAA" -> 2, "bbb" -> 3, "aa" 
-> 4))
+            .toDF("key", "value_stream")
+            .write.format("parquet").mode(SaveMode.Append)
+            .saveAsTable("parquet_streaming_tbl")
+        },
+        AdvanceManualClock(150),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(("aaa", 1), ("AAA", 2))
+      )
+    }
+  }
+
   test("SPARK-47776: streaming aggregation having binary inequality column in 
the grouping " +
     "key must be disallowed") {
     val tableName = "parquet_dummy_tbl"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to