This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e85cf8c4cdf [FLINK-27878][datastream] Add Retry Support For Async I/O 
In DataStream API
e85cf8c4cdf is described below

commit e85cf8c4cdf417b47f8d53bf3bb202f79e92b205
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Fri Apr 29 14:56:23 2022 +0800

    [FLINK-27878][datastream] Add Retry Support For Async I/O In DataStream API
    
    This closes #19983.
---
 .../docs/dev/datastream/operators/asyncio.md       |  59 ++-
 .../docs/dev/datastream/operators/asyncio.md       |  72 +++-
 .../streaming/api/datastream/AsyncDataStream.java  | 176 ++++++++-
 .../api/functions/async/AsyncRetryPredicate.java   |  47 +++
 .../api/functions/async/AsyncRetryStrategy.java    |  37 ++
 .../api/operators/async/AsyncWaitOperator.java     | 227 +++++++++++-
 .../operators/async/AsyncWaitOperatorFactory.java  |  15 +
 .../util/retryable/AsyncRetryStrategies.java       | 240 +++++++++++++
 .../streaming/util/retryable/RetryPredicates.java  |  85 +++++
 .../api/operators/async/AsyncWaitOperatorTest.java | 240 ++++++++++++-
 .../streaming/api/scala/AsyncDataStream.scala      | 397 ++++++++++++++++++++-
 .../api/scala/async/AsyncRetryPredicate.scala      |  47 +++
 .../api/scala/async/AsyncRetryStrategy.scala       |  34 ++
 .../api/scala/AsyncDataStreamITCase.scala          | 122 ++++++-
 14 files changed, 1755 insertions(+), 43 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md 
b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
index d1054879bef..6dbddab5824 100644
--- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md
@@ -30,6 +30,8 @@ under the License.
 对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。
 
 提示:这篇文档 [FLIP-12: 异步 I/O 
的设计和实现](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673)介绍了关于设计和实现异步
 I/O 功能的细节。
+对于新增的重试支持的实现细节可以参考[FLIP-232: 为 DataStream API 异步 I/O 
操作增加重试支持](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)。
+
 
 ## 对于异步 I/O 操作的需求
 
@@ -60,7 +62,7 @@ Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端
 
 - 实现分发请求的 `AsyncFunction`
 - 获取数据库交互的结果并发送给 `ResultFuture` 的 *回调* 函数
-- 将异步 I/O 操作应用于 `DataStream` 作为 `DataStream` 的一次转换操作。
+- 将异步 I/O 操作应用于 `DataStream` 作为 `DataStream` 的一次转换操作, 启用或者不启用重试。
 
 下面是基本的代码模板:
 
@@ -115,10 +117,21 @@ class AsyncDatabaseRequest extends 
RichAsyncFunction<String, Tuple2<String, Stri
 // 创建初始 DataStream
 DataStream<String> stream = ...;
 
-// 应用异步 I/O 转换操作
+// 应用异步 I/O 转换操作,不启用重试
 DataStream<Tuple2<String, String>> resultStream =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100);
 
+// 或 应用异步 I/O 转换操作并启用重试
+// 通过工具类创建一个异步重试策略, 或用户实现自定义的策略
+AsyncRetryStrategy asyncRetryStrategy =
+       new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // 
maxAttempts=3, fixedDelay=100ms
+               .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+               .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+               .build();
+
+// 应用异步 I/O 转换操作并启用重试
+DataStream<Tuple2<String, String>> resultStream =
+       AsyncDataStream.unorderedWaitWithRetry(stream, new 
AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -151,10 +164,17 @@ class AsyncDatabaseRequest extends AsyncFunction[String, 
(String, String)] {
 // 创建初始 DataStream
 val stream: DataStream[String] = ...
 
-// 应用异步 I/O 转换操作
+// 应用异步 I/O 转换操作,不启用重试
 val resultStream: DataStream[(String, String)] =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100)
 
+// 或 应用异步 I/O 转换操作并启用重试
+// 创建一个异步重试策略
+val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+
+// 应用异步 I/O 转换操作并启用重试
+val resultStream: DataStream[(String, String)] =
+  AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 
1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy)
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -164,11 +184,12 @@ val resultStream: DataStream[(String, String)] =
 
 下面两个参数控制异步操作:
 
-  - **Timeout**: 超时参数定义了异步请求发出多久后未得到响应即被认定为失败。 它可以防止一直等待得不到响应的请求。
+  - **Timeout**: 超时参数定义了异步操作执行多久未完成、最终认定为失败的时长,如果启用重试,则可能包括多个重试请求。 
它可以防止一直等待得不到响应的请求。
 
   - **Capacity**: 容量参数定义了可以同时进行的异步请求数。
     即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈。 
限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压。
 
+  - **AsyncRetryStrategy**: 重试策略参数定义了什么条件会触发延迟重试以及延迟的策略,例如,固定延迟、指数后退延迟、自定义实现等。
 
 ### 超时处理
 
@@ -211,6 +232,16 @@ Flink 提供两种模式控制结果记录以何种顺序发出。
 异步 I/O 算子提供了完全的精确一次容错保证。它将在途的异步请求的记录保存在 checkpoint 中,在故障恢复时重新触发请求。
 
 
+### 重试支持
+
+重试支持为异步 I/O 操作引入了一个内置重试机制,它对用户的异步函数实现逻辑是透明的。
+
+  - **AsyncRetryStrategy**: 异步重试策略包含了触发重试条件 `AsyncRetryPredicate` 
定义,以及根据当前已尝试次数判断是否继续重试、下次重试间隔时长的接口方法。
+    
需要注意,在满足触发重试条件后,有可能因为当前重试次数超过预设的上限放弃重试,或是在任务结束时被强制终止重试(这种情况下,系统以最后一次执行的结果或异常作为最终状态)。
+
+  - **AsyncRetryPredicate**: 触发重试条件可以选择基于返回结果、 执行异常来定义条件,两种条件是或的关系,满足其一即会触发。
+
+
 ### 实现提示
 
 在实现使用 *Executor*(或者 Scala 中的 *ExecutionContext*)和回调的 *Futures* 时,建议使用 
`DirectExecutor`,因为通常回调的工作量很小,`DirectExecutor` 避免了额外的线程切换开销。回调通常只是把结果发送给 
`ResultFuture`,也就是把它添加进输出缓冲。从这里开始,包括发送记录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。
@@ -233,4 +264,24 @@ Flink 提供两种模式控制结果记录以何种顺序发出。
 
 **默认情况下,AsyncFunction 
的算子(异步等待算子)可以在作业图的任意处使用,但它不能与`SourceFunction`/`SourceStreamTask`组成算子链**
 
+**启用重试后可能需要更大的缓冲队列容量**
+
+新的重试功能可能会导致更大的队列容量要求,最大数量可以近似地评估如下。
+
+```
+inputRate * retryRate * avgRetryDuration
+```
+
+例如,对于一个输入率=100条记录/秒的任务,其中1%的元素将平均触发1次重试,平均重试时间为60秒,额外的队列容量要求为:
+
+```
+100条记录/秒 * 1% * 60s = 60
+```
+
+也就是说,在无序输出模式下,给工作队列增加 60 个容量可能不会影响吞吐量; 而在有序模式下,头部元素是关键点,它未完成的时间越长,算子提供的处理延迟就越长,
+在相同的超时约束下,如果头元素事实上获得了更多的重试, 
那重试功能可能会增加头部元素的处理时间即未完成时间,也就是说在有序模式下,增大队列容量并不是总能提升吞吐。
+
+当队列容量增长时( 这是缓解背压的常用方法),OOM 的风险会随之增加。对于 `ListState` 存储来说,理论的上限是 
`Integer.MAX_VALUE`,
+所以, 虽然事实上队列容量的限制是一样的,但我们在生产中不能把队列容量增加到太大,这种情况下增加任务的并行性也许更可行。
+
 {{< top >}}
diff --git a/docs/content/docs/dev/datastream/operators/asyncio.md 
b/docs/content/docs/dev/datastream/operators/asyncio.md
index 8be36601dad..f16424ce4d1 100644
--- a/docs/content/docs/dev/datastream/operators/asyncio.md
+++ b/docs/content/docs/dev/datastream/operators/asyncio.md
@@ -32,7 +32,8 @@ event-driven programming may be useful preparation.
 
 Note: Details about the design and implementation of the asynchronous I/O 
utility can be found in the proposal and design document
 [FLIP-12: Asynchronous I/O Design and 
Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673).
-
+Details about the new retry support can be found in document
+[FLIP-232: Add Retry Support For Async I/O In DataStream 
API](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963).
 
 ## The need for Asynchronous I/O Operations
 
@@ -68,14 +69,14 @@ efficient than a proper asynchronous client.
 ## Async I/O API
 
 Flink's Async I/O API allows users to use asynchronous request clients with 
data streams. The API handles the integration with
-data streams, well as handling order, event time, fault tolerance, etc.
+data streams, well as handling order, event time, fault tolerance, retry 
support, etc.
 
 Assuming one has an asynchronous client for the target database, three parts 
are needed to implement a stream transformation
 with asynchronous I/O against the database:
 
   - An implementation of `AsyncFunction` that dispatches the requests
   - A *callback* that takes the result of the operation and hands it to the 
`ResultFuture`
-  - Applying the async I/O operation on a DataStream as a transformation
+  - Applying the async I/O operation on a DataStream as a transformation with 
or without retry
 
 The following code example illustrates the basic pattern:
 
