[ https://issues.apache.org/jira/browse/SPARK-57052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anupam Yadav updated SPARK-57052:
---------------------------------
    Description: 
RocksDBStateStoreProvider performs state row format validation (via 
StateStoreProvider.validateStateRowFormat) on most read-path methods to catch 
schema incompatibilities early. However, multiGet(), which also decodes rows, 
is missing this validation:

||Method||Decodes rows?||Has validation?||
|get()|Yes|(/)|  
|iterator()|Yes|(/)|
|prefixScan()|Yes|(/)|  
|rangeScan()|Yes|(/)|
|multiGet()|Yes|(x)|

multiGet() decodes values via encodedValues.map(kvEncoder._2.decodeValue) but 
never calls validateStateRowFormat. If a stateful operator evolves its schema 
between restarts, multiGet() can silently return corrupted data instead of 
failing fast with StateStoreValueRowFormatValidationFailure.
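To make "silently return corrupted data" concrete, here is a toy illustration in Scala. It uses a made-up fixed-width byte encoding built with java.nio.ByteBuffer, not UnsafeRow's real layout, and RowFormatMismatch, encodeInts and the decode helpers are hypothetical names. Bytes written under an older three-Int schema decode without any error under a newer single-Long schema; only an explicit structural check turns that into a fail-fast error.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for StateStoreValueRowFormatValidationFailure.
final class RowFormatMismatch(msg: String) extends RuntimeException(msg)

// Toy encoding (NOT UnsafeRow): each Int field is written as 4 big-endian bytes.
def encodeInts(fields: Seq[Int]): Array[Byte] = {
  val buf = ByteBuffer.allocate(4 * fields.length)
  fields.foreach(f => buf.putInt(f))
  buf.array()
}

// Reader built for a newer schema with a single Long field: it reads the first
// 8 bytes and never notices that the stored row was laid out differently.
def decodeAsSingleLong(bytes: Array[Byte]): Long =
  ByteBuffer.wrap(bytes).getLong()

// Same reader with a structural check first, in the spirit of
// validateStateRowFormat: a single-Long row must be exactly 8 bytes long.
def decodeAsSingleLongChecked(bytes: Array[Byte]): Long = {
  if (bytes.length != java.lang.Long.BYTES)
    throw new RowFormatMismatch(
      s"expected ${java.lang.Long.BYTES} bytes for the value row, found ${bytes.length}")
  decodeAsSingleLong(bytes)
}
```

With this toy encoding, encodeInts(Seq(1, 2, 3)) yields 12 bytes. decodeAsSingleLong returns 4294967298 (the first two Ints glued together) without complaint, while decodeAsSingleLongChecked throws.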

h3. Fix

Add the same validateStateRowFormat guard (gated by !isValidated && value != 
null && !useColumnFamilies) to multiGet(), matching the existing pattern in 
get() and iterator().
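A minimal sketch of the guard applied to a batch read, assuming a simplified in-memory store. Everything here is hypothetical. SketchStore, ValueRowFormatValidationFailure and the field-count check stand in for RocksDBStateStoreProvider, StateStoreValueRowFormatValidationFailure and StateStoreProvider.validateStateRowFormat. Rows are plain Seq[Any] rather than UnsafeRow, and multiGet returns an eager Seq. The point is only the control flow: get() and multiGet() share one guard, and multiGet() checks its first non-null result.

```scala
// Hypothetical stand-in for StateStoreValueRowFormatValidationFailure.
final class ValueRowFormatValidationFailure(msg: String) extends RuntimeException(msg)

// Hypothetical, simplified store: values are already-decoded rows (Seq[Any]) and
// the value schema is reduced to its expected number of fields.
final class SketchStore(
    data: Map[String, Seq[Any]],
    expectedValueFields: Int,
    useColumnFamilies: Boolean) {

  // Mirrors isValidated: once one row has passed the check, later reads skip it.
  private var isValidated = false

  // Stand-in for StateStoreProvider.validateStateRowFormat (field count only).
  private def validateStateRowFormat(value: Seq[Any]): Unit =
    if (value.length != expectedValueFields)
      throw new ValueRowFormatValidationFailure(
        s"expected $expectedValueFields value fields, found ${value.length}")

  // The guard described above: !isValidated && value != null && !useColumnFamilies.
  private def maybeValidate(value: Seq[Any]): Unit =
    if (!isValidated && value != null && !useColumnFamilies) {
      validateStateRowFormat(value)
      isValidated = true
    }

  // Existing pattern: a single-key read validates the row it returns.
  def get(key: String): Seq[Any] = {
    val value = data.getOrElse(key, null)
    maybeValidate(value)
    value
  }

  // Proposed pattern: a batch read validates the first non-null row it returns;
  // missing keys come back as null, as with get().
  def multiGet(keys: Seq[String]): Seq[Seq[Any]] = {
    val values = keys.map(key => data.getOrElse(key, null))
    values.find(_ != null).foreach(maybeValidate)
    values
  }
}
```

Because isValidated is shared, whichever read first sees a non-null row pays for the check; later get() or multiGet() calls on the same store instance skip it.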

h3. Acceptance Criteria

* multiGet() calls validateStateRowFormat on the first non-null result, 
consistent with get()
* Add unit tests that verify StateStoreValueRowFormatValidationFailure is 
thrown from multiGet() when a mismatched schema is used
* Existing RocksDBStateStoreProviderSuite tests pass with no regressions

h3. Note on valuesIterator

valuesIterator() also lacks validation, but it requires column families to be 
enabled (useMultipleValuesPerKey=true implies useColumnFamilies=true). Because 
the current validation guard is skipped whenever useColumnFamilies=true, adding 
validation to valuesIterator() would require relaxing that constraint, a broader 
change tracked separately.
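The reasoning above can be checked mechanically. This Scala sketch is hypothetical: StoreFlags is not a Spark class. It only encodes the stated invariant (useMultipleValuesPerKey=true implies useColumnFamilies=true) and the guard condition from the Fix section, and it assumes valuesIterator() is reachable only when useMultipleValuesPerKey=true. Enumerating every allowed flag combination shows that the unchanged guard can never fire for valuesIterator().

```scala
// Hypothetical model of the two store options; not Spark's configuration API.
final case class StoreFlags(useColumnFamilies: Boolean, useMultipleValuesPerKey: Boolean) {
  // The invariant quoted in the issue.
  require(!useMultipleValuesPerKey || useColumnFamilies,
    "useMultipleValuesPerKey=true implies useColumnFamilies=true")
}

// The existing guard, with "value != null" reduced to a Boolean.
def guardFires(isValidated: Boolean, valueIsNull: Boolean, flags: StoreFlags): Boolean =
  !isValidated && !valueIsNull && !flags.useColumnFamilies

// Every flag combination the invariant allows.
val allowedFlags: Seq[StoreFlags] = for {
  cf <- Seq(false, true)
  mv <- Seq(false, true)
  if !mv || cf
} yield StoreFlags(useColumnFamilies = cf, useMultipleValuesPerKey = mv)

// Assumption: valuesIterator() is only usable with useMultipleValuesPerKey=true.
// Even in the most favorable case (not yet validated, non-null value) the guard stays off.
val guardCanFireForValuesIterator: Boolean =
  allowedFlags
    .filter(_.useMultipleValuesPerKey)
    .exists(flags => guardFires(isValidated = false, valueIsNull = false, flags))
```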

----

*Prior art:* SPARK-56539 / [PR #55468|https://github.com/apache/spark/pull/55468]
added the same validation to prefixScan and rangeScan.

  was:
RocksDBStateStoreProvider performs state row format validation (via 
StateStoreProvider.validateStateRowFormat) on most read-path methods to catch 
schema incompatibilities early. However, two read methods that decode rows are 
missing this validation:

||Method||Decodes rows?||Has validation?||
|get()|Yes|(/)|  
|iterator()|Yes|(/)|
|prefixScan()|Yes|(/)|  
|rangeScan()|Yes|(/)|
|multiGet()|Yes|(x)|
|valuesIterator()|Yes|(x)|

multiGet() decodes values via encodedValues.map(kvEncoder._2.decodeValue) but 
never calls validateStateRowFormat. Similarly, valuesIterator() decodes values 
via valueEncoder.decodeValues() without validation.

If a stateful operator evolves its schema between restarts, these methods will 
silently return corrupted data instead of failing fast with 
StateStoreKeyRowFormatValidationFailure / 
StateStoreValueRowFormatValidationFailure.

h3. Fix

Add the same validateStateRowFormat guard (gated by !isValidated && value != 
null && !useColumnFamilies) to multiGet() and valuesIterator(), matching the 
existing pattern in get() and iterator().

h3. Acceptance Criteria

* multiGet() calls validateStateRowFormat on the first non-null result, 
consistent with get()
* valuesIterator() calls validateStateRowFormat on the first decoded row, 
consistent with iterator()
* Add unit tests that verify StateStoreKeyRowFormatValidationFailure is thrown 
from both methods when a mismatched schema is used
* Existing RocksDBStateStoreProviderSuite tests pass with no regressions

----

*Prior art:* SPARK-56539 / [PR #55468|https://github.com/apache/spark/pull/55468]
added the same validation to prefixScan and rangeScan.

        Summary: [SS] Add state row format validation to multiGet in 
RocksDBStateStoreProvider  (was: [SS] Add state row format validation to 
multiGet and valuesIterator in RocksDBStateStoreProvider)

> [SS] Add state row format validation to multiGet in RocksDBStateStoreProvider
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-57052
>                 URL: https://issues.apache.org/jira/browse/SPARK-57052
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.0.0
>            Reporter: Anupam Yadav
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
