Neil Ramaswamy created SPARK-50302:
--------------------------------------

             Summary: TransformWithState secondary index sizes should be 
proportional to primary indexes
                 Key: SPARK-50302
                 URL: https://issues.apache.org/jira/browse/SPARK-50302
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 4.0.0
            Reporter: Neil Ramaswamy


Currently, the TWS operator handles every TTL state variable in roughly the 
same way:
 # Upsert the value into the primary index
 # Upsert the expiration timestamp into the secondary index, where the 
expiration timestamp is `batchTimestampMs` plus the TTL delay

The issue with this approach is that if the same state variable is updated 
in two different micro-batches, there is one entry in the primary index but 
_two_ entries in the secondary index. Consider the following example for a 
state variable `foo` with initial value `v1` and a TTL delay of 500:

Batch 0: `batchTimestampMs = 100`, `foo` updates to `v1`:
 * Primary index: `[foo -> (v1, 600)]`
 * Secondary index: `[(600, foo) -> EMPTY]`

Batch 1: `batchTimestampMs = 200`, `foo` updates to `v2`:
 * Primary index: `[foo -> (v2, 700)]`
 * Secondary index: `[(600, foo) -> EMPTY, (700, foo) -> EMPTY]`

Notice that the secondary index now has size 2, even though the primary 
index only has size 1. When cleanup reaches `(600, foo)`, we _don't_ actually 
delete `foo`, since we do [another lookup in the primary index to determine if the 
secondary index entry we're dealing with is 
stale|https://github.com/apache/spark/blob/05508cf7cb9da3042fa4b17645102a6406278695/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala#L109-L113].

So, to summarize, the write path always writes to both the primary _and_ 
secondary index, and the deletion path goes through each entry in the secondary 
index and does a lookup on the primary index to decide whether to delete it.
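
The write and cleanup paths summarized above can be illustrated with a small toy model. This is only a sketch in plain Python, not Spark code: the class `ToyValueStateWithTTL` and its methods are hypothetical names, an entry is assumed to be expired once its expiration timestamp is at or before the batch timestamp, and a secondary entry is assumed to be dropped after cleanup visits it.

```python
from dataclasses import dataclass, field


@dataclass
class ToyValueStateWithTTL:
    """Toy model of a TTL value state (hypothetical, not Spark code)."""
    ttl_ms: int
    primary: dict = field(default_factory=dict)    # key -> (value, expiration_ms)
    secondary: dict = field(default_factory=dict)  # (expiration_ms, key) -> None

    def update(self, key, value, batch_timestamp_ms):
        expiration_ms = batch_timestamp_ms + self.ttl_ms
        # Write path: upsert into both indexes. The secondary entry written
        # by an earlier update of the same key is left behind.
        self.primary[key] = (value, expiration_ms)
        self.secondary[(expiration_ms, key)] = None

    def cleanup(self, batch_timestamp_ms):
        """Visit expired secondary entries; return the number of primary lookups."""
        lookups = 0
        for expiration_ms, key in sorted(self.secondary):
            if expiration_ms > batch_timestamp_ms:
                break
            lookups += 1
            current = self.primary.get(key)
            # Delete the value only if the primary index agrees it has expired.
            if current is not None and current[1] <= batch_timestamp_ms:
                del self.primary[key]
            # Assumption: the visited secondary entry is dropped either way.
            del self.secondary[(expiration_ms, key)]
        return lookups


state = ToyValueStateWithTTL(ttl_ms=500)
state.update("foo", "v1", batch_timestamp_ms=100)
state.update("foo", "v2", batch_timestamp_ms=200)
sizes_after_writes = (len(state.primary), len(state.secondary))

# Cleanup at t=650 visits the stale entry (600, foo) but must keep foo.
lookups = state.cleanup(batch_timestamp_ms=650)
```

In this model the two updates leave one primary entry and two secondary entries, and the cleanup at `650` spends a primary lookup on `(600, foo)` without deleting anything from the primary index.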

While this may not seem like a huge issue, things get way worse for 
`ListState`. Our cleanup logic for list state is as follows:
 # Grab an iterator for the entire list
 # Clear the entire list
 # For each element in the iterator that isn't expired, merge it back into the 
list

This means that a single stale entry in the secondary index causes an extra 
pass over the entire list, and several stale entries cause _several_ extra 
passes, which hurts performance. We should make sure that the secondary 
index has only as many elements as the primary index, which will prevent us 
from doing unnecessary work during cleanup.
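
To make the cost for list state concrete, here is a rough toy model of the three cleanup steps for a single grouping key. It is a sketch under assumptions rather than the actual `ListState` implementation: `ToyListStateWithTTL`, `put`, `cleanup`, and the `elements_scanned` counter are hypothetical names, and every element written in a batch is assumed to share that batch's expiration timestamp.

```python
class ToyListStateWithTTL:
    """Toy model of a TTL list state for one grouping key (not Spark code)."""

    def __init__(self, ttl_ms):
        self.ttl_ms = ttl_ms
        self.elements = []         # primary index: list of (value, expiration_ms)
        self.secondary = set()     # secondary index: expiration_ms entries for this key
        self.elements_scanned = 0  # work counter, for illustration only

    def put(self, values, batch_timestamp_ms):
        expiration_ms = batch_timestamp_ms + self.ttl_ms
        # Overwrite the whole list; earlier secondary entries are left behind.
        self.elements = [(v, expiration_ms) for v in values]
        self.secondary.add(expiration_ms)

    def cleanup(self, batch_timestamp_ms):
        for expiration_ms in sorted(self.secondary):
            if expiration_ms > batch_timestamp_ms:
                break
            self.secondary.remove(expiration_ms)
            snapshot = self.elements   # 1. grab the entire list
            self.elements = []         # 2. clear the entire list
            for value, element_expiration_ms in snapshot:
                self.elements_scanned += 1
                if element_expiration_ms > batch_timestamp_ms:
                    # 3. merge unexpired elements back into the list
                    self.elements.append((value, element_expiration_ms))


state = ToyListStateWithTTL(ttl_ms=500)
state.put(["a", "b", "c"], batch_timestamp_ms=100)  # secondary: {600}
state.put(["d", "e", "f"], batch_timestamp_ms=200)  # secondary: {600, 700}

# Cleanup at t=650 only finds the stale entry 600, yet rewrites the whole list.
state.cleanup(batch_timestamp_ms=650)
wasted_scans = state.elements_scanned
```

Here the stale entry triggers a full scan and rewrite of the list even though no element has expired, so all of that work is wasted; with more stale entries for the same key, the list would be rewritten once per entry.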

Solutions to this problem will be proposed in the PR.



