This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3e8343205c4d  [SPARK-47469][SS][TESTS] Add `Trigger.AvailableNow` tests for `transformWithState` operator
3e8343205c4d is described below

commit 3e8343205c4d434076c013acd14cbfd8736241d4
Author: jingz-db <jing.z...@databricks.com>
AuthorDate: Tue Mar 26 18:24:13 2024 +0900

    [SPARK-47469][SS][TESTS] Add `Trigger.AvailableNow` tests for `transformWithState` operator

    ### What changes were proposed in this pull request?

    Add tests covering the `Trigger.AvailableNow` trigger mode for the `transformWithState` operator.

    ### Why are the changes needed?

    These tests are needed for compliance with the state-v2 test plan.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added unit test suites.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #45596 from jingz-db/avaiNow-tests-state-v2.

    Authored-by: jingz-db <jing.z...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/streaming/TransformWithStateSuite.scala | 101 ++++++++++++++++++++-
 1 file changed, 100 insertions(+), 1 deletion(-)
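For context, the query shape these new tests exercise looks roughly like the
sketch below. This is a minimal sketch, not code from this commit: it assumes
an active SparkSession `spark` with `spark.implicits._` in scope, a
hypothetical input directory `srcPath`, and the suite's
`RunningCountStatefulProcessor`, which emits a running count per key.

    import org.apache.spark.sql.streaming.{OutputMode, TimeoutMode, Trigger}

    // Ingest text files one per micro-batch so the rate limit is observable.
    val counts = spark.readStream
      .option("maxFilesPerTrigger", "1")
      .text(srcPath)
      .select("value").as[String]
      .groupByKey(x => x)
      .transformWithState(new RunningCountStatefulProcessor(),
        TimeoutMode.NoTimeouts(),
        OutputMode.Update())

    // Trigger.AvailableNow processes all data available at query start,
    // still honoring the one-file-per-batch limit, then stops the query
    // on its own instead of waiting for new data.
    val query = counts.writeStream
      .format("console")
      .outputMode("update")
      .trigger(Trigger.AvailableNow())
      .start()
    query.awaitTermination()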
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 0fd2ef055ffc..24b0d59c45c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
+import java.util.UUID
+
 import org.apache.spark.SparkRuntimeException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.{Dataset, Encoders}
+import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateStoreMultipleColumnFamiliesNotSupportedException}
 import org.apache.spark.sql.functions.timestamp_seconds
@@ -650,6 +654,101 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       )
     }
   }
+
+  /** Create a text file with a single data item */
+  private def createFile(data: String, srcDir: File): File =
+    stringToFile(new File(srcDir, s"${UUID.randomUUID()}.txt"), data)
+
+  private def createFileStream(srcDir: File): Dataset[(String, String)] = {
+    spark
+      .readStream
+      .option("maxFilesPerTrigger", "1")
+      .text(srcDir.getCanonicalPath)
+      .select("value").as[String]
+      .groupByKey(x => x)
+      .transformWithState(new RunningCountStatefulProcessor(),
+        TimeoutMode.NoTimeouts(),
+        OutputMode.Update())
+  }
+
+  test("transformWithState - availableNow trigger mode, rate limit is respected") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      withTempDir { srcDir =>
+
+        Seq("a", "b", "c").foreach(createFile(_, srcDir))
+
+        // Set up a query to read text files one at a time
+        val df = createFileStream(srcDir)
+
+        testStream(df)(
+          StartStream(trigger = Trigger.AvailableNow()),
+          ProcessAllAvailable(),
+          CheckNewAnswer(("a", "1"), ("b", "1"), ("c", "1")),
+          StopStream,
+          Execute { _ =>
+            createFile("a", srcDir)
+          },
+          StartStream(trigger = Trigger.AvailableNow()),
+          ProcessAllAvailable(),
+          CheckNewAnswer(("a", "2"))
+        )
+
+        var index = 0
+        val foreachBatchDf = df.writeStream
+          .foreachBatch((_: Dataset[(String, String)], _: Long) => {
+            index += 1
+          })
+          .trigger(Trigger.AvailableNow())
+          .start()
+
+        try {
+          foreachBatchDf.awaitTermination()
+          assert(index == 4)
+        } finally {
+          foreachBatchDf.stop()
+        }
+      }
+    }
+  }
+
+  test("transformWithState - availableNow trigger mode, multiple restarts") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      withTempDir { srcDir =>
+        Seq("a", "b", "c").foreach(createFile(_, srcDir))
+        val df = createFileStream(srcDir)
+
+        var index = 0
+
+        def startTriggerAvailableNowQueryAndCheck(expectedIdx: Int): Unit = {
+          val q = df.writeStream
+            .foreachBatch((_: Dataset[(String, String)], _: Long) => {
+              index += 1
+            })
+            .trigger(Trigger.AvailableNow)
+            .start()
+          try {
+            assert(q.awaitTermination(streamingTimeout.toMillis))
+            assert(index == expectedIdx)
+          } finally {
+            q.stop()
+          }
+        }
+        // start query for the first time
+        startTriggerAvailableNowQueryAndCheck(3)
+
+        // add two files and restart
+        createFile("a", srcDir)
+        createFile("b", srcDir)
+        startTriggerAvailableNowQueryAndCheck(8)
+
+        // try restart again
+        createFile("d", srcDir)
+        startTriggerAvailableNowQueryAndCheck(14)
+      }
+    }
+  }
 }
 
 class TransformWithStateValidationSuite extends StateStoreMetricsTest {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org