Sundaram Ananthanarayanan created FLINK-20159:
-------------------------------------------------
Summary: [FLIP-27 source] FutureNotifier does not return a new
future when Future::future() is invoked within the returned future's callback
Key: FLINK-20159
URL: https://issues.apache.org/jira/browse/FLINK-20159
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.11.2
Reporter: Sundaram Ananthanarayanan
Fix For: 1.12.0
Here's the *problem*. FutureNotifier::future should return a new future every
time the previous future was completed. That's the expectation. However, if the
future is being requested from within the completion callback of the previous
future, then it, instead of returning a new future, returns the existing
future. This could potentially result in infinite recursions depending on how
the callback method is implemented. Here's an example:
{code:java}
Consumer code:
void consumeDataOnce() {
// get the data from the producer and check if it was empty
Data data = producer.getData();
// if data was empty, then grab the future and attach a callback as below
if (data.isEmpty()) {
producer.getCompletableFuture().thenRun(() -> consumeDataOnce());
}
}
{code}
In the above method, let's say the producer notified the consumer (produced by
FutureNofier::future), thinking that some data was available to be consumed.
Now let's say the data returned from the producer was instead empty during the
callback. In this case, the method goes on in an infinite loop when the future
is completed.
*Issue:* If you observe FutureNotifier::notifyComplete's implementation
closely, you realize that the future is completed before the futureRef is
swapped with null.
{code:java}
public void notifyComplete() {
CompletableFuture<Void> future = futureRef.get();
// If there are multiple threads trying to complete the future, only the
first one succeeds.
if (future != null && future.complete(null)) {
futureRef.compareAndSet(future, null);
}
}
{code}
If we can change the ordering instead, where the future is swapped atomically
first before being completed, then we can guarantee that the future returned by
FutureNotifier::future will always be a new one if the previous one had
completed.
[~sewen] [~jqin] [~stevenz3wu]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)