[ 
https://issues.apache.org/jira/browse/SPARK-50302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-50302:
-----------------------------------
    Labels: pull-request-available  (was: )

> TransformWithState secondary index sizes should be proportional to primary 
> indexes
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-50302
>                 URL: https://issues.apache.org/jira/browse/SPARK-50302
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.0.0
>            Reporter: Neil Ramaswamy
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the TWS operator handles all TTL state variables in 
> approximately the same way:
>  # Upsert the value into the primary index
>  # Upsert the expiration timestamp into the secondary index, where the 
> expiration timestamp is `batchTimestampMs` plus the TTL delay
> The issue with this approach is that if the same state variable is updated 
> across two different micro-batches, there is one entry in the primary 
> index but _two_ entries in the secondary index. Consider the following 
> example for a state variable `foo` with value `v1` and a TTL delay 
> of 500:
> Batch 0, `batchTimestampMs = 100`, `foo` updates to `v1`:
>  * Primary index: `[foo -> (v1, 600)]`
>  * Secondary index: `[(600, foo) -> EMPTY]`
> Batch 1, `batchTimestampMs = 200`, `foo` updates to `v2`:
>  * Primary index: `[foo -> (v2, 700)]`
>  * Secondary index: `[(600, foo) -> EMPTY, (700, foo) -> EMPTY]`
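The divergence above can be reproduced with a minimal dict-based model of the two indexes (a Python sketch for illustration only; the names `primary`, `secondary`, and `upsert` are hypothetical, not Spark's actual API):

```python
# Hypothetical model of the two indexes (illustrative, not Spark's API).
# primary:   key -> (value, expiration_ts)
# secondary: (expiration_ts, key) -> None  (expiration-ordered index)
TTL_DELAY_MS = 500
primary, secondary = {}, {}

def upsert(key, value, batch_ts_ms):
    expiration = batch_ts_ms + TTL_DELAY_MS
    primary[key] = (value, expiration)   # overwrites the previous entry
    secondary[(expiration, key)] = None  # old (expiration, key) entry lingers

upsert("foo", "v1", 100)  # primary: {foo: (v1, 600)}, secondary: {(600, foo)}
upsert("foo", "v2", 200)  # primary rewritten; secondary keeps both entries

print(len(primary), len(secondary))  # 1 2
```

Each re-upsert replaces the primary entry in place but only appends to the secondary index, so the secondary index grows by one stale entry per update.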
> You'll notice that the secondary index now has size 2, even though the 
> primary index only has size 1. When we clean up `(600, foo)`, we actually 
> _don't_ delete `foo` from the primary index, since we do [another lookup in 
> the primary index to determine that the secondary index entry we're dealing 
> with is stale|https://github.com/apache/spark/blob/05508cf7cb9da3042fa4b17645102a6406278695/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala#L109-L113].
> So, to summarize: on the write path we always write to both the primary 
> _and_ the secondary index; on the deletion path we iterate over everything 
> in the secondary index and do a lookup against the primary index to decide 
> whether each entry should actually be deleted.
> While this may not seem like a huge issue, things get significantly worse 
> for `ListState`. Our cleanup logic for list state is as follows:
>  # Grab an iterator for the entire list
>  # Clear the entire list
>  # For each element in the iterator that isn't expired, merge it back into 
> the list
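These three steps can be sketched as a simple list rewrite (a hypothetical Python model, not the operator's actual code; each element is modeled as a `(value, expiration_ts)` pair):

```python
# Hypothetical sketch of ListState TTL cleanup.
def cleanup_list(list_state, batch_ts_ms):
    snapshot = list(list_state)  # 1. grab an iterator over the entire list
    list_state.clear()           # 2. clear the entire list
    for value, expiration in snapshot:
        if expiration > batch_ts_ms:
            # 3. merge each unexpired element back into the list
            list_state.append((value, expiration))
    return list_state

state = [("a", 600), ("b", 700), ("c", 900)]
cleanup_list(state, 650)
print(state)  # [('b', 700), ('c', 900)]
```

Note that this full pass runs once per expired secondary-index entry, even when the entry is stale and nothing in the list actually needs to be removed.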
> This means that each stale entry in the secondary index forces another full 
> pass over (and rewrite of) the entire list, which severely degrades 
> performance. We should ensure that the secondary index has only as many 
> entries as the primary index, which will prevent us from doing this 
> unnecessary work during cleanup.
> Solutions to this problem will be proposed in the PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
