[ 
https://issues.apache.org/jira/browse/SPARK-53743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-53743:
---------------------------------
    Description: 
We received a report that a TWS PySpark query using the Row type API failed 
when calling ListState.put(). Oddly, the query ran fine for a while and only 
failed eventually.

From the stack trace in the report, we figured out that the call took the 
fetchWithArrow code path (which is only triggered when the list size is 
exactly 100, which was a bug), and the conversion failed with the stack trace 
below:
{code:java}
  File "/databricks/spark/python/pyspark/sql/streaming/stateful_processor.py", line 147, in put
    self._listStateClient.put(self._stateName, newState)
  File "/databricks/spark/python/pyspark/sql/streaming/list_state_client.py", line 195, in put
    self._stateful_processor_api_client._send_arrow_state(self.schema, values)
  File "/spark/python/pyspark/sql/streaming/stateful_processor_api_client.py", line 604, in _send_arrow_state
    pandas_df = convert_pandas_using_numpy_type(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/spark/python/pyspark/sql/pandas/types.py", line 1599, in convert_pandas_using_numpy_type
    df[field.name] = df[field.name].astype(np_type)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/python/lib/python3.12/site-packages/pandas/core/generic.py", line 6643, in astype
    new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/python/lib/python3.12/site-packages/pandas/core/internals/managers.py", line 430, in astype
    return self.apply(
           ^^^^^^^^^^^
  File "/python/lib/python3.12/site-packages/pandas/core/internals/managers.py", line 363, in apply
    applied = getattr(b, f)(**kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/python/lib/python3.12/site-packages/pandas/core/internals/blocks.py", line 758, in astype
    new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/python/lib/python3.12/site-packages/pandas/core/dtypes/astype.py", line 237, in astype_array_safe
    new_values = astype_array(values, dtype, copy=copy)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/python/lib/python3.12/site-packages/pandas/core/dtypes/astype.py", line 182, in astype_array
    values = _astype_nansafe(values, dtype, copy=copy)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/python/lib/python3.12/site-packages/pandas/core/dtypes/astype.py", line 133, in _astype_nansafe
    return arr.astype(dtype, copy=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: int() argument must be a string, a bytes-like object or a real number, not 'NoneType'
{code}
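For illustration, the final TypeError in the trace can be reproduced outside 
Spark. This is only a sketch under an assumption: that one of the state rows 
carried None in an integer field, so pandas held that column with object 
dtype. The actual data in the report is not known, and the sample values and 
the helper name cast_error are hypothetical.

```python
import numpy as np
import pandas as pd

# Hypothetical column: an integer field where one row carries None.
# Because of the None, pandas stores the values with object dtype.
values = pd.Series([1, None, 3], dtype=object)


def cast_error(series, np_type):
    """Return the TypeError message raised by astype, or None if the cast succeeds."""
    try:
        series.astype(np_type)
    except TypeError as exc:
        return str(exc)
    return None


# Casting the object column to a plain NumPy integer type applies int() to
# every element, including None, which is what the last frame of the trace shows.
message = cast_error(values, np.int32)
print(message)
```

If the assumption holds, the same cast succeeds for rows without None, which 
would be consistent with the job running fine for a while before failing.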
The reporter stated that the issue does not occur when they work around it by 
calling clear() and then putting the elements separately (which does not 
trigger the fetchWithArrow path).
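
The workaround might look roughly like the sketch below. It assumes that 
"put element separately" maps to calling appendValue() once per element on 
the PySpark ListState handle; the report does not say which call was used. 
InMemoryListState is a hypothetical stand-in so the snippet runs without a 
streaming query, and replace_list_state is an illustrative helper, not part 
of Spark.

```python
from typing import Any, Iterable, List, Tuple


class InMemoryListState:
    """Hypothetical stand-in exposing only clear() and appendValue()."""

    def __init__(self) -> None:
        self.values: List[Tuple[Any, ...]] = []

    def clear(self) -> None:
        self.values = []

    def appendValue(self, newState: Tuple[Any, ...]) -> None:
        self.values.append(newState)


def replace_list_state(list_state: Any, rows: Iterable[Tuple[Any, ...]]) -> None:
    """Replace the stored list without a single bulk put() call."""
    list_state.clear()
    for row in rows:
        # One element at a time, so no bulk transfer of the whole list happens here.
        list_state.appendValue(row)


state = InMemoryListState()
state.appendValue((0, "stale"))
replace_list_state(state, [(1, "a"), (2, None)])
print(state.values)
```

Inside a stateful processor, the same helper would be passed the real list 
state handle instead of the stand-in.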

  was:
We got a report that TWS PySpark with Row type API failed on requesting 
ListState.put(), weirdly ran fine and eventually failed.

From stack trace of the report, we figured out it took the code path of 
fetchWithArrow (which is only triggered when the list size is exactly 100 - 
which was a bug) and the conversion somehow failed on below stack trace:

 


> ListState fetchWithArrow option does not work with PySpark Row type API
> -----------------------------------------------------------------------
>
>                 Key: SPARK-53743
>                 URL: https://issues.apache.org/jira/browse/SPARK-53743
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.1.0
>            Reporter: Jungtaek Lim
>            Priority: Major


