This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:

new 5100b2e5d1aa [SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator

5100b2e5d1aa is described below

commit 5100b2e5d1aab081e6c5ac9cb3d9a46f5b2b6353
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Tue Feb 6 12:28:46 2024 +0900

[SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator

Adding unit tests to test multiple input streams with the TransformWithState operator.

### What changes were proposed in this pull request?
Added unit tests to TransformWithStateSuite covering the union of two input streams, three input streams, and two input streams with different element types before the TransformWithState operator.

### Why are the changes needed?
These changes are needed to ensure that multiple input streams can be unioned and fed into the TWS operator, just like any other stateful operator.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
This change only adds tests; no further tests are needed.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45004 from ericm-db/multiple-input-streams.
Authored-by: Eric Marnadi <eric.marn...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/streaming/TransformWithStateSuite.scala | 96 ++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala index 7a6c3f00fc7a..3efef3b37000 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala @@ -233,6 +233,102 @@ class TransformWithStateSuite extends StateStoreMetricsTest } } } + + test("transformWithState - two input streams") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) { + val inputData1 = MemoryStream[String] + val inputData2 = MemoryStream[String] + + val result = inputData1.toDS() + .union(inputData2.toDS()) + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessor(), + TimeoutMode.NoTimeouts(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + AddData(inputData1, "a"), + CheckNewAnswer(("a", "1")), + AddData(inputData2, "a", "b"), + CheckNewAnswer(("a", "2"), ("b", "1")), + AddData(inputData1, "a", "b"), // should remove state for "a" and not return anything for a + CheckNewAnswer(("b", "2")), + AddData(inputData1, "d", "e"), + AddData(inputData2, "a", "c"), // should recreate state for "a" and return count as 1 + CheckNewAnswer(("a", "1"), ("c", "1"), ("d", "1"), ("e", "1")), + StopStream + ) + } + } + + test("transformWithState - three input streams") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + 
TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) { + val inputData1 = MemoryStream[String] + val inputData2 = MemoryStream[String] + val inputData3 = MemoryStream[String] + + // union 3 input streams + val result = inputData1.toDS() + .union(inputData2.toDS()) + .union(inputData3.toDS()) + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessor(), + TimeoutMode.NoTimeouts(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + AddData(inputData1, "a"), + CheckNewAnswer(("a", "1")), + AddData(inputData2, "a", "b"), + CheckNewAnswer(("a", "2"), ("b", "1")), + AddData(inputData3, "a", "b"), // should remove state for "a" and not return anything for a + CheckNewAnswer(("b", "2")), + AddData(inputData1, "d", "e"), + AddData(inputData2, "a", "c"), // should recreate state for "a" and return count as 1 + CheckNewAnswer(("a", "1"), ("c", "1"), ("d", "1"), ("e", "1")), + AddData(inputData3, "a", "c", "d", "e"), + CheckNewAnswer(("a", "2"), ("c", "2"), ("d", "2"), ("e", "2")), + StopStream + ) + } + } + + test("transformWithState - two input streams, different key type") { + withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName, + SQLConf.SHUFFLE_PARTITIONS.key -> + TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) { + val inputData1 = MemoryStream[String] + val inputData2 = MemoryStream[Long] + + val result = inputData1.toDS() + // union inputData2 by casting it to a String + .union(inputData2.toDS().map(_.toString)) + .groupByKey(x => x) + .transformWithState(new RunningCountStatefulProcessor(), + TimeoutMode.NoTimeouts(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( + AddData(inputData1, "1"), + CheckNewAnswer(("1", "1")), + AddData(inputData2, 1L, 2L), + CheckNewAnswer(("1", "2"), ("2", "1")), + AddData(inputData1, "1", "2"), // should remove state for "1" and not return anything. 
+ CheckNewAnswer(("2", "2")), + AddData(inputData1, "4", "5"), + AddData(inputData2, 1L, 3L), // should recreate state for "1" and return count as 1 + CheckNewAnswer(("1", "1"), ("3", "1"), ("4", "1"), ("5", "1")), + StopStream + ) + } + } } class TransformWithStateValidationSuite extends StateStoreMetricsTest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org