Neil Ramaswamy created SPARK-50302:
--------------------------------------
Summary: TransformWithState secondary index sizes should be proportional to primary indexes
Key: SPARK-50302
URL: https://issues.apache.org/jira/browse/SPARK-50302
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Neil Ramaswamy

Currently, the TransformWithState (TWS) operator handles every TTL state
variable in roughly the same way:
# Upsert the value, together with its expiration timestamp, into the primary
index
# Upsert the expiration timestamp into the secondary index, where the
expiration timestamp is `batchTimestampMs` plus the TTL duration

The issue with this approach is that if the same state variable is updated
in two different micro-batches, the primary index holds 1 entry while the
secondary index holds _two_. Consider the following example for a state
variable `foo` with a TTL duration of 500 ms, which is first set to `v1` and
then updated to `v2`:

Batch 0: `batchTimestampMs = 100`, `foo` updates to `v1`:
* Primary index: `[foo -> (v1, 600)]`
* Secondary index: `[(600, foo) -> EMPTY]`

Batch 1: `batchTimestampMs = 200`, `foo` updates to `v2`:
* Primary index: `[foo -> (v2, 700)]`
* Secondary index: `[(600, foo) -> EMPTY, (700, foo) -> EMPTY]`
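
The two batches above can be replayed with a minimal Python model of the write path. This is only a sketch under stated assumptions: plain in-memory `dict` and `set` objects stand in for the primary and secondary indexes, and the names `primary`, `secondary`, `batches`, and `TTL_MS` are hypothetical, not Spark APIs.

```python
# Hypothetical in-memory model of the TTL write path (not Spark code).
TTL_MS = 500

primary = {}       # grouping key -> (value, expiration_ms)
secondary = set()  # {(expiration_ms, grouping key)}; the stored value is EMPTY

# (batchTimestampMs, new value of `foo`) for Batch 0 and Batch 1.
batches = [(100, "v1"), (200, "v2")]

for batch_id, (batch_ts, value) in enumerate(batches):
    expiration_ms = batch_ts + TTL_MS
    # Step 1: upsert into the primary index; this overwrites any old entry.
    primary["foo"] = (value, expiration_ms)
    # Step 2: upsert into the secondary index. Any older (expiration_ms, "foo")
    # entry is left in place, because its key differs from the new one.
    secondary.add((expiration_ms, "foo"))
    print(f"Batch {batch_id}: primary={primary}, secondary={sorted(secondary)}")

print(len(primary), len(secondary))  # primary has 1 entry, secondary has 2
```

After both batches, `primary` still holds a single entry for `foo`, while `secondary` holds both `(600, 'foo')` and `(700, 'foo')`.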

Notice that the secondary index now has 2 entries, even though the primary
index has only 1. When cleanup processes `(600, foo)`, it does _not_ delete
`foo`, because it does
[another lookup in the primary index to determine whether the secondary index entry being processed is stale|https://github.com/apache/spark/blob/05508cf7cb9da3042fa4b17645102a6406278695/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImplWithTTL.scala#L109-L113].
Since the primary index records an expiration of `700` for `foo`, the
`(600, foo)` entry is stale.

So, to summarize: the write path always writes to both the primary _and_
secondary indexes, and the deletion path walks every expired entry in the
secondary index and looks up the primary index to decide whether to delete
the corresponding value.
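
The deletion path can be sketched in the same kind of hypothetical in-memory model. The function name `clear_expired` and its `now_ms` parameter are illustrative; the sketch assumes that a timestamp counts as expired once `now_ms` reaches it and that each visited secondary entry is removed, which may differ in detail from Spark's implementation.

```python
def clear_expired(primary, secondary, now_ms):
    """Hypothetical sketch of the TTL cleanup path (not Spark code).

    primary:   dict mapping key -> (value, expiration_ms)
    secondary: set of (expiration_ms, key) pairs
    Returns the number of primary entries actually deleted.
    """
    deleted = 0
    # Visit secondary entries in expiration order; stop at the first live one.
    for expiration_ms, key in sorted(secondary):
        if expiration_ms > now_ms:
            break
        # Second lookup: the secondary entry may be stale, so check the
        # expiration recorded in the primary index before deleting.
        entry = primary.get(key)
        if entry is not None and entry[1] <= now_ms:
            del primary[key]
            deleted += 1
        # Assumption: the visited secondary entry is dropped either way.
        secondary.discard((expiration_ms, key))
    return deleted
```

Run against the state from the example with `now_ms = 650`, it visits `(600, foo)`, sees that the primary entry expires at `700`, and keeps `foo`; the primary lookup was spent on a stale entry.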

While this may not seem like a huge issue, things get much worse for
`ListState`. Our cleanup logic for list state is as follows:
# Grab an iterator for the entire list
# Clear the entire list
# For each element in the iterator that isn't expired, merge it back into the
list
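
A minimal sketch of these three steps for one key, assuming a hypothetical `list_state` dict that maps each key to a Python list of `(value, expiration_ms)` elements. The function `clear_expired_list` is illustrative and is not Spark's `ListState` API.

```python
def clear_expired_list(list_state, key, now_ms):
    """Hypothetical sketch of one ListState cleanup pass for `key`.

    list_state maps key -> list of (value, expiration_ms) elements.
    Returns (elements_visited, elements_removed).
    """
    # Step 1: grab an iterator over the entire list (over a copy, since the
    # stored list is cleared in the next step).
    elements = iter(list(list_state.get(key, [])))
    # Step 2: clear the entire list.
    list_state[key] = []
    visited = removed = 0
    # Step 3: merge every element that isn't expired back into the list.
    for value, expiration_ms in elements:
        visited += 1
        if expiration_ms <= now_ms:
            removed += 1
        else:
            list_state[key].append((value, expiration_ms))
    return visited, removed
```

Even when only one element has expired, every element is read and every surviving element is written back, so the cost of a pass scales with the size of the whole list.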

This means that each extra entry in the secondary index for a key triggers
an additional full pass over that key's list (iterate, clear, and merge
back), so a list that was updated in _several_ micro-batches is traversed
and rewritten several times during cleanup, which hurts performance. We
should make sure that the secondary index has only as many entries as the
primary index, which will prevent unnecessary work during cleanup.
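
To see how extra secondary entries multiply this work, the following hypothetical simulation appends elements to a single key in several batches, records one secondary entry per batch as the write path does, and then runs one full iterate/clear/merge pass per expired secondary entry. The function name, its parameters, and the element values are illustrative assumptions, not Spark code.

```python
def simulate_list_cleanup(batch_timestamps, appends_per_batch, ttl_ms, now_ms):
    """Hypothetical model of ListState TTL cleanup for a single key "foo".

    Each batch appends `appends_per_batch` elements and, following the write
    path, adds one (expiration_ms, "foo") entry to the secondary index.
    Returns (cleanup_passes, elements_visited) for a cleanup run at `now_ms`.
    """
    elements = []      # primary index contents for "foo": [(value, expiration_ms)]
    secondary = set()  # {(expiration_ms, key)}
    for batch_ts in batch_timestamps:
        expiration_ms = batch_ts + ttl_ms
        elements.extend(
            (f"{batch_ts}-{i}", expiration_ms) for i in range(appends_per_batch)
        )
        secondary.add((expiration_ms, "foo"))

    passes = visited = 0
    for expiration_ms, _key in sorted(secondary):
        if expiration_ms > now_ms:
            break  # entries are visited in expiration order; stop at the first live one
        # Every expired secondary entry triggers a full iterate/clear/merge pass.
        passes += 1
        snapshot, elements = elements, []
        for value, element_expiration_ms in snapshot:
            visited += 1
            if element_expiration_ms > now_ms:
                elements.append((value, element_expiration_ms))
    return passes, visited


print(simulate_list_cleanup([100, 200, 300, 400], 10, 1000, 1350))  # (3, 60)
```

With 10 elements appended in each of 4 batches, a TTL of 1000 ms, and cleanup at 1350 ms, the three expired secondary entries trigger 3 passes that visit 40 + 10 + 10 = 60 elements, whereas the first 40-element pass alone already removes every expired element. If all 40 elements had been appended in a single batch, the model records one secondary entry and performs a single 40-element pass.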
Solutions to this problem will be proposed in the PR.