HyukjinKwon commented on code in PR #47805:
URL: https://github.com/apache/spark/pull/47805#discussion_r1732045883


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -211,6 +211,95 @@ def test_transform_with_state_in_pandas_query_restarts(self):
             Row(id="1", countAsString="2"),
         }
 
+    # test value state with ttl has the same behavior as value state when
+    # state doesn't expire.
+    def test_value_state_ttl_basic(self):
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="2"),
+                    Row(id="1", countAsString="2"),
+                }
+            else:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="3"),
+                    Row(id="1", countAsString="2"),
+                }
+
+        self._test_transform_with_state_in_pandas_basic(
+            SimpleTTLStatefulProcessor(), check_results, False, "processingTime"
+        )
+
+    def test_value_state_ttl_expiration(self):
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assertDataFrameEqual(
+                    batch_df,
+                    [
+                        Row(id="ttl-count-0", count=1),
+                        Row(id="count-0", count=1),
+                        Row(id="ttl-count-1", count=1),
+                        Row(id="count-1", count=1),
+                    ],
+                )
+            elif batch_id == 1:
+                assertDataFrameEqual(
+                    batch_df,
+                    [
+                        Row(id="ttl-count-0", count=2),
+                        Row(id="count-0", count=2),
+                        Row(id="ttl-count-1", count=2),
+                        Row(id="count-1", count=2),
+                    ],
+                )
+            elif batch_id == 2:
+                # ttl-count-0 expire and restart from count 0.
+                # ttl-count-1 get reset in batch 1 and keep the state
+                # non-ttl state never expires
+                assertDataFrameEqual(
+                    batch_df,
+                    [
+                        Row(id="ttl-count-0", count=1),
+                        Row(id="count-0", count=3),
+                        Row(id="ttl-count-1", count=3),
+                        Row(id="count-1", count=3),
+                    ],
+                )
+            if batch_id == 0 or batch_id == 1:
+                time.sleep(6)
+
+        input_path = tempfile.mkdtemp()
+        df = self._build_test_df(input_path)
+        self._prepare_input_data(input_path + "/batch1.txt", [1, 0], [0, 0])
+        self._prepare_input_data(input_path + "/batch2.txt", [1, 0], [0, 0])
+        self._prepare_input_data(input_path + "/batch3.txt", [1, 0], [0, 0])
+        for q in self.spark.streams.active:
+            q.stop()
+        output_schema = StructType(
+            [
+                StructField("id", StringType(), True),
+                StructField("count", IntegerType(), True),
+            ]
+        )
+
+        q = (
+            df.groupBy("id")
+            .transformWithStateInPandas(
+                statefulProcessor=TTLStatefulProcessor(),
+                outputStructType=output_schema,
+                outputMode="Update",
+                timeMode="processingTime",
+            )
+            .writeStream.foreachBatch(check_results)
+            .outputMode("update")
+            .start()
+        )
+        self.assertTrue(q.isActive)
+        q.processAllAvailable()
+        q.stop()

Review Comment:
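   try-finally: please wrap `q.processAllAvailable()` in a `try` block and call `q.stop()` in the `finally` clause. Otherwise, if the query fails (for example, an assertion in `check_results` raises inside `foreachBatch`), the streaming query is never stopped.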



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to