@@ -131,10 +132,21 @@ class AsyncDatabaseRequest extends 
RichAsyncFunction<String, Tuple2<String, Stri
 // create the original stream
 DataStream<String> stream = ...;
 
-// apply the async I/O transformation
+// apply the async I/O transformation without retry
 DataStream<Tuple2<String, String>> resultStream =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100);
 
+// or apply the async I/O transformation with retry
+// create an async retry strategy via utility class or a user defined strategy
+AsyncRetryStrategy asyncRetryStrategy =
+       new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // 
maxAttempts=3, fixedDelay=100ms
+               .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+               .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+               .build();
+
+// apply the async I/O transformation with retry
+DataStream<Tuple2<String, String>> resultStream =
+       AsyncDataStream.unorderedWaitWithRetry(stream, new 
AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
@@ -167,10 +179,18 @@ class AsyncDatabaseRequest extends AsyncFunction[String, 
(String, String)] {
 // create the original stream
 val stream: DataStream[String] = ...
 
-// apply the async I/O transformation
+// apply the async I/O transformation without retry
 val resultStream: DataStream[(String, String)] =
     AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, 
TimeUnit.MILLISECONDS, 100)
 
+// apply the async I/O transformation with retry
+// create an AsyncRetryStrategy
+val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ...
+
+// apply the async I/O transformation with retry
+val resultStream: DataStream[(String, String)] =
+  AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 
1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy)
+
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -178,10 +198,10 @@ val resultStream: DataStream[(String, String)] =
 **Important note**: The `ResultFuture` is completed with the first call of 
`ResultFuture.complete`.
 All subsequent `complete` calls will be ignored.
 
-The following two parameters control the asynchronous operations:
+The following three parameters control the asynchronous operations:
 
-  - **Timeout**: The timeout defines how long an asynchronous request may take 
before it is considered failed. This parameter
-    guards against dead/failed requests.
+  - **Timeout**: The timeout defines how long an asynchronous operation take 
before it is finally considered failed,
+    may include multiple retry requests if retry enabled. This parameter 
guards against dead/failed requests.
 
   - **Capacity**: This parameter defines how many asynchronous requests may be 
in progress at the same time.
     Even though the async I/O approach leads typically to much better 
throughput, the operator can still be the bottleneck in
@@ -189,6 +209,8 @@ The following two parameters control the asynchronous 
operations:
     accumulate an ever-growing backlog of pending requests, but that it will 
trigger backpressure once the capacity
     is exhausted.
 
+  - **AsyncRetryStrategy**: The asyncRetryStrategy defines what conditions 
will trigger a delayed retry and the delay strategy,
+    e.g., fixed-delay, exponential-backoff-delay, custom implementation, etc.
 
 ### Timeout Handling
 
@@ -243,6 +265,18 @@ The asynchronous I/O operator offers full exactly-once 
fault tolerance guarantee
 asynchronous requests in checkpoints and restores/re-triggers the requests 
when recovering from a failure.
 
 
+### Retry Support
+
+The retry support introduces a built-in mechanism for async operator which 
being transparently to the user's AsyncFunction.
+
+  - **AsyncRetryStrategy**: The `AsyncRetryStrategy` contains the definition 
of the retry condition `AsyncRetryPredicate` and the interfaces
+    to determine whether to continue retry and the retry interval based on the 
current attempt number.
+    Note that after the trigger retry condition is met, it is possible to 
abandon the retry because the current attempt number exceeds the preset limit,
+    or to be forced to terminate the retry at the end of the task (in this 
case, the system takes the last execution result or exception as the final 
state).
+
+  - **AsyncRetryPredicate**: The retry condition can be triggered based on the 
return result or the execution exception.
+
+
 ### Implementation Tips
 
 For implementations with *Futures* that have an *Executor* (or 
*ExecutionContext* in Scala) for callbacks, we suggests to use a 
`DirectExecutor`, because the
@@ -271,4 +305,26 @@ For example, the following patterns result in a blocking 
`asyncInvoke(...)` func
   
 **An AsyncFunction(AsyncWaitOperator) can be used anywhere in the job graph, 
except that it cannot be chained to a `SourceFunction`/`SourceStreamTask`.**
 
+**May Need Larger Queue Capacity If Retry Enabled**
+
+The new retry feature may result in larger queue capacity requirements, the 
maximum number can be approximately evaluated as below:
+
+```
+inputRate * retryRate * avgRetryDuration
+```
+
+For example, for a task with inputRate = 100 records/sec, where 1% of the 
elements will trigger 1 retry on average, and the average retry time is 60s,
+the additional queue capacity requirement will be:
+
+```
+100 records/sec * 1% * 60s = 60
+```
+
+That is, adding more 60 capacity to the work queue may not affect the 
throughput in unordered output mode , in case of ordered mode, the head element 
is the key point,
+and the longer it stays uncompleted, the longer the processing delay provided 
by the operator, the retry feature may increase the incomplete time of the head 
element,
+if in fact more retries are obtained with the same timeout constraint.
+
+When the queue capacity grows(common way to ease the backpressure), the risk 
of OOM increases. Though in fact, for `ListState` storage, the theoretical 
upper limit is `Integer.MAX_VALUE`,
+so the queue capacity's limit is the same, but we can't increase the queue 
capacity too big in production, increase the task parallelism maybe a more 
viable way.
+
 {{< top >}}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
index ae6a0bad00e..53f3e81dae8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
@@ -22,11 +22,15 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
 import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.flink.streaming.util.retryable.AsyncRetryStrategies.NO_RETRY_STRATEGY;
