[ https://issues.apache.org/jira/browse/FLINK-39624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Csanád Bakos updated FLINK-39624:
---------------------------------
    Description: 

`AppendingState#asyncGet()` documents the contract:

bq. NOTE TO IMPLEMENTERS: if the state is empty, then this method should return null wrapped by a StateFuture.

The ForSt backend honors this contract in `ForStDBListGetRequest.completeStateFuture` (release-2.2.0):

{code:java}
public void completeStateFuture(byte[] bytesValue) throws IOException {
    if (bytesValue == null) {
        future.complete(null);   // null per the AppendingState contract
        return;
    }
    ...
{code}

`TtlListState.asyncGet()` does *not* propagate the null — it wraps 
unconditionally:

{code:java}
@Override
public StateFuture<StateIterator<T>> asyncGet() {
    // 1. The timestamp of elements in list state isn't updated when get even if updateTsOnRead is true.
    // 2. we don't clear state here cause forst is LSM-tree based.
    return original.asyncGet().thenApply(stateIter -> new AsyncIteratorWrapper(stateIter));
}
{code}

`AsyncFutureImpl#thenApply` invokes the lambda even when the input value is 
`null` (no null short-circuit), so when the underlying state is empty, callers 
receive a *non-null* `AsyncIteratorWrapper` whose private `originalIterator` 
field is `null`.
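You can see the same behavior with the JDK's `CompletableFuture`, used here as a stand-in for `AsyncFutureImpl`. Its `thenApply` passes a `null` result to the mapping function instead of skipping it, which is what the issue reports for `AsyncFutureImpl#thenApply`. `Inner` and `WrapperOverNull` are hypothetical names for this sketch and are not Flink classes.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyNullDemo {
    // Hypothetical stand-in for the backend's StateIterator.
    interface Inner {
        boolean isEmpty();
    }

    // Hypothetical stand-in for AsyncIteratorWrapper: stores the inner iterator as-is.
    static final class WrapperOverNull {
        final Inner original;

        WrapperOverNull(Inner original) {
            this.original = original;
        }
    }

    // Mirrors the unconditional wrap in TtlListState.asyncGet().
    static WrapperOverNull wrapEmptyState() {
        // Empty state completes with null, as ForSt does per the AppendingState contract.
        CompletableFuture<Inner> backend = CompletableFuture.completedFuture(null);
        // thenApply still runs the mapping function, passing it null.
        return backend.thenApply(WrapperOverNull::new).join();
    }

    public static void main(String[] args) {
        WrapperOverNull w = wrapEmptyState();
        System.out.println(w != null);          // true: the caller gets a non-null wrapper
        System.out.println(w.original == null); // true: that wrapper holds a null inner iterator
    }
}
```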

Both methods of the wrapper then throw a `NullPointerException` when invoked:

{code:java}
public StateFuture<Void> onNext(ThrowingConsumer<T, Exception> iterating) {
    ThrowingConsumer<TtlValue<T>, Exception> ttlIterating = (item) -> { ... };
    return originalIterator.onNext(ttlIterating);   // NPE on line 221
}

public boolean isEmpty() {
    return originalIterator.isEmpty();              // NPE on line 226
}
{code}
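The next sketch reproduces the failure in isolation with a delegating wrapper whose inner reference is null. It is a simplified, synchronous model. `SimpleIter` and `DelegatingWrapper` are hypothetical stand-ins for `StateIterator` and `AsyncIteratorWrapper`, and `onNext` returns `void` here instead of a `StateFuture<Void>`.

```java
import java.util.function.Consumer;

class NullInnerWrapperDemo {
    // Hypothetical, synchronous stand-in for StateIterator<T>.
    interface SimpleIter<T> {
        void onNext(Consumer<T> consumer);

        boolean isEmpty();
    }

    // Hypothetical stand-in for AsyncIteratorWrapper: delegates without a null check.
    static final class DelegatingWrapper<T> implements SimpleIter<T> {
        private final SimpleIter<T> originalIterator;

        DelegatingWrapper(SimpleIter<T> originalIterator) {
            this.originalIterator = originalIterator;
        }

        @Override
        public void onNext(Consumer<T> consumer) {
            originalIterator.onNext(consumer); // NPE when originalIterator is null
        }

        @Override
        public boolean isEmpty() {
            return originalIterator.isEmpty(); // NPE when originalIterator is null
        }
    }

    public static void main(String[] args) {
        SimpleIter<String> wrapper = new DelegatingWrapper<>(null);
        try {
            wrapper.isEmpty();
        } catch (NullPointerException e) {
            System.out.println("isEmpty() threw NPE");
        }
        try {
            wrapper.onNext(System.out::println);
        } catch (NullPointerException e) {
            System.out.println("onNext() threw NPE");
        }
    }
}
```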

Notice that the *synchronous* counterpart `TtlListState#get()` correctly 
null-checks:

{code:java}
public Iterable<T> get() {
    Iterable<TtlValue<T>> ttlValue = original.get();
    ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
    ...
{code}

So the bug is purely in the async path of `TtlListState`.

## Steps to Reproduce

Set up a `KeyedProcessFunction` (or any keyed operator) with async state V2 enabled on the ForSt backend, and give it a `ListState<E>` whose descriptor has `enableTimeToLive(ttlConfig)` set. Then:

# Without ever calling `asyncAdd`, call `state.asyncGet().thenCompose(iter -> iter.onNext(consumer))`.
# Or call `asyncGet` after a previous `asyncClear()` (in ForSt, `Delete(key)` is `Put(key, null)`, so the next read returns `null`).

Either path throws:

{noformat}
java.lang.NullPointerException
    at org.apache.flink.runtime.state.v2.ttl.TtlListState$AsyncIteratorWrapper.onNext(TtlListState.java:221)
    at org.apache.flink.core.asyncprocessing.AsyncFutureImpl.lambda$thenCompose$6(AsyncFutureImpl.java:163)
    at org.apache.flink.runtime.asyncprocessing.AsyncFutureFactory.lambda$create$0(AsyncFutureFactory.java:49)
    at org.apache.flink.runtime.asyncprocessing.CallbackRunnerWrapper.lambda$submit$0(CallbackRunnerWrapper.java:57)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:118)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:415)
    ...
{noformat}

## Expected Behavior

Per the `AppendingState#asyncGet` javadoc, `iter` is `null` when the state is empty. An `if (iter == null)` check (or the null guard in `StateFutureUtils.toIterable`) is therefore enough to handle the empty case.
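The sketch below shows the caller-side pattern that the contract allows. It models state access with `CompletableFuture`. `AsyncIter`, `listIter` and `collect` are hypothetical stand-ins for `StateIterator` and user code, not Flink API. With the current `TtlListState` wrapper the `iter == null` branch is never taken, because the caller never receives `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class NullGuardDemo {
    // Hypothetical stand-in for StateIterator<T>: visits every element asynchronously.
    interface AsyncIter<T> {
        CompletableFuture<Void> onNext(Consumer<T> consumer);
    }

    // Hypothetical helper that exposes a fixed list through AsyncIter.
    static <T> AsyncIter<T> listIter(List<T> items) {
        return consumer -> {
            items.forEach(consumer);
            return CompletableFuture.completedFuture(null);
        };
    }

    // Caller-side pattern: a null iterator means "empty state", so nothing is visited.
    static CompletableFuture<List<String>> collect(CompletableFuture<AsyncIter<String>> asyncGet) {
        List<String> seen = new ArrayList<>();
        return asyncGet
                .thenCompose(iter -> iter == null
                        ? CompletableFuture.<Void>completedFuture(null)
                        : iter.onNext(seen::add))
                .thenApply(ignored -> seen);
    }

    public static void main(String[] args) {
        // prints []
        System.out.println(collect(CompletableFuture.completedFuture(null)).join());
        // prints [a, b]
        System.out.println(collect(CompletableFuture.completedFuture(listIter(List.of("a", "b")))).join());
    }
}
```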

## Actual Behavior

`iter` is a non-null `AsyncIteratorWrapper` that wraps a null inner iterator, so any subsequent call to `onNext` or `isEmpty` throws a `NullPointerException`.

## Suggested Fix

Propagate the null in `TtlListState.asyncGet`:

{code:java}
@Override
public StateFuture<StateIterator<T>> asyncGet() {
    return original.asyncGet().thenApply(stateIter ->
        stateIter == null ? null : new AsyncIteratorWrapper(stateIter));
}
{code}

This mirrors the synchronous `get()` and brings the async path back in line 
with the `AppendingState` contract.
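Here is a standalone model of the suggested fix. As before, `CompletableFuture` stands in for `StateFuture`, and `Iter` and `Wrapper` are hypothetical types. The mapping function passes `null` through unchanged and wraps only a real iterator. The empty case therefore reaches the caller as `null`, and non-empty state still goes through the wrapper.

```java
import java.util.concurrent.CompletableFuture;

class PropagateNullFixDemo {
    // Hypothetical stand-in for StateIterator<T>.
    interface Iter {
        boolean isEmpty();
    }

    // Hypothetical stand-in for AsyncIteratorWrapper.
    static final class Wrapper implements Iter {
        private final Iter originalIterator;

        Wrapper(Iter originalIterator) {
            this.originalIterator = originalIterator;
        }

        @Override
        public boolean isEmpty() {
            return originalIterator.isEmpty();
        }
    }

    // Same shape as the suggested fix: wrap only when the backend returned an iterator.
    static CompletableFuture<Iter> fixedGet(CompletableFuture<Iter> backend) {
        return backend.thenApply(it -> it == null ? null : new Wrapper(it));
    }

    public static void main(String[] args) {
        Iter empty = fixedGet(CompletableFuture.completedFuture(null)).join();
        System.out.println(empty == null); // true: null reaches the caller, as the contract requires

        Iter nonEmptyInner = () -> false;
        Iter nonEmpty = fixedGet(CompletableFuture.completedFuture(nonEmptyInner)).join();
        System.out.println(nonEmpty.isEmpty()); // false: delegation still works
    }
}
```

The null check lives inside the mapping function. That keeps the change local to `TtlListState.asyncGet` and leaves `thenApply` semantics unchanged for its other callers.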

## Source References (release-2.2.0)

- Buggy method: 
https://github.com/apache/flink/blob/release-2.2.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java#L67-L72
- NPE-prone wrapper: 
https://github.com/apache/flink/blob/release-2.2.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java#L189-L228
- Sync `get()` for comparison: 
https://github.com/apache/flink/blob/release-2.2.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java#L79-L85
- Contract source (ForSt): 
https://github.com/apache/flink/blob/release-2.2.0/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBListGetRequest.java#L46-L50
- Contract spec: 
https://github.com/apache/flink/blob/release-2.2.0/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/AppendingState.java#L40-L51
- `thenApply` does not short-circuit on null: 
https://github.com/apache/flink/blob/release-2.2.0/flink-core/src/main/java/org/apache/flink/core/asyncprocessing/AsyncFutureImpl.java#L60-L92




> `TtlListState.asyncGet()` returns a non-null wrapper around a null inner iterator, causing NPE in `onNext`/`isEmpty` when the state is empty
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39624
>                 URL: https://issues.apache.org/jira/browse/FLINK-39624
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Async State Processing, Runtime / State Backends
>    Affects Versions: 2.0.0, 2.0-preview, 2.1.0, 2.0.1, 2.2.0, 2.1.1, 2.3.0, 2.4.0
>         Environment: - Flink 2.2.0
> - ForSt state backend
> - Async State V2 enabled
> - JDK 17
>            Reporter: Csanád Bakos
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
