Huanli Wang created SPARK-42566:
-----------------------------------

             Summary: RocksDB StateStore lock acquisition should happen after 
getting input iterator from inputRDD
                 Key: SPARK-42566
                 URL: https://issues.apache.org/jira/browse/SPARK-42566
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.5.0
            Reporter: Huanli Wang


In both `StateStoreRDD` and `ReadStateStoreRDD`, the `compute` method currently 
gets the state store instance first and only then gets the input iterator for 
the inputRDD.

For the RocksDB state store, the running task acquires and holds the lock for 
this instance. If the running task is stalled by network issues, a retried or 
speculative task will fail to acquire the lock and eventually abort the job. 
For example, when we shrink the executors, a live executor will try to fetch 
data from the killed ones, because it doesn't know that the target location 
(prefetched from the driver) is dead until it tries to fetch data. The query 
might hang for a long time, as the executor will retry 
`spark.shuffle.io.maxRetries=3` times and, for each retry, wait for 
`spark.shuffle.io.connectionTimeout` (default value is 120s) before timing 
out. In total, the task could hang for about 6 minutes (3 x 120s), and the 
retried or speculative tasks won't be able to acquire the lock during this 
period.
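
The 6-minute figure is simply the retry count multiplied by the per-retry 
timeout. A minimal sketch of that arithmetic follows; the function name 
`blocked_seconds` is hypothetical, the two constants are the values quoted 
above, and any extra wait between retries is ignored.

```python
# Back-of-envelope estimate of how long a fetch against a dead executor can block.
MAX_RETRIES = 3             # spark.shuffle.io.maxRetries, value quoted above
CONNECTION_TIMEOUT_S = 120  # spark.shuffle.io.connectionTimeout default quoted above


def blocked_seconds(max_retries: int, timeout_s: int) -> int:
    """Each retry waits for the full connection timeout before giving up."""
    return max_retries * timeout_s


total = blocked_seconds(MAX_RETRIES, CONNECTION_TIMEOUT_S)
print(f"{total} s = {total / 60:.0f} min")  # 360 s = 6 min
```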

Acquiring the lock only after the input iterator has been retrieved should 
avoid this situation.
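
The effect of the ordering can be illustrated with a small simulation. This is 
only a sketch and not Spark code: `FakeStateStore` and `retry_can_lock` are 
hypothetical names, a plain `threading.Lock` stands in for the RocksDB 
instance lock, and an event stands in for the stalled fetch.

```python
import threading


class FakeStateStore:
    """Hypothetical stand-in for a state store instance guarded by one lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()

    def acquire(self, timeout_s: float) -> bool:
        return self._lock.acquire(timeout=timeout_s)

    def release(self) -> None:
        self._lock.release()


def retry_can_lock(lock_first: bool) -> bool:
    """Return True if a retried task gets the lock while the original task
    is still stuck fetching its input."""
    store = FakeStateStore()
    fetch_started = threading.Event()
    fetch_may_finish = threading.Event()

    def original_task() -> None:
        # Current order: take the lock, then fetch. Proposed order: fetch first.
        held = store.acquire(1.0) if lock_first else False
        fetch_started.set()
        fetch_may_finish.wait()  # simulated fetch stalled on a dead executor
        if not held:
            held = store.acquire(1.0)
        if held:
            store.release()

    worker = threading.Thread(target=original_task)
    worker.start()
    fetch_started.wait()

    # The retried or speculative task tries to take the same lock.
    got_lock = store.acquire(0.1)
    if got_lock:
        store.release()

    fetch_may_finish.set()
    worker.join()
    return got_lock


print(retry_can_lock(lock_first=True))   # False
print(retry_can_lock(lock_first=False))  # True
```

With the lock taken first, the retried task times out for as long as the 
original task is stalled; with the fetch done first, the lock is free during 
the stall.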