+
 /**
  * A helper class to apply {@link AsyncFunction} to a data stream.
  *
@@ -56,6 +60,7 @@ public class AsyncDataStream {
      * @param timeout for the asynchronous operation to complete
      * @param bufSize The max number of inputs the {@link AsyncWaitOperator} 
can hold inside.
      * @param mode Processing mode for {@link AsyncWaitOperator}.
+     * @param asyncRetryStrategy AsyncRetryStrategy for {@link AsyncFunction}.
      * @param <IN> Input type.
      * @param <OUT> Output type.
      * @return A new {@link SingleOutputStreamOperator}
@@ -65,7 +70,12 @@ public class AsyncDataStream {
             AsyncFunction<IN, OUT> func,
             long timeout,
             int bufSize,
-            OutputMode mode) {
+            OutputMode mode,
+            AsyncRetryStrategy<OUT> asyncRetryStrategy) {
+        if (asyncRetryStrategy != NO_RETRY_STRATEGY) {
+            Preconditions.checkArgument(
+                    timeout > 0, "Timeout should be configured when do async 
with retry.");
+        }
 
         TypeInformation<OUT> outTypeInfo =
                 TypeExtractor.getUnaryOperatorReturnType(
@@ -81,13 +91,17 @@ public class AsyncDataStream {
         // create transform
         AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
                 new AsyncWaitOperatorFactory<>(
-                        in.getExecutionEnvironment().clean(func), timeout, 
bufSize, mode);
+                        in.getExecutionEnvironment().clean(func),
+                        timeout,
+                        bufSize,
+                        mode,
+                        asyncRetryStrategy);
 
         return in.transform("async wait operator", outTypeInfo, 
operatorFactory);
     }
 
     /**
-     * Add an AsyncWaitOperator. The order of output stream records may be 
reordered.
+     * Adds an AsyncWaitOperator. The order of output stream records may be 
reordered.
      *
      * @param in Input {@link DataStream}
      * @param func {@link AsyncFunction}
@@ -104,11 +118,17 @@ public class AsyncDataStream {
             long timeout,
             TimeUnit timeUnit,
             int capacity) {
-        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, 
OutputMode.UNORDERED);
+        return addOperator(
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                capacity,
+                OutputMode.UNORDERED,
+                NO_RETRY_STRATEGY);
     }
 
     /**
-     * Add an AsyncWaitOperator. The order of output stream records may be 
reordered.
+     * Adds an AsyncWaitOperator. The order of output stream records may be 
reordered.
      *
      * @param in Input {@link DataStream}
      * @param func {@link AsyncFunction}
@@ -121,11 +141,16 @@ public class AsyncDataStream {
     public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
             DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, 
TimeUnit timeUnit) {
         return addOperator(
-                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, 
OutputMode.UNORDERED);
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                DEFAULT_QUEUE_CAPACITY,
+                OutputMode.UNORDERED,
+                NO_RETRY_STRATEGY);
     }
 
     /**
-     * Add an AsyncWaitOperator. The order to process input records is 
guaranteed to be the same as
+     * Adds an AsyncWaitOperator. The order to process input records is 
guaranteed to be the same as
      * input ones.
      *
      * @param in Input {@link DataStream}
@@ -143,11 +168,17 @@ public class AsyncDataStream {
             long timeout,
             TimeUnit timeUnit,
             int capacity) {
-        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, 
OutputMode.ORDERED);
+        return addOperator(
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                capacity,
+                OutputMode.ORDERED,
+                NO_RETRY_STRATEGY);
     }
 
     /**
-     * Add an AsyncWaitOperator. The order to process input records is 
guaranteed to be the same as
+     * Adds an AsyncWaitOperator. The order to process input records is 
guaranteed to be the same as
      * input ones.
      *
      * @param in Input {@link DataStream}
@@ -161,6 +192,131 @@ public class AsyncDataStream {
     public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
             DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, 
TimeUnit timeUnit) {
         return addOperator(
-                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, 
OutputMode.ORDERED);
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                DEFAULT_QUEUE_CAPACITY,
+                OutputMode.ORDERED,
+                NO_RETRY_STRATEGY);
+    }
+
+    /**
+     * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry 
of AsyncFunction. The
+     * order of output stream records may be reordered.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous 
operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation 
that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> 
unorderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            AsyncRetryStrategy<OUT> asyncRetryStrategy) {
+        return addOperator(
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                DEFAULT_QUEUE_CAPACITY,
+                OutputMode.UNORDERED,
+                asyncRetryStrategy);
+    }
+
+    /**
+     * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry 
of AsyncFunction. The
+     * order of output stream records may be reordered.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous 
operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param capacity The max number of async i/o operation that can be 
triggered
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation 
that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> 
unorderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            int capacity,
+            AsyncRetryStrategy<OUT> asyncRetryStrategy) {
+        return addOperator(
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                capacity,
+                OutputMode.UNORDERED,
+                asyncRetryStrategy);
+    }
+
+    /**
+     * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry 
of AsyncFunction. The
+     * order to process input records is guaranteed to be the same as * input 
ones.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous 
operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation 
that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> 
orderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            AsyncRetryStrategy<OUT> asyncRetryStrategy) {
+        return addOperator(
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                DEFAULT_QUEUE_CAPACITY,
+                OutputMode.ORDERED,
+                asyncRetryStrategy);
+    }
+
+    /**
+     * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry 
of AsyncFunction. The
+     * order to process input records is guaranteed to be the same as * input 
ones.
+     *
+     * @param in Input {@link DataStream}
+     * @param func {@link AsyncFunction}
+     * @param timeout from first invoke to final completion of asynchronous 
operation, may include
+     *     multiple retries, and will be reset in case of restart
+     * @param timeUnit of the given timeout
+     * @param capacity The max number of async i/o operation that can be 
triggered
+     * @param asyncRetryStrategy The strategy of reattempt async i/o operation 
that can be triggered
+     * @param <IN> Type of input record
+     * @param <OUT> Type of output record
+     * @return A new {@link SingleOutputStreamOperator}.
+     */
+    public static <IN, OUT> SingleOutputStreamOperator<OUT> 
orderedWaitWithRetry(
+            DataStream<IN> in,
+            AsyncFunction<IN, OUT> func,
+            long timeout,
+            TimeUnit timeUnit,
+            int capacity,
+            AsyncRetryStrategy<OUT> asyncRetryStrategy) {
+        return addOperator(
+                in,
+                func,
+                timeUnit.toMillis(timeout),
+                capacity,
+                OutputMode.ORDERED,
+                asyncRetryStrategy);
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncRetryPredicate.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncRetryPredicate.java
new file mode 100644
index 00000000000..1873b43f4ba
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncRetryPredicate.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Interface encapsulates an asynchronous retry predicate. */
+@PublicEvolving
+public interface AsyncRetryPredicate<OUT> {
+
+    /**
+     * An Optional Java {@Predicate} that defines a condition on 
asyncFunction's future result which
+     * will trigger a later reattempt operation, will be called before user's 
ResultFuture#complete.
+     *
+     * @return predicate on result of {@link Collection}
+     */
+    Optional<Predicate<Collection<OUT>>> resultPredicate();
+
+    /**
+     * An Optional Java {@Predicate} that defines a condition on 
asyncFunction's exception which
+     * will trigger a later reattempt operation, will be called before user's
+     * ResultFuture#completeExceptionally.
+     *
+     * @return predicate on {@link Throwable} exception
+     */
+    Optional<Predicate<Throwable>> exceptionPredicate();
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncRetryStrategy.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncRetryStrategy.java
new file mode 100644
index 00000000000..95efd51d224
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncRetryStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.async;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/** Interface encapsulates an asynchronous retry strategy. */
+@PublicEvolving
+public interface AsyncRetryStrategy<OUT> extends Serializable {
+
+    /** @return whether the next attempt can happen */
+    boolean canRetry(int currentAttempts);
+
+    /** @return the delay time of next attempt */
+    long getBackoffTimeMillis(int currentAttempts);
+
+    /** @return the defined retry predicate {@link AsyncRetryPredicate} */
+    AsyncRetryPredicate<OUT> getRetryPredicate();
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index eeaf3418b87..ba3f1c3ad87 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -51,10 +52,15 @@ import javax.annotation.Nonnull;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+
+import static 
org.apache.flink.streaming.util.retryable.AsyncRetryStrategies.NO_RETRY_STRATEGY;
 
 /**
  * The {@link AsyncWaitOperator} allows to asynchronously process incoming 
stream records. For that
@@ -68,6 +74,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * {@link StreamElement} in it's operator state. Upon recovery the recorded 
set of stream elements
  * is replayed.
  *
+ * <p>The operator also support retry on predefined condition and strategy 
{@link
+ * AsyncRetryStrategy}. If given strategy is NO_RETRY_STRATEGY or a custom 
implemented {@link
+ * org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate} which 
defines empty retry
+ * predicate, then no retry will happen, everything keeps the same as the 
{@link AsyncWaitOperator}
+ * without retry support.
+ *
  * <p>In case of chaining of this operator, it has to be made sure that the 
operators in the chain
  * are opened tail to head. The reason for this is that an opened {@link 
AsyncWaitOperator} starts
  * already emitting recovered {@link StreamElement} to downstream operators.
@@ -92,6 +104,12 @@ public class AsyncWaitOperator<IN, OUT>
     /** Timeout for the async collectors. */
     private final long timeout;
 
+    /** AsyncRetryStrategy for the async function. */
+    private final AsyncRetryStrategy<OUT> asyncRetryStrategy;
+
+    /** If the retry strategy is not no_retry. */
+    private final boolean retryEnabled;
+
     /** {@link TypeSerializer} for inputs while making snapshots. */
     private transient StreamElementSerializer<IN> inStreamElementSerializer;
 
@@ -101,6 +119,9 @@ public class AsyncWaitOperator<IN, OUT>
     /** Queue, into which to store the currently in-flight stream elements. */
     private transient StreamElementQueue<OUT> queue;
 
+    /** In-flight delay retry handlers, will be removed when a retry has been 
triggered. */
+    private transient Set<RetryableResultHandlerDelegator> 
inFlightDelayRetryHandlers;
+
     /** Mailbox executor used to yield while waiting for buffers to empty. */
     private final transient MailboxExecutor mailboxExecutor;
 
@@ -109,11 +130,19 @@ public class AsyncWaitOperator<IN, OUT>
     /** Whether object reuse has been enabled or disabled. */
     private transient boolean isObjectReuseEnabled;
 
+    private transient Predicate<Collection<OUT>> retryResultPredicate;
+
+    private transient Predicate<Throwable> retryExceptionPredicate;
+
+    /** Whether retry is disabled due to task finish, initially set to false. 
*/
+    private transient AtomicBoolean retryDisabledOnFinish;
+
     public AsyncWaitOperator(
             @Nonnull AsyncFunction<IN, OUT> asyncFunction,
             long timeout,
             int capacity,
             @Nonnull AsyncDataStream.OutputMode outputMode,
+            @Nonnull AsyncRetryStrategy<OUT> asyncRetryStrategy,
             @Nonnull ProcessingTimeService processingTimeService,
             @Nonnull MailboxExecutor mailboxExecutor) {
         super(asyncFunction);
@@ -128,6 +157,17 @@ public class AsyncWaitOperator<IN, OUT>
 
         this.timeout = timeout;
 
+        this.asyncRetryStrategy = asyncRetryStrategy;
+
+        // enables retry only when the resultPredicate or exceptionPredicate 
is present.
+        this.retryEnabled =
+                asyncRetryStrategy != NO_RETRY_STRATEGY
+                        && 
(asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()
+                                || asyncRetryStrategy
+                                        .getRetryPredicate()
+                                        .exceptionPredicate()
+                                        .isPresent());
+
         this.processingTimeService = 
Preconditions.checkNotNull(processingTimeService);
 
         this.mailboxExecutor = mailboxExecutor;
@@ -143,7 +183,6 @@ public class AsyncWaitOperator<IN, OUT>
         this.inStreamElementSerializer =
                 new StreamElementSerializer<>(
                         
getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
-
         switch (outputMode) {
             case ORDERED:
                 queue = new OrderedStreamElementQueue<>(capacity);
@@ -154,6 +193,18 @@ public class AsyncWaitOperator<IN, OUT>
             default:
                 throw new IllegalStateException("Unknown async mode: " + 
outputMode + '.');
         }
+        if (retryEnabled) {
+            this.retryResultPredicate =
+                    asyncRetryStrategy
+                            .getRetryPredicate()
+                            .resultPredicate()
+                            .orElse(ignore -> false);
+            this.retryExceptionPredicate =
+                    asyncRetryStrategy
+                            .getRetryPredicate()
+                            .exceptionPredicate()
+                            .orElse(ignore -> false);
+        }
 
         this.timestampedCollector = new TimestampedCollector<>(super.output);
     }
@@ -163,6 +214,10 @@ public class AsyncWaitOperator<IN, OUT>
         super.open();
 
         this.isObjectReuseEnabled = 
getExecutionConfig().isObjectReuseEnabled();
+        if (retryEnabled) {
+            this.inFlightDelayRetryHandlers = new HashSet<>();
+            this.retryDisabledOnFinish = new AtomicBoolean(false);
+        }
 
         if (recoveredStreamElements != null) {
             for (StreamElement element : recoveredStreamElements.get()) {
@@ -197,14 +252,26 @@ public class AsyncWaitOperator<IN, OUT>
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
-        final ResultHandler resultHandler = new ResultHandler(element, entry);
+        if (retryEnabled) {
+            final RetryableResultHandlerDelegator resultHandler =
+                    new RetryableResultHandlerDelegator(element, entry, 
getProcessingTimeService());
 
-        // register a timeout for the entry if timeout is configured
-        if (timeout > 0L) {
-            resultHandler.registerTimeout(getProcessingTimeService(), timeout);
-        }
+            // register a timeout for the entry
+            assert timeout > 0L;
+            resultHandler.registerTimeout(timeout);
 
-        userFunction.asyncInvoke(element.getValue(), resultHandler);
+            userFunction.asyncInvoke(element.getValue(), resultHandler);
+
+        } else {
+            final ResultHandler resultHandler = new ResultHandler(element, 
entry);
+
+            // register a timeout for the entry if timeout is configured
+            if (timeout > 0L) {
+                resultHandler.registerTimeout(getProcessingTimeService(), 
timeout);
+            }
+
+            userFunction.asyncInvoke(element.getValue(), resultHandler);
+        }
     }
 
     @Override
@@ -252,6 +319,9 @@ public class AsyncWaitOperator<IN, OUT>
 
     @Override
     public void endInput() throws Exception {
+        // we should finish all in fight delayed retry immediately.
+        finishInFlightDelayedRetry();
+
         // we should wait here for the data in flight to be finished. the 
reason is that the
         // timer not in running will be forbidden to fire after this, so that 
when the async
         // operation is stuck, it results in deadlock due to what the timeout 
timer is not fired
@@ -282,6 +352,23 @@ public class AsyncWaitOperator<IN, OUT>
         return queueEntry.get();
     }
 
+    private void finishInFlightDelayedRetry() throws Exception {
+        if (retryEnabled) {
+            // disable delayed retry, after which all data will give up retry 
and complete normally.
+            this.retryDisabledOnFinish.set(true);
+            if (inFlightDelayRetryHandlers.size() > 0) {
+                for (RetryableResultHandlerDelegator delegator : 
inFlightDelayRetryHandlers) {
+                    assert delegator.delayedRetryTimer != null;
+                    // cancel retry timer, cancel failure means retry action 
already being executed
+                    if (delegator.delayedRetryTimer.cancel(true)) {
+                        tryOnce(delegator);
+                    }
+                }
+                inFlightDelayRetryHandlers.clear();
+            }
+        }
+    }
+
     private void waitInFlightInputsFinished() throws InterruptedException {
 
         while (!queue.isEmpty()) {
@@ -318,6 +405,128 @@ public class AsyncWaitOperator<IN, OUT>
         }
     }
 
+    /** Increments number of attempts and fire the attempt. */
+    private void tryOnce(RetryableResultHandlerDelegator 
resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> 
{
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will 
happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler#complete to prevent repeated 
complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is 
in-flight, new retry will be
+         * rejected if it is ture, and it will be reset to false after the 
retry fired.
+         */
+        private final AtomicBoolean retryAwaiting = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection 
to emit nothing");
+            if (!retryDisabledOnFinish.get() && 
resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryAwaiting.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {
+            if (!retryDisabledOnFinish.get() && 
resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryAwaiting.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(null, error);
+            } else {
+                resultHandler.completeExceptionally(error);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable 
error) {
+            mailboxExecutor.submit(() -> processRetry(results, error), 
"delayed retry or complete");
+        }
+
+        private void processRetry(Collection<OUT> results, Throwable error) {
+            boolean satisfy =
+                    (null != results && retryResultPredicate.test(results))
+                            || (null != error && 
retryExceptionPredicate.test(error));
+
+            if (satisfy
+                    && asyncRetryStrategy.canRetry(currentAttempts)
+                    && !retryDisabledOnFinish.get()) {
+                long nextBackoffTimeMillis =
+                        
asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                final long delayedRetry =
+                        nextBackoffTimeMillis
+                                + 
getProcessingTimeService().getCurrentProcessingTime();
+
+                // timer thread will finally dispatch the task to mailbox 
executor,
+                // and it can only be submitted once for one attempt.
+                delayedRetryTimer =
+                        processingTimeService.registerTimer(delayedRetry, 
timestamp -> doRetry());
+
+                // add to incomplete retry handlers only for first time
+                if (currentAttempts == 1) {
+                    inFlightDelayRetryHandlers.add(this);
+                }
+            } else {
+                // remove handle that has been tried from incomplete retry 
handlers, and ignore the
+                // retryAwaiting flag due to no more retry will happen.
+                if (currentAttempts > 1) {
+                    inFlightDelayRetryHandlers.remove(this);
+                }
+                // retry unsatisfied, complete it
+                if (null != results) {
+                    resultHandler.complete(results);
+                } else {
+                    resultHandler.completeExceptionally(error);
+                }
+            }
+        }
+
+        private void doRetry() throws Exception {
+            // fire the retry
+            tryOnce(this);
+
+            // reset for next possible retry
+            retryAwaiting.set(false);
+        }
+    }
+
     /** A handler for the results of a specific input record. */
     private class ResultHandler implements ResultFuture<OUT> {
         /** Optional timeout timer used to signal the timeout to the 
AsyncFunction. */
@@ -343,8 +552,6 @@ public class AsyncWaitOperator<IN, OUT>
 
         @Override
         public void complete(Collection<OUT> results) {
-            Preconditions.checkNotNull(
-                    results, "Results must not be null, use empty collection 
to emit nothing");
 
             // already completed (exceptionally or with previous complete call 
from ill-written
             // AsyncFunction), so
@@ -402,7 +609,7 @@ public class AsyncWaitOperator<IN, OUT>
             processInMailbox(Collections.emptyList());
         }
 
-        public void registerTimeout(ProcessingTimeService 
processingTimeService, long timeout) {
+        private void registerTimeout(ProcessingTimeService 
processingTimeService, long timeout) {
             final long timeoutTimestamp =
                     timeout + processingTimeService.getCurrentProcessingTime();
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
index 79c5e54dd4f..e5b2df779bc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators.async;
 
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
@@ -26,6 +27,8 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
 
+import static 
org.apache.flink.streaming.util.retryable.AsyncRetryStrategies.NO_RETRY_STRATEGY;
+
 /**
  * The factory of {@link AsyncWaitOperator}.
  *
@@ -38,17 +41,28 @@ public class AsyncWaitOperatorFactory<IN, OUT> extends 
AbstractStreamOperatorFac
     private final long timeout;
     private final int capacity;
     private final AsyncDataStream.OutputMode outputMode;
+    private final AsyncRetryStrategy<OUT> asyncRetryStrategy;
 
     public AsyncWaitOperatorFactory(
             AsyncFunction<IN, OUT> asyncFunction,
             long timeout,
             int capacity,
             AsyncDataStream.OutputMode outputMode) {
+        this(asyncFunction, timeout, capacity, outputMode, NO_RETRY_STRATEGY);
+    }
+
+    public AsyncWaitOperatorFactory(
+            AsyncFunction<IN, OUT> asyncFunction,
+            long timeout,
+            int capacity,
+            AsyncDataStream.OutputMode outputMode,
+            AsyncRetryStrategy<OUT> asyncRetryStrategy) {
         this.asyncFunction = asyncFunction;
         this.timeout = timeout;
         this.capacity = capacity;
         this.outputMode = outputMode;
         this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.asyncRetryStrategy = asyncRetryStrategy;
     }
 
     @Override
@@ -60,6 +74,7 @@ public class AsyncWaitOperatorFactory<IN, OUT> extends 
AbstractStreamOperatorFac
                         timeout,
                         capacity,
                         outputMode,
+                        asyncRetryStrategy,
                         processingTimeService,
                         getMailboxExecutor());
         asyncWaitOperator.setup(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java
new file mode 100644
index 00000000000..519945d5559
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncRetryStrategies.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import org.apache.flink.streaming.api.functions.async.AsyncRetryPredicate;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete {@link AsyncRetryStrategy}. */
+public class AsyncRetryStrategies {
+    public static final NoRetryStrategy NO_RETRY_STRATEGY = new 
NoRetryStrategy();
+
+    /** NoRetryStrategy. */
+    private static class NoRetryStrategy implements AsyncRetryStrategy {
+        private static final long serialVersionUID = 1L;
+
+        private NoRetryStrategy() {}
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return false;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return -1;
+        }
+
+        @Override
+        public AsyncRetryPredicate getRetryPredicate() {
+            return new RetryPredicate(null, null);
+        }
+    }
+
+    private static class RetryPredicate<OUT> implements 
AsyncRetryPredicate<OUT> {
+        final Predicate<Collection<OUT>> resultPredicate;
+        final Predicate<Throwable> exceptionPredicate;
+
+        public RetryPredicate(
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public Optional<Predicate<Collection<OUT>>> resultPredicate() {
+            return Optional.ofNullable(resultPredicate);
+        }
+
+        @Override
+        public Optional<Predicate<Throwable>> exceptionPredicate() {
+            return Optional.ofNullable(exceptionPredicate);
+        }
+    }
+
+    /** FixedDelayRetryStrategy. */
+    public static class FixedDelayRetryStrategy<OUT> implements 
AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long backoffTimeMillis;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private FixedDelayRetryStrategy(
+                int maxAttempts,
+                long backoffTimeMillis,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new RetryPredicate(resultPredicate, exceptionPredicate);
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            return backoffTimeMillis;
+        }
+    }
+
+    /** FixedDelayRetryStrategyBuilder for building a FixedDelayRetryStrategy. 
*/
+    public static class FixedDelayRetryStrategyBuilder<OUT> {
+        private int maxAttempts;
+        private long backoffTimeMillis;
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public FixedDelayRetryStrategyBuilder(int maxAttempts, long 
backoffTimeMillis) {
+            Preconditions.checkArgument(
+                    maxAttempts > 0, "maxAttempts should be greater than 
zero.");
+            Preconditions.checkArgument(
+                    backoffTimeMillis > 0, "backoffTimeMillis should be 
greater than zero.");
+            this.maxAttempts = maxAttempts;
+            this.backoffTimeMillis = backoffTimeMillis;
+        }
+
+        public FixedDelayRetryStrategyBuilder<OUT> ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategyBuilder<OUT> ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public FixedDelayRetryStrategy<OUT> build() {
+            return new FixedDelayRetryStrategy<OUT>(
+                    maxAttempts, backoffTimeMillis, resultPredicate, 
exceptionPredicate);
+        }
+    }
+
+    /** ExponentialBackoffDelayRetryStrategy. */
+    public static class ExponentialBackoffDelayRetryStrategy<OUT>
+            implements AsyncRetryStrategy<OUT> {
+        private static final long serialVersionUID = 1L;
+        private final int maxAttempts;
+        private final long maxRetryDelay;
+        private final double multiplier;
+        private final Predicate<Collection<OUT>> resultPredicate;
+        private final Predicate<Throwable> exceptionPredicate;
+
+        private long lastRetryDelay;
+
+        public ExponentialBackoffDelayRetryStrategy(
+                int maxAttempts,
+                long initialDelay,
+                long maxRetryDelay,
+                double multiplier,
+                Predicate<Collection<OUT>> resultPredicate,
+                Predicate<Throwable> exceptionPredicate) {
+            this.maxAttempts = maxAttempts;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+            this.resultPredicate = resultPredicate;
+            this.exceptionPredicate = exceptionPredicate;
+            this.lastRetryDelay = initialDelay;
+        }
+
+        @Override
+        public boolean canRetry(int currentAttempts) {
+            return currentAttempts <= maxAttempts;
+        }
+
+        @Override
+        public long getBackoffTimeMillis(int currentAttempts) {
+            if (currentAttempts <= 1) {
+                // equivalent to initial delay
+                return lastRetryDelay;
+            }
+            long backoff = Math.min((long) (lastRetryDelay * multiplier), 
maxRetryDelay);
+            this.lastRetryDelay = backoff;
+            return backoff;
+        }
+
+        @Override
+        public AsyncRetryPredicate<OUT> getRetryPredicate() {
+            return new RetryPredicate<OUT>(resultPredicate, 
exceptionPredicate);
+        }
+    }
+
+    /**
+     * ExponentialBackoffDelayRetryStrategyBuilder for building a
+     * ExponentialBackoffDelayRetryStrategy.
+     */
+    public static class ExponentialBackoffDelayRetryStrategyBuilder<OUT> {
+        private final int maxAttempts;
+        private final long initialDelay;
+        private final long maxRetryDelay;
+        private final double multiplier;
+
+        private Predicate<Collection<OUT>> resultPredicate;
+        private Predicate<Throwable> exceptionPredicate;
+
+        public ExponentialBackoffDelayRetryStrategyBuilder(
+                int maxAttempts, long initialDelay, long maxRetryDelay, double 
multiplier) {
+            this.maxAttempts = maxAttempts;
+            this.initialDelay = initialDelay;
+            this.maxRetryDelay = maxRetryDelay;
+            this.multiplier = multiplier;
+        }
+
+        public ExponentialBackoffDelayRetryStrategyBuilder<OUT> ifResult(
+                @Nonnull Predicate<Collection<OUT>> resultRetryPredicate) {
+            this.resultPredicate = resultRetryPredicate;
+            return this;
+        }
+
+        public ExponentialBackoffDelayRetryStrategyBuilder<OUT> ifException(
+                @Nonnull Predicate<Throwable> exceptionRetryPredicate) {
+            this.exceptionPredicate = exceptionRetryPredicate;
+            return this;
+        }
+
+        public ExponentialBackoffDelayRetryStrategy<OUT> build() {
+            return new ExponentialBackoffDelayRetryStrategy<OUT>(
+                    maxAttempts,
+                    initialDelay,
+                    maxRetryDelay,
+                    multiplier,
+                    resultPredicate,
+                    exceptionPredicate);
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/RetryPredicates.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/RetryPredicates.java
new file mode 100644
index 00000000000..531da422a90
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/RetryPredicates.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.retryable;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+/** Utility class to create concrete retry predicates. */
+public class RetryPredicates {
+
+    /** A predicate matches empty result which means an empty {@link 
Collection}. */
+    public static final EmptyResultPredicate EMPTY_RESULT_PREDICATE = new 
EmptyResultPredicate();
+
+    /** A predicate matches any exception which means a non-null{@link 
Throwable}. */
+    public static final HasExceptionPredicate HAS_EXCEPTION_PREDICATE = new 
HasExceptionPredicate();
+
+    /**
+     * Creates a predicate on given exception type.
+     *
+     * @param exceptionClass
+     * @return predicate on exception type.
+     */
+    public static ExceptionTypePredicate createExceptionTypePredicate(
+            @Nonnull Class<? extends Throwable> exceptionClass) {
+        return new ExceptionTypePredicate(exceptionClass);
+    }
+
+    private static final class EmptyResultPredicate<T>
+            implements Predicate<Collection<T>>, Serializable {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean test(Collection<T> ts) {
+            if (null == ts || ts.isEmpty()) {
+                return true;
+            }
+            return false;
+        }
+    }
+
+    private static final class HasExceptionPredicate implements 
Predicate<Throwable>, Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private HasExceptionPredicate() {}
+
+        @Override
+        public boolean test(Throwable throwable) {
+            return null != throwable;
+        }
+    }
+
+    private static final class ExceptionTypePredicate
+            implements Predicate<Throwable>, Serializable {
+        private static final long serialVersionUID = 1L;
+        private final Class<? extends Throwable> exceptionClass;
+
+        public ExceptionTypePredicate(@Nonnull Class<? extends Throwable> 
exceptionClass) {
+            this.exceptionClass = exceptionClass;
+        }
+
+        @Override
+        public boolean test(@Nonnull Throwable throwable) {
+            return exceptionClass.isAssignableFrom(throwable.getClass());
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 3d8ec3d72a9..dfa1624b30b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -54,6 +55,8 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
 import 
org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.flink.streaming.util.retryable.RetryPredicates;
 import org.apache.flink.testutils.junit.SharedObjects;
 import org.apache.flink.testutils.junit.SharedReference;
 import org.apache.flink.util.ExceptionUtils;
@@ -101,6 +104,7 @@ import static org.junit.Assert.assertTrue;
  * <ul>
  *   <li>Process StreamRecords and Watermarks in ORDERED mode
  *   <li>Process StreamRecords and Watermarks in UNORDERED mode
+ *   <li>Process StreamRecords with retry
  *   <li>AsyncWaitOperator in operator chain
  *   <li>Snapshot state and restore state
  * </ul>
@@ -111,6 +115,16 @@ public class AsyncWaitOperatorTest extends TestLogger {
     @Rule public Timeout timeoutRule = new Timeout(100, TimeUnit.SECONDS);
     @Rule public final SharedObjects sharedObjects = SharedObjects.create();
 
+    private static AsyncRetryStrategy emptyResultFixedDelayRetryStrategy =
+            new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L)
+                    .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
+                    .build();
+
+    private static AsyncRetryStrategy exceptionRetryStrategy =
+            new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L)
+                    .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
+                    .build();
+
     private abstract static class MyAbstractAsyncFunction<IN>
             extends RichAsyncFunction<IN, Integer> {
         private static final long serialVersionUID = 8522411971886428444L;
@@ -270,6 +284,60 @@ public class AsyncWaitOperatorTest extends TestLogger {
         }
     }
 
+    private static class OddInputEmptyResultAsyncFunction extends 
MyAbstractAsyncFunction<Integer> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void asyncInvoke(final Integer input, final 
ResultFuture<Integer> resultFuture)
+                throws Exception {
+            executorService.submit(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Thread.sleep(3);
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                            if (input % 2 == 1) {
+                                resultFuture.complete(Collections.EMPTY_LIST);
+                            } else {
+                                
resultFuture.complete(Collections.singletonList(input * 2));
+                            }
+                        }
+                    });
+        }
+    }
+
+    private static class IllWrittenOddInputEmptyResultAsyncFunction
+            extends MyAbstractAsyncFunction<Integer> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void asyncInvoke(final Integer input, final 
ResultFuture<Integer> resultFuture)
+                throws Exception {
+            executorService.submit(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Thread.sleep(3);
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                            if (input % 2 == 1) {
+                                // repeated calling complete
+                                for (int i = 0; i < 10; i++) {
+                                    
resultFuture.complete(Collections.EMPTY_LIST);
+                                }
+                            } else {
+                                
resultFuture.complete(Collections.singletonList(input * 2));
+                            }
+                        }
+                    });
+        }
+    }
+
     /** A {@link Comparator} to compare {@link StreamRecord} while sorting 
them. */
     private class StreamRecordComparator implements Comparator<Object> {
         @Override
@@ -859,7 +927,13 @@ public class AsyncWaitOperatorTest extends TestLogger {
      */
     @Test
     public void testOrderedWaitUserExceptionHandling() throws Exception {
-        testUserExceptionHandling(AsyncDataStream.OutputMode.ORDERED);
+        testUserExceptionHandling(
+                AsyncDataStream.OutputMode.ORDERED, 
AsyncRetryStrategies.NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testOrderedWaitUserExceptionHandlingWithRetry() throws 
Exception {
+        testUserExceptionHandling(AsyncDataStream.OutputMode.ORDERED, 
exceptionRetryStrategy);
     }
 
     /**
@@ -871,12 +945,25 @@ public class AsyncWaitOperatorTest extends TestLogger {
      */
     @Test
     public void testUnorderedWaitUserExceptionHandling() throws Exception {
-        testUserExceptionHandling(AsyncDataStream.OutputMode.UNORDERED);
+        testUserExceptionHandling(
+                AsyncDataStream.OutputMode.UNORDERED, 
AsyncRetryStrategies.NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testUnorderedWaitUserExceptionHandlingWithRetry() throws 
Exception {
+        testUserExceptionHandling(AsyncDataStream.OutputMode.UNORDERED, 
exceptionRetryStrategy);
     }
 
-    private void testUserExceptionHandling(AsyncDataStream.OutputMode 
outputMode) throws Exception {
+    private void testUserExceptionHandling(
+            AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy 
asyncRetryStrategy)
+            throws Exception {
         OneInputStreamOperatorTestHarness<Integer, Integer> harness =
-                createTestHarness(new UserExceptionAsyncFunction(), TIMEOUT, 
2, outputMode);
+                createTestHarnessWithRetry(
+                        new UserExceptionAsyncFunction(),
+                        TIMEOUT,
+                        2,
+                        outputMode,
+                        asyncRetryStrategy);
 
         
harness.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
         harness.open();
@@ -886,6 +973,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
         }
 
         synchronized (harness.getCheckpointLock()) {
+            harness.endInput();
             harness.close();
         }
 
@@ -912,7 +1000,14 @@ public class AsyncWaitOperatorTest extends TestLogger {
      */
     @Test
     public void testOrderedWaitTimeoutHandling() throws Exception {
-        testTimeoutExceptionHandling(AsyncDataStream.OutputMode.ORDERED);
+        testTimeoutExceptionHandling(
+                AsyncDataStream.OutputMode.ORDERED, 
AsyncRetryStrategies.NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testOrderedWaitTimeoutHandlingWithRetry() throws Exception {
+        testTimeoutExceptionHandling(
+                AsyncDataStream.OutputMode.ORDERED, 
emptyResultFixedDelayRetryStrategy);
     }
 
     /**
@@ -923,13 +1018,22 @@ public class AsyncWaitOperatorTest extends TestLogger {
      */
     @Test
     public void testUnorderedWaitTimeoutHandling() throws Exception {
-        testTimeoutExceptionHandling(AsyncDataStream.OutputMode.UNORDERED);
+        testTimeoutExceptionHandling(
+                AsyncDataStream.OutputMode.UNORDERED, 
AsyncRetryStrategies.NO_RETRY_STRATEGY);
     }
 
-    private void testTimeoutExceptionHandling(AsyncDataStream.OutputMode 
outputMode)
+    @Test
+    public void testUnorderedWaitTimeoutHandlingWithRetry() throws Exception {
+        testTimeoutExceptionHandling(
+                AsyncDataStream.OutputMode.UNORDERED, 
emptyResultFixedDelayRetryStrategy);
+    }
+
+    private void testTimeoutExceptionHandling(
+            AsyncDataStream.OutputMode outputMode, AsyncRetryStrategy 
asyncRetryStrategy)
             throws Exception {
         OneInputStreamOperatorTestHarness<Integer, Integer> harness =
-                createTestHarness(new NoOpAsyncFunction<>(), 10L, 2, 
outputMode);
+                createTestHarnessWithRetry(
+                        new NoOpAsyncFunction<>(), 10L, 2, outputMode, 
asyncRetryStrategy);
 
         
harness.getEnvironment().setExpectedExternalFailureCause(Throwable.class);
         harness.open();
@@ -1024,6 +1128,16 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
     @Test
     public void testIgnoreAsyncOperatorRecordsOnDrain() throws Exception {
+        
testIgnoreAsyncOperatorRecordsOnDrain(AsyncRetryStrategies.NO_RETRY_STRATEGY);
+    }
+
+    @Test
+    public void testIgnoreAsyncOperatorRecordsOnDrainWithRetry() throws 
Exception {
+        
testIgnoreAsyncOperatorRecordsOnDrain(emptyResultFixedDelayRetryStrategy);
+    }
+
+    private void testIgnoreAsyncOperatorRecordsOnDrain(AsyncRetryStrategy 
asyncRetryStrategy)
+            throws Exception {
         // given: Async wait operator which are able to collect result futures.
         StreamTaskMailboxTestHarnessBuilder<Integer> builder =
                 new StreamTaskMailboxTestHarnessBuilder<>(
@@ -1036,7 +1150,8 @@ public class AsyncWaitOperatorTest extends TestLogger {
                                         new 
CollectableFuturesAsyncFunction<>(resultFutures),
                                         TIMEOUT,
                                         5,
-                                        AsyncDataStream.OutputMode.ORDERED))
+                                        AsyncDataStream.OutputMode.ORDERED,
+                                        asyncRetryStrategy))
                         .build()) {
             // when: Processing at least two elements in reverse order to keep 
completed queue not
             // empty.
@@ -1054,6 +1169,99 @@ public class AsyncWaitOperatorTest extends TestLogger {
         }
     }
 
+    /** Test the AsyncWaitOperator with ordered mode and processing time. */
+    @Test
+    public void testProcessingTimeOrderedWithRetry() throws Exception {
+        testProcessingTimeWithRetry(
+                AsyncDataStream.OutputMode.ORDERED, new 
OddInputEmptyResultAsyncFunction());
+    }
+
+    /** Test the AsyncWaitOperator with unordered mode and processing time. */
+    @Test
+    public void testProcessingTimeUnorderedWithRetry() throws Exception {
+        testProcessingTimeWithRetry(
+                AsyncDataStream.OutputMode.UNORDERED, new 
OddInputEmptyResultAsyncFunction());
+    }
+
+    /**
+     * Test the AsyncWaitOperator with an ill-written async function under 
unordered mode and
+     * processing time.
+     */
+    @Test
+    public void testProcessingTimeRepeatedCompleteUnorderedWithRetry() throws 
Exception {
+        testProcessingTimeWithRetry(
+                AsyncDataStream.OutputMode.UNORDERED,
+                new IllWrittenOddInputEmptyResultAsyncFunction());
+    }
+
+    /**
+     * Test the AsyncWaitOperator with an ill-written async function under 
ordered mode and
+     * processing time.
+     */
+    @Test
+    public void testProcessingTimeRepeatedCompleteOrderedWithRetry() throws 
Exception {
+        testProcessingTimeWithRetry(
+                AsyncDataStream.OutputMode.ORDERED,
+                new IllWrittenOddInputEmptyResultAsyncFunction());
+    }
+
+    private void testProcessingTimeWithRetry(
+            AsyncDataStream.OutputMode mode, RichAsyncFunction asyncFunction) 
throws Exception {
+
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new AsyncWaitOperatorFactory<>(
+                                        asyncFunction,
+                                        TIMEOUT,
+                                        6,
+                                        mode,
+                                        emptyResultFixedDelayRetryStrategy))
+                        .build()) {
+
+            final long initialTime = 0L;
+            final Queue<Object> expectedOutput = new ArrayDeque<>();
+
+            testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+            testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
+            testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
+            testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
+            testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
+            testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
+
+            ScheduledFuture<?> testTimer =
+                    testHarness
+                            .getTimerService()
+                            .registerTimer(
+                                    
testHarness.getTimerService().getCurrentProcessingTime()
+                                            + TIMEOUT,
+                                    ts -> {});
+
+            expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
+            expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
+            expectedOutput.add(new StreamRecord<>(12, initialTime + 6));
+
+            // wait until all timers have been processed
+            testTimer.get();
+
+            testHarness.processAll();
+            if (mode == AsyncDataStream.OutputMode.ORDERED) {
+                TestHarnessUtil.assertOutputEquals(
+                        "ORDERED Output was not correct.", expectedOutput, 
testHarness.getOutput());
+            } else {
+                TestHarnessUtil.assertOutputEqualsSorted(
+                        "UNORDERED Output was not correct.",
+                        expectedOutput,
+                        testHarness.getOutput(),
+                        new StreamRecordComparator());
+            }
+        }
+    }
+
     private static class CollectableFuturesAsyncFunction<IN> implements 
AsyncFunction<IN, IN> {
 
         private static final long serialVersionUID = -4214078239227288637L;
@@ -1107,4 +1315,18 @@ public class AsyncWaitOperatorTest extends TestLogger {
                 new AsyncWaitOperatorFactory<>(function, timeout, capacity, 
outputMode),
                 IntSerializer.INSTANCE);
     }
+
+    private static <OUT> OneInputStreamOperatorTestHarness<Integer, OUT> 
createTestHarnessWithRetry(
+            AsyncFunction<Integer, OUT> function,
+            long timeout,
+            int capacity,
+            AsyncDataStream.OutputMode outputMode,
+            AsyncRetryStrategy<OUT> asyncRetryStrategy)
+            throws Exception {
+
+        return new OneInputStreamOperatorTestHarness<>(
+                new AsyncWaitOperatorFactory<>(
+                        function, timeout, capacity, outputMode, 
asyncRetryStrategy),
+                IntSerializer.INSTANCE);
+    }
 }
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
index 4387ce7e37c..9906d65510e 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala
@@ -20,10 +20,14 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AsyncDataStream => 
JavaAsyncDataStream}
-import org.apache.flink.streaming.api.functions.async.{AsyncFunction => 
JavaAsyncFunction, ResultFuture => JavaResultFuture}
+import org.apache.flink.streaming.api.functions.async.{AsyncFunction => 
JavaAsyncFunction, AsyncRetryPredicate => JavaAsyncRetryPredicate, 
AsyncRetryStrategy => JavaAsyncRetryStrategy, ResultFuture => JavaResultFuture}
 import org.apache.flink.streaming.api.scala.async._
 import org.apache.flink.util.Preconditions
 
+import java.util
+import java.util.Optional
+import java.util.function.Predicate
+
 import scala.concurrent.duration.TimeUnit
 
 /**
@@ -321,6 +325,363 @@ object AsyncDataStream {
     orderedWait(input, timeout, timeUnit, 
DEFAULT_QUEUE_CAPACITY)(asyncFunction)
   }
 
+  /**
+   * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry of 
AsyncFunction. The
+   * order of output stream records may be reordered.
+   *
+   * @param input
+   *   Input {@link DataStream}
+   * @param asyncFunction
+   *   {@link AsyncFunction}
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the given timeout
+   * @param capacity
+   *   The max number of async i/o operation that can be triggered
+   * @param asyncRetryStrategy
+   *   The strategy of reattempt async i/o operation that can be triggered
+   * @tparam IN
+   *   Type of input record
+   * @tparam OUT
+   *   Type of output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def unorderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT]): DataStream[OUT] = {
+
+    Preconditions.checkArgument(timeout > 0)
+    Preconditions.checkNotNull(asyncFunction)
+
+    val javaAsyncFunction = wrapAsJavaAsyncFunction(asyncFunction)
+    val javaAsyncRetryStrategy = 
wrapAsJavaAsyncRetryStrategy(asyncRetryStrategy)
+
+    val outType: TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(
+      JavaAsyncDataStream
+        .unorderedWaitWithRetry[IN, OUT](
+          input.javaStream,
+          javaAsyncFunction,
+          timeout,
+          timeUnit,
+          capacity,
+          javaAsyncRetryStrategy)
+        .returns(outType))
+  }
+
+  /**
+   * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry of 
AsyncFunction. The
+   * order of output stream records may be reordered.
+   *
+   * @param input
+   *   Input {@link DataStream}
+   * @param asyncFunction
+   *   {@link AsyncFunction}
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the given timeout
+   * @param asyncRetryStrategy
+   *   The strategy of reattempt async i/o operation that can be triggered
+   * @tparam IN
+   *   Type of input record
+   * @tparam OUT
+   *   Type of output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def unorderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT]): DataStream[OUT] = {
+
+    unorderedWaitWithRetry(
+      input,
+      asyncFunction,
+      timeout,
+      timeUnit,
+      DEFAULT_QUEUE_CAPACITY,
+      asyncRetryStrategy)
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream with an 
AsyncRetryStrategy to support
+   * retry. The output order is only maintained with respect to watermarks. 
Stream records which lie
+   * between the same two watermarks, can be re-ordered.
+   *
+   * @param input
+   *   to apply the async function on
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the timeout
+   * @param capacity
+   *   of the operator which is equivalent to the number of concurrent 
asynchronous operations
+   * @param asyncFunction
+   *   to use
+   * @tparam IN
+   *   Type of the input record
+   * @tparam OUT
+   *   Type of the output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def unorderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT])(
+      asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT] = {
+
+    Preconditions.checkNotNull(asyncFunction)
+    Preconditions.checkArgument(timeout > 0)
+
+    val cleanAsyncFunction = 
input.executionEnvironment.scalaClean(asyncFunction)
+
+    val func = new JavaAsyncFunction[IN, OUT] {
+      override def asyncInvoke(input: IN, resultFuture: 
JavaResultFuture[OUT]): Unit = {
+
+        cleanAsyncFunction(input, new 
JavaResultFutureWrapper[OUT](resultFuture))
+      }
+    }
+    val javaAsyncRetryStrategy = 
wrapAsJavaAsyncRetryStrategy(asyncRetryStrategy)
+
+    val outType: TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(
+      JavaAsyncDataStream
+        .unorderedWaitWithRetry[IN, OUT](
+          input.javaStream,
+          func,
+          timeout,
+          timeUnit,
+          capacity,
+          javaAsyncRetryStrategy)
+        .returns(outType))
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream with an 
AsyncRetryStrategy to support
+   * retry. The output order is only maintained with respect to watermarks. 
Stream records which lie
+   * between the same two watermarks, can be re-ordered.
+   *
+   * @param input
+   *   to apply the async function on
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the timeout
+   * @param asyncFunction
+   *   to use
+   * @tparam IN
+   *   Type of the input record
+   * @tparam OUT
+   *   Type of the output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def unorderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT])(
+      asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT] = {
+
+    unorderedWaitWithRetry(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY, 
asyncRetryStrategy)(
+      asyncFunction)
+  }
+
+  /**
+   * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry of 
AsyncFunction. The
+   * output order is the same as the input order of the elements.
+   *
+   * @param input
+   *   Input {@link DataStream}
+   * @param asyncFunction
+   *   {@link AsyncFunction}
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the given timeout
+   * @param capacity
+   *   The max number of async i/o operation that can be triggered
+   * @param asyncRetryStrategy
+   *   The strategy of reattempt async i/o operation that can be triggered
+   * @tparam IN
+   *   Type of input record
+   * @tparam OUT
+   *   Type of output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def orderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT]): DataStream[OUT] = {
+
+    Preconditions.checkArgument(timeout > 0)
+    Preconditions.checkNotNull(asyncFunction)
+
+    val javaAsyncFunction = wrapAsJavaAsyncFunction(asyncFunction)
+    val javaAsyncRetryStrategy = 
wrapAsJavaAsyncRetryStrategy(asyncRetryStrategy)
+
+    val outType: TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(
+      JavaAsyncDataStream
+        .orderedWaitWithRetry[IN, OUT](
+          input.javaStream,
+          javaAsyncFunction,
+          timeout,
+          timeUnit,
+          capacity,
+          javaAsyncRetryStrategy)
+        .returns(outType))
+  }
+
+  /**
+   * Adds an AsyncWaitOperator with an AsyncRetryStrategy to support retry of 
AsyncFunction. The
+   * output order is the same as the input order of the elements.
+   *
+   * @param input
+   *   Input {@link DataStream}
+   * @param asyncFunction
+   *   {@link AsyncFunction}
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the given timeout
+   * @param asyncRetryStrategy
+   *   The strategy of reattempt async i/o operation that can be triggered
+   * @tparam IN
+   *   Type of input record
+   * @tparam OUT
+   *   Type of output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def orderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      asyncFunction: AsyncFunction[IN, OUT],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT]): DataStream[OUT] = {
+
+    orderedWaitWithRetry(
+      input,
+      asyncFunction,
+      timeout,
+      timeUnit,
+      DEFAULT_QUEUE_CAPACITY,
+      asyncRetryStrategy)
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream with an 
AsyncRetryStrategy to support
+   * retry. The output order is the same as the input order of the elements.
+   *
+   * @param input
+   *   to apply the async function on
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the timeout
+   * @param capacity
+   *   of the operator which is equivalent to the number of concurrent 
asynchronous operations
+   * @param asyncFunction
+   *   to use
+   * @tparam IN
+   *   Type of the input record
+   * @tparam OUT
+   *   Type of the output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def orderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      capacity: Int,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT])(
+      asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT] = {
+
+    Preconditions.checkArgument(timeout > 0)
+    Preconditions.checkNotNull(asyncFunction)
+
+    val cleanAsyncFunction = 
input.executionEnvironment.scalaClean(asyncFunction)
+
+    val func = new JavaAsyncFunction[IN, OUT] {
+      override def asyncInvoke(input: IN, resultFuture: 
JavaResultFuture[OUT]): Unit = {
+        cleanAsyncFunction(input, new 
JavaResultFutureWrapper[OUT](resultFuture))
+      }
+    }
+    val javaAsyncRetryStrategy = 
wrapAsJavaAsyncRetryStrategy(asyncRetryStrategy)
+
+    val outType: TypeInformation[OUT] = implicitly[TypeInformation[OUT]]
+
+    asScalaStream(
+      JavaAsyncDataStream
+        .orderedWaitWithRetry[IN, OUT](
+          input.javaStream,
+          func,
+          timeout,
+          timeUnit,
+          capacity,
+          javaAsyncRetryStrategy)
+        .returns(outType))
+  }
+
+  /**
+   * Apply an asynchronous function on the input data stream with an 
AsyncRetryStrategy to support
+   * retry. The output order is the same as the input order of the elements.
+   *
+   * @param input
+   *   to apply the async function on
+   * @param timeout
+   *   from first invoke to final completion of asynchronous operation, may 
include multiple
+   *   retries, and will be reset in case of restart
+   * @param timeUnit
+   *   of the timeout
+   * @param asyncFunction
+   *   to use
+   * @tparam IN
+   *   Type of the input record
+   * @tparam OUT
+   *   Type of the output record
+   * @return
+   *   the resulting stream containing the asynchronous results
+   */
+  def orderedWaitWithRetry[IN, OUT: TypeInformation](
+      input: DataStream[IN],
+      timeout: Long,
+      timeUnit: TimeUnit,
+      asyncRetryStrategy: AsyncRetryStrategy[OUT])(
+      asyncFunction: (IN, ResultFuture[OUT]) => Unit): DataStream[OUT] = {
+
+    orderedWaitWithRetry(input, timeout, timeUnit, DEFAULT_QUEUE_CAPACITY, 
asyncRetryStrategy)(
+      asyncFunction)
+  }
+
   private def wrapAsJavaAsyncFunction[IN, OUT: TypeInformation](
       asyncFunction: AsyncFunction[IN, OUT]): JavaAsyncFunction[IN, OUT] = 
asyncFunction match {
     case richAsyncFunction: RichAsyncFunction[IN, OUT] =>
@@ -336,4 +697,38 @@ object AsyncDataStream {
         }
       }
   }
+
+  private def wrapAsJavaAsyncRetryStrategy[OUT](
+      asyncRetryStrategy: AsyncRetryStrategy[OUT]): 
JavaAsyncRetryStrategy[OUT] = {
+    new JavaAsyncRetryStrategy[OUT] {
+
+      override def canRetry(currentAttempts: Int): Boolean =
+        asyncRetryStrategy.canRetry(currentAttempts)
+
+      override def getBackoffTimeMillis(currentAttempts: Int): Long =
+        asyncRetryStrategy.getBackoffTimeMillis(currentAttempts)
+
+      override def getRetryPredicate: JavaAsyncRetryPredicate[OUT] = {
+        new JavaAsyncRetryPredicate[OUT] {
+          override def resultPredicate(): 
Optional[Predicate[util.Collection[OUT]]] = {
+            asyncRetryStrategy.getRetryPredicate().resultPredicate match {
+              case Some(_) =>
+                
Optional.of(asyncRetryStrategy.getRetryPredicate().resultPredicate.get)
+              case None =>
+                Optional.empty()
+            }
+          }
+
+          override def exceptionPredicate(): Optional[Predicate[Throwable]] = {
+            asyncRetryStrategy.getRetryPredicate().exceptionPredicate match {
+              case Some(_) =>
+                
Optional.of(asyncRetryStrategy.getRetryPredicate().exceptionPredicate.get)
+              case None =>
+                Optional.empty()
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryPredicate.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryPredicate.scala
new file mode 100644
index 00000000000..5ee3fd93b9d
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryPredicate.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+import java.util
+import java.util.function.Predicate
+
+/** Interface encapsulates an asynchronous retry predicate. */
+@PublicEvolving
+trait AsyncRetryPredicate[OUT] {
+
+  /**
+   * An Optional Java {@Predicate } that defines a condition on 
asyncFunction's future result which
+   * will trigger a later reattempt operation, will be called before user's 
ResultFuture#complete.
+   *
+   * @return
+   *   predicate on result of {@link util.Collection}
+   */
+  def resultPredicate: Option[Predicate[util.Collection[OUT]]]
+
+  /**
+   * An Optional Java {@Predicate } that defines a condition on 
asyncFunction's exception which will
+   * trigger a later reattempt operation, will be called before user's
+   * ResultFuture#completeExceptionally.
+   *
+   * @return
+   *   predicate on {@link Throwable} exception
+   */
+  def exceptionPredicate: Option[Predicate[Throwable]]
+}
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategy.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategy.scala
new file mode 100644
index 00000000000..bcdc8faa86d
--- /dev/null
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategy.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala.async
+
+import org.apache.flink.annotation.PublicEvolving
+
+/** Interface encapsulates an asynchronous retry strategy. */
+@PublicEvolving
+trait AsyncRetryStrategy[OUT] extends Serializable {
+
+  /** @return whether the next attempt can happen */
+  def canRetry(currentAttempts: Int): Boolean
+
+  /** @return the delay time of next attempt */
+  def getBackoffTimeMillis(currentAttempts: Int): Long
+
+  /** @return the defined retry predicate {@link AsyncRetryPredicate} */
+  def getRetryPredicate(): AsyncRetryPredicate[OUT]
+}
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
index c3ac343a0f8..f09e499be55 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._
-import org.apache.flink.streaming.api.scala.async.{ResultFuture, 
RichAsyncFunction}
+import org.apache.flink.streaming.api.scala.async.{AsyncRetryPredicate, 
AsyncRetryStrategy, ResultFuture, RichAsyncFunction}
 import org.apache.flink.test.util.AbstractTestBase
 
 import org.junit.Assert._
@@ -31,6 +31,7 @@ import org.junit.runners.Parameterized.Parameters
 
 import java.{util => ju}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.function.Predicate
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
@@ -164,6 +165,109 @@ class AsyncDataStreamITCase(ordered: Boolean) extends 
AbstractTestBase {
     executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 
4))
   }
 
+  @Test
+  def testAsyncWaitWithRetry(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+
+    val source = env.fromElements(1, 2, 3, 4, 5, 6)
+
+    val asyncFunction = new OddInputReturnEmptyAsyncFunc
+
+    val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
+
+    val timeout = 10000L
+    val asyncMapped = if (ordered) {
+      AsyncDataStream.orderedWaitWithRetry(
+        source,
+        asyncFunction,
+        timeout,
+        TimeUnit.MILLISECONDS,
+        asyncRetryStrategy)
+    } else {
+      AsyncDataStream.unorderedWaitWithRetry(
+        source,
+        asyncFunction,
+        timeout,
+        TimeUnit.MILLISECONDS,
+        asyncRetryStrategy)
+    }
+
+    executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 
4, 6))
+  }
+
+  private def createFixedRetryStrategy[OUT](
+      maxAttempts: Int,
+      fixedDelayMs: Long): AsyncRetryStrategy[OUT] = {
+    new AsyncRetryStrategy[OUT] {
+
+      override def canRetry(currentAttempts: Int): Boolean = {
+        currentAttempts <= maxAttempts
+      }
+
+      override def getBackoffTimeMillis(currentAttempts: Int): Long = 
fixedDelayMs
+
+      override def getRetryPredicate(): AsyncRetryPredicate[OUT] = {
+        new AsyncRetryPredicate[OUT] {
+          override def resultPredicate: Option[Predicate[ju.Collection[OUT]]] 
= {
+            Option(
+              new Predicate[ju.Collection[OUT]] {
+                override def test(t: ju.Collection[OUT]): Boolean = t.isEmpty
+              }
+            )
+          }
+
+          override def exceptionPredicate: Option[Predicate[Throwable]] = 
Option.empty
+        }
+      }
+    }
+  }
+
+  @Test
+  def testAsyncWaitWithRetryUsingAnonymousFunction(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+
+    val source = env.fromElements(1, 2, 3, 4, 5, 6)
+
+    val asyncFunction: (Int, ResultFuture[Int]) => Unit =
+      (input, collector: ResultFuture[Int]) => {
+        Thread.sleep(3)
+        if (input % 2 == 1) {
+          Future {
+            collector.complete(List[Int]())
+          }(ExecutionContext.global)
+        } else {
+          Future {
+            collector.complete(List[Int](input))
+          }(ExecutionContext.global)
+        }
+      }
+
+    val timeout = 10000L
+    val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10)
+
+    val asyncMapped = if (ordered) {
+      AsyncDataStream.orderedWaitWithRetry(
+        source,
+        timeout,
+        TimeUnit.MILLISECONDS,
+        asyncRetryStrategy) {
+        asyncFunction
+      }
+    } else {
+      AsyncDataStream.unorderedWaitWithRetry(
+        source,
+        timeout,
+        TimeUnit.MILLISECONDS,
+        asyncRetryStrategy) {
+        asyncFunction
+      }
+    }
+
+    executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 
4, 6))
+  }
+
 }
 
 class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] {
@@ -226,3 +330,19 @@ class MyRichAsyncFunction extends RichAsyncFunction[Int, 
Int] {
     resultFuture.complete(Seq(input * 3))
   }
 }
+
+class OddInputReturnEmptyAsyncFunc extends RichAsyncFunction[Int, Int] {
+
+  override def asyncInvoke(input: Int, resultFuture: ResultFuture[Int]): Unit 
= {
+    Thread.sleep(3)
+    if (input % 2 == 1) {
+      Future {
+        resultFuture.complete(List[Int]())
+      }(ExecutionContext.global)
+    } else {
+      Future {
+        resultFuture.complete(List[Int](input))
+      }(ExecutionContext.global)
+    }
+  }
+}

Reply via email to