HyukjinKwon commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1692328985


##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -358,6 +364,140 @@ def applyInPandasWithState(
         )
         return DataFrame(jdf, self.session)
 
+    def transformWithStateInPandas(
+        self,
+        statefulProcessor: StatefulProcessor,
+        outputStructType: Union[StructType, str],
+        outputMode: str,
+        timeMode: str,
+    ) -> DataFrame:
+        """
+        Invokes methods defined in the stateful processor used in arbitrary 
state API v2.
+        We allow the user to act on a per-group set of input rows along with 
keyed state, and the
+        user can choose to output/return 0 or more rows.
+
+        For a streaming dataframe, we will repeatedly invoke the interface 
methods for new rows
+        in each trigger and the user's state/state variables will be stored 
persistently across
+        invocations.
+
+        The `statefulProcessor` should be a Python class that implements the 
interface defined in
+        pyspark.sql.streaming.stateful_processor.StatefulProcessor.
+
+        The `outputStructType` should be a :class:`StructType` describing the 
schema of all
+        elements in the returned value, `pandas.DataFrame`. The column labels 
of all elements in
+        returned `pandas.DataFrame` must either match the field names in the 
defined schema if
+        specified as strings, or match the field data types by position if not 
strings,
+        e.g. integer indices.
+
+        The size of each `pandas.DataFrame` in both the input and output can 
be arbitrary. The
+        number of `pandas.DataFrame` in both the input and output can also be 
arbitrary.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        statefulProcessor : 
:class:`pyspark.sql.streaming.stateful_processor.StatefulProcessor`
+            Instance of StatefulProcessor whose functions will be invoked by 
the operator.
+        outputStructType : :class:`pyspark.sql.types.DataType` or str
+            The type of the output records. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type 
string.
+        outputMode : str
+            The output mode of the stateful processor.
+        timeMode : str
+            The time mode semantics of the stateful processor for timers and 
TTL.
+
+        Examples
+        --------
+        >>> import pandas as pd
+        >>> from pyspark.sql import Row
+        >>> from pyspark.sql.streaming import StatefulProcessor, 
StatefulProcessorHandle
+        >>> from pyspark.sql.types import StructType, StructField, LongType, 
StringType, IntegerType
+        >>> from typing import Iterator
+        >>> from pyspark.sql.functions import split, col
+        >>> 
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
+        >>> output_schema = StructType([
+        ...     StructField("id", StringType(), True),
+        ...     StructField("count", IntegerType(), True)
+        ... ])
+        >>> state_schema = StructType([
+        ...     StructField("value", IntegerType(), True)
+        ... ])
+        >>> class SimpleStatefulProcessor(StatefulProcessor):
+        ...   def init(self, handle: StatefulProcessorHandle):

Review Comment:
   Please use 4-space indentation here; the method in this doctest example is indented by only 2 spaces under the class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to