[ https://issues.apache.org/jira/browse/SPARK-42794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim reassigned SPARK-42794:
------------------------------------

    Assignee: Huanli Wang

> Increase the lockAcquireTimeoutMs for acquiring the RocksDB state store in Structured Streaming
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42794
>                 URL: https://issues.apache.org/jira/browse/SPARK-42794
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.5.0
>            Reporter: Huanli Wang
>            Assignee: Huanli Wang
>            Priority: Minor
>
> We are seeing query failures caused by RocksDB lock acquisition failures in retried tasks.
>
> * At t1, we shrink the cluster to a single executor:
> {code:java}
> 23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/2 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
> 23/03/05 22:47:21 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20230305224215-0000/3 is now DECOMMISSIONED (worker decommissioned because of kill request from HTTP endpoint (data migration disabled))
> {code}
>
> * At t1+2min, the first attempt of task 7 (i.e. task 7.0) is scheduled on the remaining executor:
> {code:java}
> 23/03/05 22:49:58 INFO TaskSetManager: Starting task 7.0 in stage 133.0 (TID 685) (10.166.225.249, executor 0, partition 7, ANY,
> {code}
>
> It seems that task 7.0 gets past [*{{dataRDD.iterator(partition, ctxt)}}*|https://github.com/apache/spark/blob/4db8e7b7944302a3929dd6a1197ea1385eecc46a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala#L123] and acquires the RocksDB lock, because every subsequent attempt for partition 7 on the same executor fails while waiting for task 7.0 (TID 685) to release it:
> {code:java}
> 23/03/05 22:51:59 WARN TaskSetManager: Lost task 4.1 in stage 133.1 (TID 700) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(50), task: partition 7.1 in stage 133.1, TID 700] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
> 23/03/05 22:52:59 WARN TaskSetManager: Lost task 4.2 in stage 133.1 (TID 702) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1495), task: partition 7.2 in stage 133.1, TID 702] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60006 ms.
> 23/03/05 22:53:59 WARN TaskSetManager: Lost task 4.3 in stage 133.1 (TID 704) (10.166.225.249 executor 0): java.lang.IllegalStateException: StateStoreId(opId=0,partId=7,name=default): RocksDB instance could not be acquired by [ThreadId: Some(46), task: partition 7.3 in stage 133.1, TID 704] as it was not released by [ThreadId: Some(449), task: partition 7.0 in stage 133.0, TID 685] after 60003 ms.
> {code}
>
> Increase *lockAcquireTimeoutMs* to 2 minutes, so that 4 task attempts give 8 minutes in total to acquire the lock. This is longer than the connection timeout with retries (3 * 120s = 6 minutes).
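The failure mode and the timeout arithmetic above can be illustrated with a minimal Java sketch. It assumes a simplified model of the RocksDB instance lock: `InstanceLockSketch` is a hypothetical class, not Spark's actual `RocksDB` implementation or API. A task waits up to a timeout for the previous holder to release the instance and otherwise fails with an `IllegalStateException`, mirroring the log messages. The hypothetical `totalLockBudgetMs` helper reproduces the budget calculation: assuming 4 attempts per task (Spark's default `spark.task.maxFailures` is 4), a 60 s timeout gives 240 s in total, which is less than the 360 s (3 * 120 s) of connection timeout with retries, while a 120 s timeout gives 480 s.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch, NOT Spark's RocksDB class: a per-instance lock that a task
 * must hold before using a state store instance. A caller waits up to a timeout
 * for the current holder to release it, then fails with IllegalStateException.
 */
final class InstanceLockSketch {
    private final String storeId;
    private String holder; // task currently holding the lock, or null if free

    InstanceLockSketch(String storeId) {
        this.storeId = storeId;
    }

    /** Blocks until the lock is free or timeoutMs elapses; on timeout, throws IllegalStateException. */
    synchronized void acquire(String task, long timeoutMs) {
        long start = System.nanoTime();
        long deadline = start + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (holder != null) {
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0) {
                long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                throw new IllegalStateException(storeId + ": instance could not be acquired by ["
                        + task + "] as it was not released by [" + holder + "] after " + waitedMs + " ms");
            }
            try {
                // Releases this object's monitor while waiting; release() wakes waiters via notifyAll().
                TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(task + " interrupted while waiting for " + storeId, e);
            }
        }
        holder = task;
    }

    /** Frees the lock if `task` holds it and wakes any waiting acquirers. */
    synchronized void release(String task) {
        if (task.equals(holder)) {
            holder = null;
            notifyAll();
        }
    }

    /** Worst-case wait for one task across all of its attempts, if every attempt times out. */
    static long totalLockBudgetMs(int maxAttempts, long lockAcquireTimeoutMs) {
        return maxAttempts * lockAcquireTimeoutMs;
    }

    public static void main(String[] args) throws InterruptedException {
        InstanceLockSketch lock = new InstanceLockSketch("StateStoreId(opId=0,partId=7)");

        // Attempt 7.0 acquires the lock and, like the stuck task in the logs, does not release it.
        lock.acquire("task 7.0", 1_000);
        try {
            lock.acquire("task 7.1", 200); // timeout scaled down from 60 s for the demo
        } catch (IllegalStateException e) {
            System.out.println("retry failed: " + e.getMessage());
        }

        // If the holder releases before the timeout expires, the next attempt succeeds.
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            lock.release("task 7.0");
        });
        releaser.start();
        lock.acquire("task 7.2", 1_000);
        releaser.join();
        System.out.println("task 7.2 acquired the lock");

        // Budget arithmetic from the proposal: 4 attempts vs. 3 * 120 s of connection timeout with retries.
        long connectionBudgetMs = 3 * 120_000L;
        System.out.println("60 s timeout:  " + totalLockBudgetMs(4, 60_000L) + " ms vs " + connectionBudgetMs + " ms");
        System.out.println("120 s timeout: " + totalLockBudgetMs(4, 120_000L) + " ms vs " + connectionBudgetMs + " ms");
    }
}
```

The sketch only shows why a longer per-attempt timeout helps: the total wait across attempts grows linearly with the timeout, so raising it from 60 s to 120 s lets the attempts outlast the 6-minute connection timeout window instead of exhausting all retries first.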