This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d426489c9e5 [FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes d426489c9e5 is described below commit d426489c9e5c634e2eec8fde6c71356700b7d4b2 Author: Eric Xiao <eric.x...@shopify.com> AuthorDate: Sun Oct 16 18:12:27 2022 -0400 [FLINK-29498][datastream] Add Scala Async Retry Strategies and ResultPredicates Helper Classes This closes #21077. --- .../docs/dev/datastream/operators/asyncio.md | 10 +- .../docs/dev/datastream/operators/asyncio.md | 10 +- .../api/operators/async/AsyncWaitOperator.java | 2 +- .../api/scala/async/AsyncRetryStrategies.scala | 130 +++++++++++++++++++++ .../api/scala/async/RetryPredicates.scala | 38 ++++++ .../api/scala/AsyncDataStreamITCase.scala | 42 ++----- 6 files changed, 194 insertions(+), 38 deletions(-) diff --git a/docs/content.zh/docs/dev/datastream/operators/asyncio.md b/docs/content.zh/docs/dev/datastream/operators/asyncio.md index 6dbddab5824..0caa5e8ce3b 100644 --- a/docs/content.zh/docs/dev/datastream/operators/asyncio.md +++ b/docs/content.zh/docs/dev/datastream/operators/asyncio.md @@ -125,8 +125,8 @@ DataStream<Tuple2<String, String>> resultStream = // 通过工具类创建一个异步重试策略, 或用户实现自定义的策略 AsyncRetryStrategy asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms - .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE) - .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE) + .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE) + .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE) .build(); // 应用异步 I/O 转换操作并启用重试 @@ -170,7 +170,11 @@ val resultStream: DataStream[(String, String)] = // 或 应用异步 I/O 转换操作并启用重试 // 创建一个异步重试策略 -val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ... +val asyncRetryStrategy: AsyncRetryStrategy[String] = + new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms + .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE) + .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE) + .build(); // 应用异步 I/O 转换操作并启用重试 val resultStream: DataStream[(String, String)] = diff --git a/docs/content/docs/dev/datastream/operators/asyncio.md b/docs/content/docs/dev/datastream/operators/asyncio.md index a6d218fff55..631c83eaa1b 100644 --- a/docs/content/docs/dev/datastream/operators/asyncio.md +++ b/docs/content/docs/dev/datastream/operators/asyncio.md @@ -140,8 +140,8 @@ DataStream<Tuple2<String, String>> resultStream = // create an async retry strategy via utility class or a user defined strategy AsyncRetryStrategy asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms - .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE) - .retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE) + .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE) + .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE) .build(); // apply the async I/O transformation with retry @@ -185,7 +185,11 @@ val resultStream: DataStream[(String, String)] = // apply the async I/O transformation with retry // create an AsyncRetryStrategy -val asyncRetryStrategy: AsyncRetryStrategy[OUT] = ... +val asyncRetryStrategy: AsyncRetryStrategy[String] = + new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms + .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE) + .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE) + .build(); // apply the async I/O transformation with retry val resultStream: DataStream[(String, String)] = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index 0d88943b21e..7d2685f7828 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -430,7 +430,7 @@ public class AsyncWaitOperator<IN, OUT> /** * A guard similar to ResultHandler#complete to prevent repeated complete calls from * ill-written AsyncFunction. This flag indicates a retry is in-flight, new retry will be - * rejected if it is ture, and it will be reset to false after the retry fired. + * rejected if it is true, and it will be reset to false after the retry fired. */ private final AtomicBoolean retryAwaiting = new AtomicBoolean(false); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala new file mode 100644 index 00000000000..2e317b574bc --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/AsyncRetryStrategies.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.async + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.streaming.api.functions.async +import org.apache.flink.streaming.api.functions.async.{AsyncRetryStrategy => JAsyncRetryStrategy} +import org.apache.flink.streaming.util.retryable.{AsyncRetryStrategies => JAsyncRetryStrategies} + +import java.{util => ju} +import java.util.function.Predicate + +/** Utility class to create concrete {@link AsyncRetryStrategy}. */ +object AsyncRetryStrategies { + + final private class JavaToScalaRetryStrategy[T](retryStrategy: JAsyncRetryStrategy[T]) + extends AsyncRetryStrategy[T] { + + /** @return whether the next attempt can happen */ + override def canRetry(currentAttempts: Int): Boolean = retryStrategy.canRetry(currentAttempts) + + /** @return the delay time of next attempt */ + override def getBackoffTimeMillis(currentAttempts: Int): Long = + retryStrategy.getBackoffTimeMillis(currentAttempts) + + /** @return the defined retry predicate {@link AsyncRetryPredicate} */ + override def getRetryPredicate(): AsyncRetryPredicate[T] = new AsyncRetryPredicate[T] { + val retryPredicates: async.AsyncRetryPredicate[T] = retryStrategy.getRetryPredicate + + /** + * An Optional Java {@Predicate } that defines a condition on asyncFunction's future result + * which will trigger a later reattempt operation, will be called before user's + * ResultFuture#complete. + * + * @return + * predicate on result of {@link ju.Collection} + */ + override def resultPredicate: Option[Predicate[ju.Collection[T]]] = Option( + retryPredicates.resultPredicate.orElse(null)) + + /** + * An Optional Java {@Predicate } that defines a condition on asyncFunction's exception which + * will trigger a later reattempt operation, will be called before user's + * ResultFuture#completeExceptionally. + * + * @return + * predicate on {@link Throwable} exception + */ + override def exceptionPredicate: Option[Predicate[Throwable]] = Option( + retryPredicates.exceptionPredicate.orElse(null)) + } + } + + /** + * FixedDelayRetryStrategyBuilder for building an {@link AsyncRetryStrategy} with fixed delay + * retrying behaviours. + */ + @PublicEvolving + @SerialVersionUID(1L) + class FixedDelayRetryStrategyBuilder[OUT]( + private val maxAttempts: Int, + private val backoffTimeMillis: Long + ) { + private var builder = + new JAsyncRetryStrategies.FixedDelayRetryStrategyBuilder[OUT](maxAttempts, backoffTimeMillis) + + def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]]) + : FixedDelayRetryStrategyBuilder[OUT] = { + this.builder = this.builder.ifResult(resultRetryPredicate) + this + } + + def ifException( + exceptionRetryPredicate: Predicate[Throwable]): FixedDelayRetryStrategyBuilder[OUT] = { + this.builder = this.builder.ifException(exceptionRetryPredicate) + this + } + + def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build()) + } + + /** + * ExponentialBackoffDelayRetryStrategyBuilder for building an {@link AsyncRetryStrategy} with + * exponential delay retrying behaviours. + */ + @PublicEvolving + @SerialVersionUID(1L) + class ExponentialBackoffDelayRetryStrategyBuilder[OUT]( + private val maxAttempts: Int, + private val initialDelay: Long, + private val maxRetryDelay: Long, + private val multiplier: Double + ) { + private var builder = + new JAsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder[OUT]( + maxAttempts, + initialDelay, + maxRetryDelay, + multiplier) + + def ifResult(resultRetryPredicate: Predicate[ju.Collection[OUT]]) + : ExponentialBackoffDelayRetryStrategyBuilder[OUT] = { + this.builder = this.builder.ifResult(resultRetryPredicate) + this + } + + def ifException(exceptionRetryPredicate: Predicate[Throwable]) + : ExponentialBackoffDelayRetryStrategyBuilder[OUT] = { + this.builder = this.builder.ifException(exceptionRetryPredicate) + this + } + + def build(): AsyncRetryStrategy[OUT] = new JavaToScalaRetryStrategy[OUT](builder.build()) + } +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala new file mode 100644 index 00000000000..56a37eaca12 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RetryPredicates.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.scala.async + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.streaming.util.retryable.{RetryPredicates => JRetryPredicates} + +import java.util +import java.util.function.Predicate + +/** Utility class to create concrete retry predicates. */ +@PublicEvolving +object RetryPredicates { + + /** A predicate matches empty result which means an empty {@link Collection}. */ + def EMPTY_RESULT_PREDICATE[T]: Predicate[util.Collection[T]] = + JRetryPredicates.EMPTY_RESULT_PREDICATE.asInstanceOf[Predicate[util.Collection[T]]] + + /** A predicate matches any exception which means a non-null{@link Throwable}. */ + def HAS_EXCEPTION_PREDICATE: Predicate[Throwable] = + JRetryPredicates.HAS_EXCEPTION_PREDICATE.asInstanceOf[Predicate[Throwable]] + +} diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala index f09e499be55..edd53b660d7 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AsyncDataStreamITCase.scala @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.scala.AsyncDataStreamITCase._ -import org.apache.flink.streaming.api.scala.async.{AsyncRetryPredicate, AsyncRetryStrategy, ResultFuture, RichAsyncFunction} +import org.apache.flink.streaming.api.scala.async.{AsyncRetryStrategies, ResultFuture, RetryPredicates, RichAsyncFunction} import org.apache.flink.test.util.AbstractTestBase import org.junit.Assert._ @@ -31,7 +31,6 @@ import org.junit.runners.Parameterized.Parameters import java.{util => ju} import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.function.Predicate import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} @@ -174,7 +173,11 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase { val asyncFunction = new OddInputReturnEmptyAsyncFunc - val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10) + val asyncRetryStrategy = + new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 10) + .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE[Int]) + .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE) + .build() val timeout = 10000L val asyncMapped = if (ordered) { @@ -196,33 +199,6 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase { executeAndValidate(ordered, env, asyncMapped, mutable.ArrayBuffer[Int](2, 4, 6)) } - private def createFixedRetryStrategy[OUT]( - maxAttempts: Int, - fixedDelayMs: Long): AsyncRetryStrategy[OUT] = { - new AsyncRetryStrategy[OUT] { - - override def canRetry(currentAttempts: Int): Boolean = { - currentAttempts <= maxAttempts - } - - override def getBackoffTimeMillis(currentAttempts: Int): Long = fixedDelayMs - - override def getRetryPredicate(): AsyncRetryPredicate[OUT] = { - new AsyncRetryPredicate[OUT] { - override def resultPredicate: Option[Predicate[ju.Collection[OUT]]] = { - Option( - new Predicate[ju.Collection[OUT]] { - override def test(t: ju.Collection[OUT]): Boolean = t.isEmpty - } - ) - } - - override def exceptionPredicate: Option[Predicate[Throwable]] = Option.empty - } - } - } - } - @Test def testAsyncWaitWithRetryUsingAnonymousFunction(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -245,7 +221,8 @@ class AsyncDataStreamITCase(ordered: Boolean) extends AbstractTestBase { } val timeout = 10000L - val asyncRetryStrategy = createFixedRetryStrategy[Int](3, 10) + val asyncRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder[Int](3, 10) + .build() val asyncMapped = if (ordered) { AsyncDataStream.orderedWaitWithRetry( @@ -283,6 +260,7 @@ class AsyncFunctionWithTimeoutExpired extends RichAsyncFunction[Int, Int] { resultFuture.complete(Seq(input * 2)) }(ExecutionContext.global) } + override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = { resultFuture.complete(Seq(input * 3)) invokeLatch.countDown() @@ -307,6 +285,7 @@ class AsyncFunctionWithoutTimeoutExpired extends RichAsyncFunction[Int, Int] { timeoutLatch.countDown() }(ExecutionContext.global) } + override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = { // this sleeping helps reproducing race condition with cancellation Thread.sleep(10) @@ -326,6 +305,7 @@ class MyRichAsyncFunction extends RichAsyncFunction[Int, Int] { resultFuture.complete(Seq(input * 2)) }(ExecutionContext.global) } + override def timeout(input: Int, resultFuture: ResultFuture[Int]): Unit = { resultFuture.complete(Seq(input * 3)) }