This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e8343205c4d [SPARK-47469][SS][TESTS] Add `Trigger.AvailableNow` tests for `transformWithState` operator
3e8343205c4d is described below

commit 3e8343205c4d434076c013acd14cbfd8736241d4
Author: jingz-db <jing.z...@databricks.com>
AuthorDate: Tue Mar 26 18:24:13 2024 +0900

    [SPARK-47469][SS][TESTS] Add `Trigger.AvailableNow` tests for `transformWithState` operator
    
    ### What changes were proposed in this pull request?
    
    Add `Trigger.AvailableNow` tests for the `transformWithState` operator.

    ### Why are the changes needed?
    
    To ensure compliance with the state-v2 test plan.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.

    ### How was this patch tested?
    
    Added unit tests in `TransformWithStateSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45596 from jingz-db/avaiNow-tests-state-v2.
    
    Authored-by: jingz-db <jing.z...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/streaming/TransformWithStateSuite.scala    | 101 ++++++++++++++++++++-
 1 file changed, 100 insertions(+), 1 deletion(-)
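
For context before the diff: the new tests all follow one pattern, a file-source stream rate-limited to one file per micro-batch, keyed and counted with `transformWithState`, then drained with `Trigger.AvailableNow()`. The sketch below is an editorial illustration of that pattern, not part of the committed change; it reuses the suite's `RunningCountStatefulProcessor` and assumes hypothetical `srcDir` and `checkpointDir` directories.

import org.apache.spark.sql.streaming.{OutputMode, TimeoutMode, Trigger}
import spark.implicits._  // encoders for .as[String] and groupByKey

// One file per micro-batch, so the number of batches equals the number
// of files pending in srcDir when the query starts.
val counts = spark.readStream
  .option("maxFilesPerTrigger", "1")
  .text(srcDir.getCanonicalPath)
  .select("value").as[String]
  .groupByKey(x => x)
  .transformWithState(new RunningCountStatefulProcessor(),
    TimeoutMode.NoTimeouts(),
    OutputMode.Update())

// AvailableNow processes everything available at start time and then
// stops, so awaitTermination() returns once the backlog is drained.
val query = counts.writeStream
  .outputMode("update")
  .option("checkpointLocation", checkpointDir.getCanonicalPath)  // hypothetical
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()
query.awaitTermination()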

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 0fd2ef055ffc..24b0d59c45c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
+import java.util.UUID
+
 import org.apache.spark.SparkRuntimeException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.{Dataset, Encoders}
+import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateStoreMultipleColumnFamiliesNotSupportedException}
 import org.apache.spark.sql.functions.timestamp_seconds
@@ -650,6 +654,101 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       )
     }
   }
+
+  /** Create a text file with a single data item */
+  private def createFile(data: String, srcDir: File): File =
+    stringToFile(new File(srcDir, s"${UUID.randomUUID()}.txt"), data)
+
+  private def createFileStream(srcDir: File): Dataset[(String, String)] = {
+    spark
+      .readStream
+      .option("maxFilesPerTrigger", "1")
+      .text(srcDir.getCanonicalPath)
+      .select("value").as[String]
+      .groupByKey(x => x)
+      .transformWithState(new RunningCountStatefulProcessor(),
+        TimeoutMode.NoTimeouts(),
+        OutputMode.Update())
+  }
+
+  test("transformWithState - availableNow trigger mode, rate limit is 
respected") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      withTempDir { srcDir =>
+
+        Seq("a", "b", "c").foreach(createFile(_, srcDir))
+
+        // Set up a query to read text files one at a time
+        val df = createFileStream(srcDir)
+
+        testStream(df)(
+          StartStream(trigger = Trigger.AvailableNow()),
+          ProcessAllAvailable(),
+          CheckNewAnswer(("a", "1"), ("b", "1"), ("c", "1")),
+          StopStream,
+          Execute { _ =>
+            createFile("a", srcDir)
+          },
+          StartStream(trigger = Trigger.AvailableNow()),
+          ProcessAllAvailable(),
+          CheckNewAnswer(("a", "2"))
+        )
+
+        var index = 0
+        val foreachBatchDf = df.writeStream
+          .foreachBatch((_: Dataset[(String, String)], _: Long) => {
+            index += 1
+          })
+          .trigger(Trigger.AvailableNow())
+          .start()
+
+        try {
+          foreachBatchDf.awaitTermination()
+          assert(index == 4)
+        } finally {
+          foreachBatchDf.stop()
+        }
+      }
+    }
+  }
+
+  test("transformWithState - availableNow trigger mode, multiple restarts") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      withTempDir { srcDir =>
+        Seq("a", "b", "c").foreach(createFile(_, srcDir))
+        val df = createFileStream(srcDir)
+
+        var index = 0
+
+        def startTriggerAvailableNowQueryAndCheck(expectedIdx: Int): Unit = {
+          val q = df.writeStream
+            .foreachBatch((_: Dataset[(String, String)], _: Long) => {
+              index += 1
+            })
+            .trigger(Trigger.AvailableNow)
+            .start()
+          try {
+            assert(q.awaitTermination(streamingTimeout.toMillis))
+            assert(index == expectedIdx)
+          } finally {
+            q.stop()
+          }
+        }
+        // start query for the first time
+        startTriggerAvailableNowQueryAndCheck(3)
+
+        // add two files and restart
+        createFile("a", srcDir)
+        createFile("b", srcDir)
+        startTriggerAvailableNowQueryAndCheck(8)
+
+        // try restart again
+        createFile("d", srcDir)
+        startTriggerAvailableNowQueryAndCheck(14)
+      }
+    }
+  }
 }
 
 class TransformWithStateValidationSuite extends StateStoreMetricsTest {
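
A note on the expected batch counts in the tests above (an editorial gloss, not part of the commit): the `foreachBatch` queries set no checkpoint location, so each `start()` begins from a fresh temporary checkpoint and replays every file in `srcDir`, and `maxFilesPerTrigger` of 1 turns each file into its own micro-batch. The first run over three files therefore yields `index == 3`; a restart after two more files replays all five, giving 3 + 5 = 8; a final restart after one more file replays all six, giving 8 + 6 = 14. The same arithmetic gives `index == 4` in the rate-limit test, where four files exist by the time its `foreachBatch` query starts.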


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
