[ https://issues.apache.org/jira/browse/SPARK-50302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-50302:
-----------------------------------
    Labels: pull-request-available  (was: )

> TransformWithState secondary index sizes should be proportional to primary indexes
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-50302
>                 URL: https://issues.apache.org/jira/browse/SPARK-50302
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.0.0
>            Reporter: Neil Ramaswamy
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the TWS operator handles all TTL state variables in roughly the
> same way:
> # Upsert the value into the primary index
> # Upsert the expiration timestamp into the secondary index, where the
> expiration timestamp is `batchTimestampMs` plus the TTL delay
>
> The issue with this approach is that if the same state variable is updated
> in two different micro-batches, there is _one_ entry in the primary index
> but _two_ entries in the secondary index. Consider the following example for
> a state variable `foo` with initial value `v1` and a TTL delay of 500:
>
> Batch 0: `batchTimestampMs = 100`, `foo` updates to `v1`:
> * Primary index: `[foo -> (v1, 600)]`
> * Secondary index: `[(600, foo) -> EMPTY]`
>
> Batch 1: `batchTimestampMs = 200`, `foo` updates to `v2`:
> * Primary index: `[foo -> (v2, 700)]`
> * Secondary index: `[(600, foo) -> EMPTY, (700, foo) -> EMPTY]`
>
> You'll notice that the secondary index now has size 2, even though the
> primary index only has size 1. When we clean up `(600, foo)`, we actually
> _don't_ delete it, since we do [another lookup in the primary index to determine if the secondary index entry we're dealing with is stale|https://github.com/apache/spark/blob/05508cf7cb9da3042fa4b17645102a6406278695/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala#L109-L113].
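The two-batch example above can be reproduced with a small toy model. This is only a sketch in plain Python, not Spark code: the class `TtlValueStore` and its fields are hypothetical names, and the two indexes are modeled as ordinary dictionaries rather than state store column families.

```python
from dataclasses import dataclass, field

EMPTY = None  # placeholder value stored against each secondary index key


@dataclass
class TtlValueStore:
    """Toy model (hypothetical, not Spark's API) of a TTL value state."""
    ttl_ms: int
    primary: dict = field(default_factory=dict)    # key -> (value, expiration_ms)
    secondary: dict = field(default_factory=dict)  # (expiration_ms, key) -> EMPTY

    def upsert(self, key, value, batch_timestamp_ms):
        # Expiration is the batch timestamp plus the TTL delay.
        expiration_ms = batch_timestamp_ms + self.ttl_ms
        # Overwrites any previous value for this key.
        self.primary[key] = (value, expiration_ms)
        # Adds a new entry; the entry from an earlier batch is left behind.
        self.secondary[(expiration_ms, key)] = EMPTY


store = TtlValueStore(ttl_ms=500)
store.upsert("foo", "v1", batch_timestamp_ms=100)  # Batch 0
store.upsert("foo", "v2", batch_timestamp_ms=200)  # Batch 1

print(store.primary)            # one entry for foo
print(sorted(store.secondary))  # two entries for foo
```

In this model the primary index ends with one entry for `foo` while the secondary index ends with both `(600, foo)` and `(700, foo)`, which matches the sizes in the example.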
>
> So, to summarize: the write path always writes to both the primary _and_
> the secondary index. For deletion, we go through everything in the
> secondary index and do a lookup on the primary index to see whether to
> delete it.
>
> While this may not seem like a huge issue, things get much worse for
> `ListState`. Our cleanup logic for list state is as follows:
> # Grab an iterator for the entire list
> # Clear the entire list
> # For each element in the iterator that isn't expired, merge it back into
> the list
>
> This means that every erroneous entry in the secondary index triggers
> another pass over the entire list, so the list is traversed _several_
> times, which hurts performance. We should make sure that the secondary
> index has only as many elements as the primary index, which will prevent
> us from doing unnecessary work during cleanup.
>
> Solutions to this problem will be proposed in the PR.
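To make the list-state cost concrete, here is a rough sketch of the three cleanup steps that counts how many list elements are visited. It is a toy model under stated assumptions, not Spark's implementation: `cleanup_list_state` is a hypothetical helper, an entry is treated as expired when its timestamp is less than or equal to the current batch timestamp, and the model does not remove processed secondary entries.

```python
def cleanup_list_state(primary, secondary, now_ms):
    """Toy cleanup pass (hypothetical); returns the number of elements visited.

    primary:   key -> list of (value, expiration_ms)
    secondary: list of (expiration_ms, key) entries
    """
    visited = 0
    for expiration_ms, key in sorted(secondary):
        if expiration_ms > now_ms:
            break  # remaining secondary entries have not expired yet
        elements = primary.get(key, [])  # 1. grab the entire list
        primary[key] = []                # 2. clear the entire list
        for value, element_expiration_ms in elements:
            visited += 1
            if element_expiration_ms > now_ms:
                # 3. merge unexpired elements back into the list
                primary[key].append((value, element_expiration_ms))
    return visited


# A four-element list that was rewritten in three batches, so every element
# now expires at 800.
live_list = [(v, 800) for v in ("a", "b", "c", "d")]

# Secondary index with stale entries left over from the earlier batches.
stale_primary = {"foo": list(live_list)}
stale_visits = cleanup_list_state(
    stale_primary, [(600, "foo"), (700, "foo"), (800, "foo")], now_ms=750)

# Secondary index that is proportional to the primary index.
tight_primary = {"foo": list(live_list)}
tight_visits = cleanup_list_state(tight_primary, [(800, "foo")], now_ms=750)

print(stale_visits, tight_visits)
```

With the stale entries, the model rescans the whole list once per stale entry and removes nothing. With a proportional secondary index, it does no list work at this timestamp.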