bogao007 commented on code in PR #47933:
URL: https://github.com/apache/spark/pull/47933#discussion_r1773807929


##########
python/pyspark/sql/streaming/stateful_processor.py:
##########
@@ -99,25 +99,25 @@ def exists(self) -> bool:
         """
         return self._list_state_client.exists(self._state_name)
 
-    def get(self) -> Iterator[Row]:
+    def get(self) -> Iterator[Tuple]:

Review Comment:
   @HeartSaVioR Yep, we store it as `Row` in the state store. Regarding the 
pickling: I followed what ApplyInPandasWithStateWriter does for groupState. It 
serializes the `Row` state to bytes in the JVM: 
https://github.com/apache/spark/blob/55d0233d19cc52bee91a9619057d9b6f33165a0a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStateWriter.scala#L182
 and then deserializes it using pickling on the Python side: 
https://github.com/apache/spark/blob/55d0233d19cc52bee91a9619057d9b6f33165a0a/python/pyspark/sql/pandas/serializers.py#L837
 Are you saying we shouldn't use `pickleSer.loads` in valueState's `get()`? If 
so, could you share your concerns here?
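
   For reference, a minimal, self-contained sketch of the pickle round-trip 
described above. It uses the standard `pickle` module purely for illustration; 
the actual code path serializes the `Row` on the JVM side and restores it with 
`pickleSer.loads`:

   ```python
   # Illustrative only: a Row pickled to bytes and restored, analogous to the
   # JVM-serialized state bytes being read back in valueState get().
   import pickle
   from pyspark.sql import Row

   state_row = Row(count=3, last_seen="2024-01-01")  # hypothetical state value
   state_bytes = pickle.dumps(state_row)   # stand-in for bytes produced in the JVM
   restored = pickle.loads(state_bytes)    # analogous to pickleSer.loads in get()
   assert restored == state_row
   ```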
   
   An alternative I can think of is to let valueState also use Arrow to 
transmit the state row from the JVM to the Python side and deserialize it 
there, just like list state does (rough sketch below). Let me know your 
thoughts on this, thanks!
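
   A rough sketch of what the Python-side deserialization could look like under 
that alternative, assuming the JVM sends the single state row as an Arrow IPC 
stream. The helper name and framing are illustrative, not an existing API:

   ```python
   # Hypothetical helper: turn Arrow IPC bytes carrying the state row into
   # tuples, similar to how list state materializes rows on the Python side.
   import pyarrow as pa

   def rows_from_arrow_bytes(arrow_bytes: bytes):
       reader = pa.ipc.open_stream(arrow_bytes)  # Arrow stream sent from the JVM
       table = reader.read_all()
       return [tuple(d.values()) for d in table.to_pylist()]
   ```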



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

