zentol commented on a change in pull request #17556:
URL: https://github.com/apache/flink/pull/17556#discussion_r746682643



##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This rule allows objects to be used both in the main test case as well as 
in UDFs by using
+ * serializable {@link SharedReference}s. Usage:
+ *
+ * <pre><code>
+ * {@literal    @RegisterExtension}

Review comment:
       So I'm curious, now that the rule/classrule semantics are baked into the 
extensions themselves, how is a user informed about incorrect usages?
   
   For example, would a static BeforeEachCallback fail? (And, vice versa, would a 
non-static BeforeAllCallback fail?)
   ```
   @RegisterExtension
   public static final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/SharedObjectsExtension.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This rule allows objects to be used both in the main test case as well as 
in UDFs by using
+ * serializable {@link SharedReference}s. Usage:
+ *
+ * <pre><code>
+ * {@literal    @RegisterExtension}
+ *     public final SharedObjectsExtension sharedObjects = 
SharedObjectsExtension.create();
+ *
+ * {@literal    @Test}
+ *     public void test() throws Exception {
+ *         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+ * {@literal        SharedReference<Queue<Long>> listRef = 
sharedObjects.add(new ConcurrentLinkedQueue<>());}
+ *         int n = 10000;
+ *         env.setParallelism(100);
+ *         env.fromSequence(0, n).map(i -> listRef.get().add(i));
+ *         env.execute();
+ *         assertEquals(n + 1, listRef.get().size());
+ *         assertEquals(
+ *                 LongStream.rangeClosed(0, 
n).boxed().collect(Collectors.toList()),
+ *                 
listRef.get().stream().sorted().collect(Collectors.toList()));
+ *     }
+ * </code></pre>
+ *
+ * <p>The main idea is that shared objects are bound to the scope of a test 
case instead of a class.
+ * That allows us to:
+ *
+ * <ul>
+ *   <li>Avoid all kinds of static fields in test classes that only exist 
since all fields in UDFs
+ *       need to be serializable.
+ *   <li>Hopefully make it easier to reason about the test setup
+ *   <li>Facilitate to share more test building blocks across test classes.
+ *   <li>Fully allow tests to be rerun/run in parallel without worrying about 
static fields
+ * </ul>
+ *
+ * <p>Note that since the shared objects are accessed through multiple 
threads, they need to be
+ * thread-safe or accessed in a thread-safe manner.
+ */
+public class SharedObjectsExtension implements BeforeEachCallback, 
AfterEachCallback {

Review comment:
       The original (JUnit 4) version is annotated with `@NotThreadSafe`; should this extension carry the same annotation?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
##########
@@ -58,7 +60,8 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
+@ExtendWith({TestLoggerExtension.class})

Review comment:
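       Isn't it possible to set this up for all test classes automatically (e.g. via JUnit 5's automatic extension registration) instead of adding `@ExtendWith` to every class?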

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/EachCallbackWrapper.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/** An extension wrap logic for {@link BeforeEachCallback} and {@link 
AfterEachCallback}. */
+public class EachCallbackWrapper implements BeforeEachCallback, 
AfterEachCallback {
+    private final CustomExtension customExtension;
+

Review comment:
       Have you considered something like this:
   ```suggestion
   public class EachCallbackWrapper<C extends CustomExtension> implements 
BeforeEachCallback, AfterEachCallback {
       private final C customExtension;
   
       public C get() { return customExtension; }
   ```
   
   This _may_ be more user-friendly.

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryTestExecutionExtension.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.RetryExtension.getTestMethodKey;
+
+/** Extension to decide whether a retry test should run. */
+public class RetryTestExecutionExtension
+        implements ExecutionCondition, TestExecutionExceptionHandler, 
AfterEachCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryTestExecutionExtension.class);

Review comment:
       ```suggestion
       private static final Logger LOG = 
LoggerFactory.getLogger(RetryTestExecutionExtension.class);
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryTestExecutionExtension.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.RetryExtension.getTestMethodKey;
+
+/** Extension to decide whether a retry test should run. */
+public class RetryTestExecutionExtension
+        implements ExecutionCondition, TestExecutionExceptionHandler, 
AfterEachCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryTestExecutionExtension.class);
+    private final int retryIndex;
+    private final int totalTimes;
+    private Class<? extends Throwable> repeatableException;
+
+    public RetryTestExecutionExtension(
+            int retryIndex, int totalTimes, Class<? extends Throwable> 
repeatableException) {
+        this.retryIndex = retryIndex;
+        this.totalTimes = totalTimes;
+        this.repeatableException = repeatableException;
+    }
+
+    @Override
+    public ConditionEvaluationResult 
evaluateExecutionCondition(ExtensionContext context) {
+        Map<String, Boolean> shouldRetry =
+                (Map<String, Boolean>) 
context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+        String method = getTestMethodKey(context);
+        if (!shouldRetry.get(method)) {
+            return ConditionEvaluationResult.disabled(method + "have already 
passed or failed.");
+        }
+        return ConditionEvaluationResult.enabled(
+                String.format("Test %s: [%d/%d]", method, retryIndex, 
totalTimes));
+    }
+
+    @Override
+    public void handleTestExecutionException(ExtensionContext context, 
Throwable throwable)
+            throws Throwable {
+        String method = getTestMethodKey(context);
+        if (repeatableException != null) {
+            // RetryOnException
+            if (repeatableException.isAssignableFrom(throwable.getClass())) {
+                // 1. continue retrying when get some repeatable exceptions
+                String retryMsg =
+                        String.format(
+                                "Retry test %s[%d/%d] failed with repeatable 
exception, continue retrying.",
+                                method, retryIndex, totalTimes);
+                LOG.error(retryMsg, throwable);
+                throw new TestAbortedException(retryMsg);

Review comment:
       Why does throwing a `TestAbortedException` here result in a retry? 😕 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterExtension.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.core.testutils.CustomExtension;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Extension which starts a {@link MiniCluster} for testing purposes. */
+public class MiniClusterExtension implements CustomExtension {

Review comment:
       We may not be able to get rid of the MiniClusterResource that quickly 
because users also rely on it. It may be better to just wrap a 
MiniClusterResource, or to create a generic base class with two subclasses (one 
each for JUnit 4 and JUnit 5).

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/TestLoggerExtension.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/** A JUnit-5-style test logger. */
+public class TestLoggerExtension implements TestWatcher, BeforeEachCallback {

Review comment:
       Does this intentionally not cover the existing `nameProvider`?

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/util/LogLevelExtension.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.slf4j.Log4jLogger;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A extension that sets the log level for specific class/package loggers for 
a test. Logging
+ * configuration will only be extended when logging is enabled at all (so root 
logger is not OFF).
+ */
+public class LogLevelExtension implements BeforeAllCallback, AfterAllCallback {

Review comment:
       Isn't this subsumed by the `LoggerAuditingExtension`? I would open a 
ticket to migrate the only two existing usages instead of porting it.

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CustomExtension.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * An extension logic should be used with {@link EachCallbackWrapper} or 
{@link AllCallbackWrapper}.
+ *
+ * <p>{@code before} method will be called in {@code beforeEach} or {@code 
beforeAll}. {@code after}
+ * will be called in {@code afterEach} or {@code afterAll}.
+ *
+ * <p>Usage example:
+ *
+ * <pre>{@code
+ * public class Test{
+ *      CustomExtension eachCustom = new CustomExtensionImpl1();
+ *      CustomExtension allCustom = new CustomExtensionImpl2();
+ *      @RegisterExtension
+ *      static AllCallbackWrapper allWrapper = new 
AllCallbackWrapper(allCustom);
+ *      @RegisterExtension
+ *      EachCallbackWrapper eachWrapper = new EachCallbackWrapper(eachCustom);
+ * }
+ * }</pre>
+ *
+ * <p>Note that do not use the same {@code CustomExtension} instance to the 
AllCallbackWrapper and
+ * EachCallbackWrapper for the same test class, which may result in odd 
behavior.
+ */

Review comment:
       ```suggestion
   /**
    * An extension that is invoked before/after all/each tests, depending on 
whether it is wrapped in a {@link EachCallbackWrapper} or {@link 
AllCallbackWrapper}.
    *
    * <p>{@code before} method will be called in {@code beforeEach} or {@code 
beforeAll}. {@code after}
    * will be called in {@code afterEach} or {@code afterAll}.
    *
    * <p>Usage example:
    *
    * <pre>{@code
    * public class Test{
    *      CustomExtension eachCustom = new CustomExtensionImpl1();
    *      CustomExtension allCustom = new CustomExtensionImpl2();
    *      @RegisterExtension
    *      static AllCallbackWrapper allWrapper = new 
AllCallbackWrapper(allCustom);
    *      @RegisterExtension
    *      EachCallbackWrapper eachWrapper = new 
EachCallbackWrapper(eachCustom);
    * }
    * }</pre>
    *
    * <p>A {@code CustomExtension} instance must not be wrapped in both 
AllCallbackWrapper and
    * EachCallbackWrapper for the same test class.
    */
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryExtension.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** An extension to let failed test retry. */
+public class RetryExtension implements TestTemplateInvocationContextProvider, 
AfterAllCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryExtension.class);
+    public static final ExtensionContext.Namespace RETRY_NAMESPACE =
+            ExtensionContext.Namespace.create("retryLog");
+    public static final String RETRY_KEY = "testRetry";
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);
+            retryOnException = 
context.getTestClass().get().getAnnotation(RetryOnException.class);
+        }
+        return retryOnException != null || retryOnFailure != null;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
+            ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);
+            retryOnException = 
context.getTestClass().get().getAnnotation(RetryOnException.class);
+        }
+        // sanity check that we don't use both annotations
+        if (retryOnFailure != null && retryOnException != null) {
+            throw new IllegalArgumentException(
+                    "You cannot combine the RetryOnFailure and 
RetryOnException annotations.");
+        }
+
+        Map<String, Boolean> testLog =
+                (Map<String, Boolean>)
+                        context.getStore(RETRY_NAMESPACE)
+                                .getOrComputeIfAbsent(RETRY_KEY, key -> new 
HashMap<>());
+        testLog.put(getTestMethodKey(context), true);
+        if (retryOnFailure == null) {
+            int totalTimes = retryOnException.times() + 1;
+            RetryOnException finalRetryOnException = retryOnException;
+            return IntStream.rangeClosed(1, totalTimes)
+                    .mapToObj(
+                            i ->
+                                    new RetryContext(
+                                            i, totalTimes, 
finalRetryOnException.exception()));
+        } else {
+            int totalTimes = retryOnFailure.times() + 1;
+            return IntStream.rangeClosed(1, totalTimes)
+                    .mapToObj(i -> new RetryContext(i, totalTimes, null));
+        }
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        context.getStore(RETRY_NAMESPACE).remove(RETRY_KEY);
+    }
+
+    public static String getTestMethodKey(ExtensionContext context) {
+        return context.getRequiredTestClass().getCanonicalName()
+                + "."

Review comment:
       ```suggestion
                   + "#"
   ```
   ?

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryExtension.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** An extension to let failed test retry. */
+public class RetryExtension implements TestTemplateInvocationContextProvider, 
AfterAllCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryExtension.class);
+    public static final ExtensionContext.Namespace RETRY_NAMESPACE =
+            ExtensionContext.Namespace.create("retryLog");
+    public static final String RETRY_KEY = "testRetry";

Review comment:
       ```suggestion
       static final ExtensionContext.Namespace RETRY_NAMESPACE =
               ExtensionContext.Namespace.create("retryLog");
       static final String RETRY_KEY = "testRetry";
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryExtension.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** An extension to let failed test retry. */
+public class RetryExtension implements TestTemplateInvocationContextProvider, 
AfterAllCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryExtension.class);
+    public static final ExtensionContext.Namespace RETRY_NAMESPACE =
+            ExtensionContext.Namespace.create("retryLog");
+    public static final String RETRY_KEY = "testRetry";
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);
+            retryOnException = 
context.getTestClass().get().getAnnotation(RetryOnException.class);
+        }
+        return retryOnException != null || retryOnFailure != null;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
+            ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);

Review comment:
       Well, that makes `supportsTestTemplate` a bit pointless, doesn't it? :/ 
Could we not store the annotation in the `ExtensionContext` in 
`supportsTestTemplate` and just retrieve it here?

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryExtension.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** An extension to let failed test retry. */
+public class RetryExtension implements TestTemplateInvocationContextProvider, 
AfterAllCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryExtension.class);

Review comment:
       ```suggestion
       private static final Logger LOG = 
LoggerFactory.getLogger(RetryExtension.class);
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryExtension.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** An extension to let failed test retry. */
+public class RetryExtension implements TestTemplateInvocationContextProvider, 
AfterAllCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryExtension.class);
+    public static final ExtensionContext.Namespace RETRY_NAMESPACE =
+            ExtensionContext.Namespace.create("retryLog");
+    public static final String RETRY_KEY = "testRetry";
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);
+            retryOnException = 
context.getTestClass().get().getAnnotation(RetryOnException.class);
+        }
+        return retryOnException != null || retryOnFailure != null;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
+            ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);
+            retryOnException = 
context.getTestClass().get().getAnnotation(RetryOnException.class);
+        }
+        // sanity check that we don't use both annotations
+        if (retryOnFailure != null && retryOnException != null) {
+            throw new IllegalArgumentException(
+                    "You cannot combine the RetryOnFailure and 
RetryOnException annotations.");
+        }
+
+        Map<String, Boolean> testLog =
+                (Map<String, Boolean>)
+                        context.getStore(RETRY_NAMESPACE)
+                                .getOrComputeIfAbsent(RETRY_KEY, key -> new 
HashMap<>());
+        testLog.put(getTestMethodKey(context), true);
+        if (retryOnFailure == null) {
+            int totalTimes = retryOnException.times() + 1;
+            RetryOnException finalRetryOnException = retryOnException;
+            return IntStream.rangeClosed(1, totalTimes)
+                    .mapToObj(
+                            i ->
+                                    new RetryContext(
+                                            i, totalTimes, 
finalRetryOnException.exception()));
+        } else {
+            int totalTimes = retryOnFailure.times() + 1;
+            return IntStream.rangeClosed(1, totalTimes)
+                    .mapToObj(i -> new RetryContext(i, totalTimes, null));
+        }
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        context.getStore(RETRY_NAMESPACE).remove(RETRY_KEY);
+    }
+
+    public static String getTestMethodKey(ExtensionContext context) {

Review comment:
       ```suggestion
       static String getTestMethodKey(ExtensionContext context) {
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryExtension.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** An extension to let failed test retry. */
+public class RetryExtension implements TestTemplateInvocationContextProvider, 
AfterAllCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryExtension.class);
+    public static final ExtensionContext.Namespace RETRY_NAMESPACE =
+            ExtensionContext.Namespace.create("retryLog");
+    public static final String RETRY_KEY = "testRetry";
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);
+            retryOnException = 
context.getTestClass().get().getAnnotation(RetryOnException.class);
+        }
+        return retryOnException != null || retryOnFailure != null;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> 
provideTestTemplateInvocationContexts(
+            ExtensionContext context) {
+        Method testMethod = context.getRequiredTestMethod();
+        RetryOnFailure retryOnFailure = 
testMethod.getAnnotation(RetryOnFailure.class);
+        RetryOnException retryOnException = 
testMethod.getAnnotation(RetryOnException.class);
+
+        if (retryOnFailure == null && retryOnException == null) {
+            // if nothing is specified on the test method, fall back to 
annotations on the class
+            retryOnFailure = 
context.getTestClass().get().getAnnotation(RetryOnFailure.class);
+            retryOnException = 
context.getTestClass().get().getAnnotation(RetryOnException.class);
+        }
+        // sanity check that we don't use both annotations
+        if (retryOnFailure != null && retryOnException != null) {
+            throw new IllegalArgumentException(
+                    "You cannot combine the RetryOnFailure and 
RetryOnException annotations.");
+        }
+
+        Map<String, Boolean> testLog =
+                (Map<String, Boolean>)
+                        context.getStore(RETRY_NAMESPACE)
+                                .getOrComputeIfAbsent(RETRY_KEY, key -> new 
HashMap<>());
+        testLog.put(getTestMethodKey(context), true);
+        if (retryOnFailure == null) {
+            int totalTimes = retryOnException.times() + 1;
+            RetryOnException finalRetryOnException = retryOnException;
+            return IntStream.rangeClosed(1, totalTimes)
+                    .mapToObj(
+                            i ->
+                                    new RetryContext(
+                                            i, totalTimes, 
finalRetryOnException.exception()));
+        } else {
+            int totalTimes = retryOnFailure.times() + 1;
+            return IntStream.rangeClosed(1, totalTimes)
+                    .mapToObj(i -> new RetryContext(i, totalTimes, null));
+        }
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        context.getStore(RETRY_NAMESPACE).remove(RETRY_KEY);
+    }
+
+    public static String getTestMethodKey(ExtensionContext context) {
+        return context.getRequiredTestClass().getCanonicalName()
+                + "."
+                + context.getRequiredTestMethod().getName();
+    }
+
+    class RetryContext implements TestTemplateInvocationContext {
+        int retryIndex;
+        int totalTimes;
+        Class<? extends Throwable> repeatableException;
+
+        RetryContext(
+                int retryIndex, int totalTimes, Class<? extends Throwable> 
repeatableException) {
+            this.retryIndex = retryIndex;
+            this.totalTimes = totalTimes;
+            this.repeatableException = repeatableException;
+        }
+
+        @Override
+        public String getDisplayName(int invocationIndex) {
+            return String.format(
+                    "RetryIndex: [%d], Test:[%d/%d]", invocationIndex, 
retryIndex, totalTimes);

Review comment:
       What is the difference between `invocationIndex` and `retryIndex`?
   ```suggestion
                       "Attempt [%d/%d]", retryIndex, totalTimes);
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryTestExecutionExtension.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.RetryExtension.getTestMethodKey;
+
+/** Extension to decide whether a retry test should run. */
+public class RetryTestExecutionExtension
+        implements ExecutionCondition, TestExecutionExceptionHandler, 
AfterEachCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryTestExecutionExtension.class);
+    private final int retryIndex;
+    private final int totalTimes;
+    private Class<? extends Throwable> repeatableException;
+
+    public RetryTestExecutionExtension(
+            int retryIndex, int totalTimes, Class<? extends Throwable> 
repeatableException) {
+        this.retryIndex = retryIndex;
+        this.totalTimes = totalTimes;
+        this.repeatableException = repeatableException;
+    }
+
+    @Override
+    public ConditionEvaluationResult 
evaluateExecutionCondition(ExtensionContext context) {
+        Map<String, Boolean> shouldRetry =
+                (Map<String, Boolean>) 
context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+        String method = getTestMethodKey(context);
+        if (!shouldRetry.get(method)) {
+            return ConditionEvaluationResult.disabled(method + "have already 
passed or failed.");
+        }
+        return ConditionEvaluationResult.enabled(
+                String.format("Test %s: [%d/%d]", method, retryIndex, 
totalTimes));
+    }
+
+    @Override
+    public void handleTestExecutionException(ExtensionContext context, 
Throwable throwable)
+            throws Throwable {
+        String method = getTestMethodKey(context);
+        if (repeatableException != null) {

Review comment:
       Are there any limitations as to what we can add to the ExtensionContext 
store? It would be quite neat if we could just encapsulate the 2 restart 
mechanisms in 2 implementations of a sort of restart strategy, and just 
store one such instance in the store.

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryTestExecutionExtension.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.RetryExtension.getTestMethodKey;
+
+/** Extension to decide whether a retry test should run. */
+public class RetryTestExecutionExtension
+        implements ExecutionCondition, TestExecutionExceptionHandler, 
AfterEachCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryTestExecutionExtension.class);
+    private final int retryIndex;
+    private final int totalTimes;
+    private Class<? extends Throwable> repeatableException;
+
+    public RetryTestExecutionExtension(
+            int retryIndex, int totalTimes, Class<? extends Throwable> 
repeatableException) {
+        this.retryIndex = retryIndex;
+        this.totalTimes = totalTimes;
+        this.repeatableException = repeatableException;
+    }
+
+    @Override
+    public ConditionEvaluationResult 
evaluateExecutionCondition(ExtensionContext context) {
+        Map<String, Boolean> shouldRetry =
+                (Map<String, Boolean>) 
context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+        String method = getTestMethodKey(context);
+        if (!shouldRetry.get(method)) {
+            return ConditionEvaluationResult.disabled(method + "have already 
passed or failed.");
+        }
+        return ConditionEvaluationResult.enabled(
+                String.format("Test %s: [%d/%d]", method, retryIndex, 
totalTimes));
+    }
+
+    @Override
+    public void handleTestExecutionException(ExtensionContext context, 
Throwable throwable)
+            throws Throwable {
+        String method = getTestMethodKey(context);
+        if (repeatableException != null) {
+            // RetryOnException
+            if (repeatableException.isAssignableFrom(throwable.getClass())) {
+                // 1. continue retrying when get some repeatable exceptions
+                String retryMsg =
+                        String.format(
+                                "Retry test %s[%d/%d] failed with repeatable 
exception, continue retrying.",
+                                method, retryIndex, totalTimes);
+                LOG.error(retryMsg, throwable);

Review comment:
       Maybe just log a warning (`LOG.warn`) instead of an error?

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryTestExecutionExtension.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.RetryExtension.getTestMethodKey;
+
+/** Extension to decide whether a retry test should run. */
+public class RetryTestExecutionExtension
+        implements ExecutionCondition, TestExecutionExceptionHandler, 
AfterEachCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryTestExecutionExtension.class);
+    private final int retryIndex;
+    private final int totalTimes;
+    private Class<? extends Throwable> repeatableException;
+
+    public RetryTestExecutionExtension(
+            int retryIndex, int totalTimes, Class<? extends Throwable> 
repeatableException) {
+        this.retryIndex = retryIndex;
+        this.totalTimes = totalTimes;
+        this.repeatableException = repeatableException;
+    }
+
+    @Override
+    public ConditionEvaluationResult 
evaluateExecutionCondition(ExtensionContext context) {
+        Map<String, Boolean> shouldRetry =
+                (Map<String, Boolean>) 
context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+        String method = getTestMethodKey(context);
+        if (!shouldRetry.get(method)) {
+            return ConditionEvaluationResult.disabled(method + "have already 
passed or failed.");

Review comment:
       ```suggestion
               return ConditionEvaluationResult.disabled(method + " has already 
passed or failed.");
   ```

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for the RetryOnException annotation. */
+@ExtendWith(RetryExtension.class)
+public class RetryOnExceptionExtensionTest {
+
+    private static final int NUMBER_OF_RUNS = 3;
+
+    private static int runsForSuccessfulTest = 0;
+
+    private static int runsForTestWithMatchingException = 0;
+
+    private static int runsForTestWithSubclassException = 0;
+
+    private static int runsForPassAfterOneFailure = 0;
+
+    @AfterAll
+    public static void verify() {
+        assertEquals(NUMBER_OF_RUNS + 1, runsForTestWithMatchingException);
+        assertEquals(NUMBER_OF_RUNS + 1, runsForTestWithSubclassException);
+        assertEquals(1, runsForSuccessfulTest);
+        assertEquals(2, runsForPassAfterOneFailure);
+    }
+
+    @TestTemplate

Review comment:
       Is this always required if one wants to use `@RetryOnException`? Can we 
set it up somehow so that the Retry annotation alone is sufficient?

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/RetryTestExecutionExtension.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.TestExecutionExceptionHandler;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_KEY;
+import static org.apache.flink.testutils.junit.RetryExtension.RETRY_NAMESPACE;
+import static org.apache.flink.testutils.junit.RetryExtension.getTestMethodKey;
+
+/** Extension to decide whether a retry test should run. */
+public class RetryTestExecutionExtension
+        implements ExecutionCondition, TestExecutionExceptionHandler, 
AfterEachCallback {
+    public static final Logger LOG = 
LoggerFactory.getLogger(RetryTestExecutionExtension.class);
+    private final int retryIndex;
+    private final int totalTimes;
+    private Class<? extends Throwable> repeatableException;
+
+    public RetryTestExecutionExtension(
+            int retryIndex, int totalTimes, Class<? extends Throwable> 
repeatableException) {
+        this.retryIndex = retryIndex;
+        this.totalTimes = totalTimes;
+        this.repeatableException = repeatableException;
+    }
+
+    @Override
+    public ConditionEvaluationResult 
evaluateExecutionCondition(ExtensionContext context) {
+        Map<String, Boolean> shouldRetry =
+                (Map<String, Boolean>) 
context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+        String method = getTestMethodKey(context);
+        if (!shouldRetry.get(method)) {
+            return ConditionEvaluationResult.disabled(method + "have already 
passed or failed.");
+        }
+        return ConditionEvaluationResult.enabled(
+                String.format("Test %s: [%d/%d]", method, retryIndex, 
totalTimes));
+    }
+
+    @Override
+    public void handleTestExecutionException(ExtensionContext context, 
Throwable throwable)
+            throws Throwable {
+        String method = getTestMethodKey(context);
+        if (repeatableException != null) {
+            // RetryOnException
+            if (repeatableException.isAssignableFrom(throwable.getClass())) {
+                // 1. continue retrying when get some repeatable exceptions
+                String retryMsg =
+                        String.format(
+                                "Retry test %s[%d/%d] failed with repeatable 
exception, continue retrying.",
+                                method, retryIndex, totalTimes);
+                LOG.error(retryMsg, throwable);
+                throw new TestAbortedException(retryMsg);
+            } else {
+                // 2. stop retrying when get an unrepeatable exception
+                Map<String, Boolean> shouldRetry =
+                        (Map<String, Boolean>) 
context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+                shouldRetry.put(method, false);
+                LOG.error(
+                        String.format(
+                                "Retry test %s[%d/%d] failed with unrepeatable 
exception, stop retrying.",
+                                method, retryIndex, totalTimes),
+                        throwable);
+                throw throwable;
+            }
+        } else {
+            // RetryOnFailure
+            // 1. Failed when reach the total retry times
+            if (retryIndex == totalTimes) {
+                LOG.error("Test Failed at the last retry.", throwable);
+                throw throwable;
+            }
+
+            // 2. continue retrying
+            String retryMsg =
+                    String.format(
+                            "Retry test %s[%d/%d] failed, continue retrying.",
+                            method, retryIndex, totalTimes);
+            LOG.error(retryMsg, throwable);
+            throw new TestAbortedException(retryMsg);
+        }
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        Throwable exception = context.getExecutionException().orElse(null);
+        if (exception == null) {
+            Map<String, Boolean> shouldRetry =
+                    (Map<String, Boolean>) 
context.getStore(RETRY_NAMESPACE).get(RETRY_KEY);
+            String method = getTestMethodKey(context);
+            shouldRetry.put(method, false);
+            LOG.info(

Review comment:
       ```suggestion
               LOG.trace(
   ```
   Logging every successful test run at INFO level will just be noise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to