Csanád Bakos created FLINK-39624:
------------------------------------
Summary: `TtlListState.asyncGet()` returns a non-null wrapper
around a null inner iterator, causing NPE in `onNext`/`isEmpty` when the state
is empty
Key: FLINK-39624
URL: https://issues.apache.org/jira/browse/FLINK-39624
Project: Flink
Issue Type: Bug
Components: Runtime / Async State Processing, Runtime / State Backends
Affects Versions: 2.1.1, 2.2.0, 2.0.1, 2.1.0, 2.0-preview, 2.0.0, 2.3.0,
2.4.0
Environment: - Flink 2.2.0
- ForSt state backend
- Async State V2 enabled
- JDK 17
Reporter: Csanád Bakos
`AppendingState#asyncGet()` documents the contract:
> NOTE TO IMPLEMENTERS: if the state is empty, then this method should return
> null wrapped by a StateFuture.
The ForSt backend honors this contract.
`ForStDBListGetRequest.completeStateFuture` (release-2.2.0):
```java
public void completeStateFuture(byte[] bytesValue) throws IOException {
    if (bytesValue == null) {
        future.complete(null); // null per the AppendingState contract
        return;
    }
    ...
```
`TtlListState.asyncGet()` does *not* propagate the null; it wraps
unconditionally:
```java
@Override
public StateFuture<StateIterator<T>> asyncGet() {
    // 1. The timestamp of elements in list state isn't updated when get even if
    //    updateTsOnRead is true.
    // 2. we don't clear state here cause forst is LSM-tree based.
    return original.asyncGet().thenApply(stateIter -> new AsyncIteratorWrapper(stateIter));
}
```
`AsyncFutureImpl#thenApply` invokes the lambda even when the input value is
`null` (no null short-circuit), so when the underlying state is empty, callers
receive a *non-null* `AsyncIteratorWrapper` whose private
`originalIterator` field is `null`.
Both methods of the wrapper then NPE when invoked:
```java
public StateFuture<Void> onNext(ThrowingConsumer<T, Exception> iterating) {
    ThrowingConsumer<TtlValue<T>, Exception> ttlIterating = (item) -> { ... };
    return originalIterator.onNext(ttlIterating); // NPE on line 221
}

public boolean isEmpty() {
    return originalIterator.isEmpty(); // NPE on line 226
}
```
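The failure mode can be modeled without Flink on the classpath. The sketch below is only an illustration under stated assumptions: `CompletableFuture` stands in for `StateFuture`, `java.util.Iterator` stands in for `StateIterator`, and `NullWrapModel` and its `Wrapper` class are hypothetical names, not Flink classes. Like the behavior described above for `AsyncFutureImpl#thenApply`, `CompletableFuture#thenApply` also calls the mapping function when the upstream value is `null`.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

/** JDK-only model of the bug; all names here are hypothetical stand-ins. */
final class NullWrapModel {

    /** Stand-in for AsyncIteratorWrapper: keeps the inner iterator without a null check. */
    static final class Wrapper<T> {
        private final Iterator<T> inner;

        Wrapper(Iterator<T> inner) {
            this.inner = inner;
        }

        boolean isEmpty() {
            // Dereferences the inner iterator, so this throws when inner is null.
            return !inner.hasNext();
        }
    }

    /** Models the current asyncGet(): wraps whatever the backend returned, null included. */
    static <T> CompletableFuture<Wrapper<T>> wrapUnconditionally(
            CompletableFuture<Iterator<T>> backend) {
        return backend.thenApply(it -> new Wrapper<>(it));
    }

    public static void main(String[] args) {
        // An empty state is represented by a future completed with null.
        CompletableFuture<Iterator<String>> empty = CompletableFuture.completedFuture(null);
        Wrapper<String> wrapper = wrapUnconditionally(empty).join();

        System.out.println("wrapper is null: " + (wrapper == null));
        try {
            wrapper.isEmpty();
        } catch (NullPointerException e) {
            System.out.println("isEmpty() threw NullPointerException");
        }
    }
}
```

In this model a caller's `wrapper == null` check passes, yet the first method call on the wrapper still fails, which is the same shape as the reported NPE.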
Notice that the *synchronous* counterpart `TtlListState#get()` correctly
null-checks:
```java
public Iterable<T> get() {
    Iterable<TtlValue<T>> ttlValue = original.get();
    ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    ...
```
So the bug is purely in the async path of `TtlListState`.
## Steps to Reproduce
Use a `KeyedProcessFunction` (or any keyed operator) with async state V2
enabled and the ForSt backend, operating on a `ListState<E>` whose descriptor
has `enableTimeToLive(ttlConfig)` set:
1. Without ever calling `asyncAdd`, call
`state.asyncGet().thenCompose(iter -> iter.onNext(consumer))`.
2. Or, call `asyncGet` after a previous `asyncClear()` (ForSt's `Delete(key)`
is `Put(key, null)`, so the next read is `null`).
Either path throws:
```
java.lang.NullPointerException
    at org.apache.flink.runtime.state.v2.ttl.TtlListState$AsyncIteratorWrapper.onNext(TtlListState.java:221)
    at org.apache.flink.core.asyncprocessing.AsyncFutureImpl.lambda$thenCompose$6(AsyncFutureImpl.java:163)
    at org.apache.flink.runtime.asyncprocessing.AsyncFutureFactory.lambda$create$0(AsyncFutureFactory.java:49)
    at org.apache.flink.runtime.asyncprocessing.CallbackRunnerWrapper.lambda$submit$0(CallbackRunnerWrapper.java:57)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:118)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:415)
    ...
## Expected Behavior
Per the `AppendingState#asyncGet` javadoc, `iter` is `null` when state is
empty, so `if (iter == null)` (or `StateFutureUtils.toIterable`'s null guard)
is sufficient to handle the empty case.
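As a rough illustration of the caller-side pattern this contract enables, the sketch below treats a `null` iterator as "no elements". It is a JDK-only stand-in, not Flink code: `CompletableFuture` replaces `StateFuture`, `java.util.Iterator` replaces `StateIterator`, and `CallerNullGuard.collect` is a hypothetical helper that does not reproduce the implementation of `StateFutureUtils.toIterable`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical caller-side helper; the names are illustrative only. */
final class CallerNullGuard {

    /** Collects all elements, mapping a null iterator (empty state) to an empty list. */
    static <T> CompletableFuture<List<T>> collect(CompletableFuture<Iterator<T>> asyncGetResult) {
        return asyncGetResult.thenApply(iter -> {
            List<T> out = new ArrayList<>();
            if (iter == null) {
                return out; // empty state per the documented contract
            }
            iter.forEachRemaining(out::add);
            return out;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Iterator<String>> empty = CompletableFuture.completedFuture(null);
        CompletableFuture<Iterator<String>> two =
                CompletableFuture.completedFuture(List.of("a", "b").iterator());

        System.out.println(collect(empty).join().size()); // 0
        System.out.println(collect(two).join().size());   // 2
    }
}
```

A guard like this only works if the empty case really arrives as `null`, which is why the contract matters to callers.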
## Actual Behavior
`iter` is a non-null `AsyncIteratorWrapper` wrapping a null inner iterator. Any
subsequent call NPEs.
## Suggested Fix
Propagate the null in `TtlListState.asyncGet`:
```java
@Override
public StateFuture<StateIterator<T>> asyncGet() {
    return original.asyncGet().thenApply(stateIter ->
            stateIter == null ? null : new AsyncIteratorWrapper(stateIter));
}
```
This mirrors the null check in the synchronous `get()` and brings the async
path back in line with the `AppendingState` contract.
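The effect of the null-propagating mapping can be checked in isolation with a small JDK-only model. This is a sketch under assumptions rather than the actual patch: `CompletableFuture` stands in for `StateFuture`, `java.util.Iterator` for `StateIterator`, and `NullPropagatingWrap` with its `Wrapper` class are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** JDK-only model of the proposed fix; all names here are hypothetical stand-ins. */
final class NullPropagatingWrap {

    /** Stand-in for AsyncIteratorWrapper. */
    static final class Wrapper<T> {
        private final Iterator<T> inner;

        Wrapper(Iterator<T> inner) {
            this.inner = inner;
        }

        boolean isEmpty() {
            return !inner.hasNext();
        }
    }

    /** Models the fixed asyncGet(): a null (empty) result stays null, anything else is wrapped. */
    static <T> CompletableFuture<Wrapper<T>> asyncGet(CompletableFuture<Iterator<T>> original) {
        return original.thenApply(it -> it == null ? null : new Wrapper<>(it));
    }

    public static void main(String[] args) {
        CompletableFuture<Iterator<String>> empty = CompletableFuture.completedFuture(null);
        CompletableFuture<Iterator<String>> one =
                CompletableFuture.completedFuture(List.of("a").iterator());

        System.out.println("empty state yields null: " + (asyncGet(empty).join() == null));
        System.out.println("non-empty wrapper isEmpty: " + asyncGet(one).join().isEmpty());
    }
}
```

With this mapping the empty case reaches callers as `null`, so a plain `iter == null` check is enough, and non-empty results are wrapped exactly as before.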
## Source References (release-2.2.0)
- Buggy method:
https://github.com/apache/flink/blob/release-2.2.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java#L67-L72
- NPE-prone wrapper:
https://github.com/apache/flink/blob/release-2.2.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java#L189-L228
- Sync `get()` for comparison:
https://github.com/apache/flink/blob/release-2.2.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java#L79-L85
- Contract source (ForSt):
https://github.com/apache/flink/blob/release-2.2.0/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBListGetRequest.java#L46-L50
- Contract spec:
https://github.com/apache/flink/blob/release-2.2.0/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AppendingState.java#L40-L51
- `thenApply` does not short-circuit on null:
https://github.com/apache/flink/blob/release-2.2.0/flink-core/src/main/java/org/apache/flink/core/asyncprocessing/AsyncFutureImpl.java#L60-L92