This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2837c20a10b [FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for
Module: flink-tests (#27666)
2837c20a10b is described below
commit 2837c20a10bd226188d3491b4a7bc4fddf2ea9cb
Author: Mate Czagany <[email protected]>
AuthorDate: Wed Mar 18 13:24:13 2026 +0100
[FLINK-39124][JUnit5 Migration] Migrate the JUnit5 for Module: flink-tests
(#27666)
* [FLINK-39124][test] Migrate operator restore tests to JUnit 5
* [FLINK-39124] Remove visibility modifiers from all test methods and
classes
* [FLINK-39124] Improve scopes and assertions
---
.../restore/AbstractOperatorRestoreTestBase.java | 87 +++++++++----------
.../restore/StreamOperatorSnapshotRestoreTest.java | 98 ++++++++++------------
.../AbstractKeyedOperatorRestoreTestBase.java | 12 +--
.../restore/keyed/KeyedComplexChainTest.java | 6 +-
.../state/operator/restore/keyed/KeyedJob.java | 33 ++++----
.../AbstractNonKeyedOperatorRestoreTestBase.java | 13 +--
.../operator/restore/unkeyed/ChainBreakTest.java | 8 +-
.../restore/unkeyed/ChainLengthDecreaseTest.java | 9 +-
.../restore/unkeyed/ChainLengthIncreaseTest.java | 9 +-
.../unkeyed/ChainLengthStatelessDecreaseTest.java | 9 +-
.../operator/restore/unkeyed/ChainOrderTest.java | 9 +-
.../operator/restore/unkeyed/ChainUnionTest.java | 9 +-
.../operator/restore/unkeyed/NonKeyedJob.java | 27 +++---
13 files changed, 135 insertions(+), 194 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 99403d6bb08..612953f4379 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -33,21 +33,25 @@ import
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.util.CheckpointStorageUtils;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.streaming.util.StateBackendUtils;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.MigrationTest;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.net.URI;
@@ -65,8 +69,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Abstract class to verify that it is possible to migrate a savepoint across
upgraded Flink
@@ -79,19 +82,23 @@ import static org.junit.Assert.assertNotNull;
* <p>The savepoint _metadata file for the current branch is stored in the
savepointPath in {@link
* AbstractOperatorRestoreTestBase#migrateJob}.
*/
-public abstract class AbstractOperatorRestoreTestBase extends TestLogger
implements MigrationTest {
+@ExtendWith({TestLoggerExtension.class, ParameterizedTestExtension.class})
+public abstract class AbstractOperatorRestoreTestBase implements MigrationTest
{
- private final FlinkVersion flinkVersion;
+ @Parameter private FlinkVersion flinkVersion;
- @Parameterized.Parameters(name = "Migrate Savepoint: {0}")
- public static Collection<FlinkVersion> parameters() {
+ @Parameters(name = "Migrate Savepoint: {0}")
+ private static Collection<FlinkVersion> parameters() {
return FlinkVersion.rangeOf(
FlinkVersion.v1_8,
MigrationTest.getMostRecentlyPublishedVersion());
}
- @ClassRule
- public static final TestExecutorResource<ScheduledExecutorService>
EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorResource();
+ @RegisterExtension
+ private static final TestExecutorExtension<ScheduledExecutorService>
EXECUTOR_RESOURCE =
+ TestingUtils.defaultExecutorExtension();
+
+ private final ScheduledExecutor scheduledExecutor =
+ new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
private static final int NUM_TMS = 1;
private static final int NUM_SLOTS_PER_TM = 4;
@@ -110,25 +117,24 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
.map(AbstractOperatorRestoreTestBase::escapeRegexCharacters)
.collect(Collectors.joining(")|(", "(", ")")));
- @Rule public final TemporaryFolder tmpFolder = new TemporaryFolder();
+ @TempDir private File tmpFolder;
- @Rule
- public final MiniClusterWithClientResource cluster =
- new MiniClusterWithClientResource(
+ @RegisterExtension
+ private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+ new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
.build());
- private final ScheduledExecutor scheduledExecutor =
- new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
+ private ClusterClient<?> clusterClient;
- protected AbstractOperatorRestoreTestBase(FlinkVersion flinkVersion) {
- this.flinkVersion = flinkVersion;
+ @BeforeEach
+ void setClusterClient(@InjectClusterClient ClusterClient<?> clusterClient)
{
+ this.clusterClient = clusterClient;
}
protected void internalGenerateSnapshots(FlinkVersion targetVersion)
throws Exception {
- ClusterClient<?> clusterClient = cluster.getClusterClient();
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
// Submit job with old version savepoint and create a migrated
savepoint in the new version.
@@ -149,9 +155,8 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
}
}
- @Test
- public void testMigrationAndRestore() throws Throwable {
- ClusterClient<?> clusterClient = cluster.getClusterClient();
+ @TestTemplate
+ void testMigrationAndRestore() throws Throwable {
final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
// submit job with old version savepoint and create a migrated
savepoint in the new version
@@ -176,7 +181,7 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
jobToMigrate.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointResource.getFile()));
- assertNotNull(jobToMigrate.getJobID());
+ assertThat(jobToMigrate.getJobID()).isNotNull();
clusterClient.submitJob(jobToMigrate).get();
@@ -187,12 +192,10 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
deadline,
(jobStatus) -> jobStatus == JobStatus.RUNNING,
scheduledExecutor);
- assertEquals(
- JobStatus.RUNNING,
- jobRunningFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS));
+ assertThat(jobRunningFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isEqualTo(JobStatus.RUNNING);
// Trigger savepoint
- File targetDirectory = tmpFolder.newFolder();
String savepointPath = null;
// FLINK-6918: Retry cancel with savepoint message in case that
StreamTasks were not running
@@ -204,7 +207,7 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
clusterClient
.cancelWithSavepoint(
jobToMigrate.getJobID(),
- targetDirectory.getAbsolutePath(),
+ tmpFolder.getAbsolutePath(),
SavepointFormatType.CANONICAL)
.get();
} catch (Exception e) {
@@ -217,7 +220,7 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
}
}
- assertNotNull("Could not take savepoint.", savepointPath);
+ assertThat(savepointPath).as("Could not take savepoint.").isNotNull();
CompletableFuture<JobStatus> jobCanceledFuture =
FutureUtils.retrySuccessfulWithDelay(
@@ -226,9 +229,8 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
deadline,
(jobStatus) -> jobStatus == JobStatus.CANCELED,
scheduledExecutor);
- assertEquals(
- JobStatus.CANCELED,
- jobCanceledFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS));
+ assertThat(jobCanceledFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isEqualTo(JobStatus.CANCELED);
return savepointPath;
}
@@ -239,7 +241,7 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
jobToRestore.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, true));
- assertNotNull("Job doesn't have a JobID.", jobToRestore.getJobID());
+ assertThat(jobToRestore.getJobID()).as("Job doesn't have a
JobID.").isNotNull();
clusterClient.submitJob(jobToRestore).get();
@@ -250,9 +252,8 @@ public abstract class AbstractOperatorRestoreTestBase
extends TestLogger impleme
deadline,
(jobStatus) -> jobStatus == JobStatus.FINISHED,
scheduledExecutor);
- assertEquals(
- JobStatus.FINISHED,
- jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS));
+ assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(),
TimeUnit.MILLISECONDS))
+ .isEqualTo(JobStatus.FINISHED);
}
private JobGraph createJobGraph(ExecutionMode mode) {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
index ae8c12b2819..5c606000fbb 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/StreamOperatorSnapshotRestoreTest.java
@@ -64,25 +64,23 @@ import
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.TernaryBoolean;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Arrays;
import java.util.BitSet;
-import java.util.Collection;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link StreamOperator} snapshot restoration. */
-@RunWith(Parameterized.class)
-public class StreamOperatorSnapshotRestoreTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class StreamOperatorSnapshotRestoreTest {
private static final int ONLY_JM_RECOVERY = 0;
private static final int TM_AND_JM_RECOVERY = 1;
@@ -91,42 +89,27 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
private static final int MAX_PARALLELISM = 10;
- protected static TemporaryFolder temporaryFolder;
-
- @Parameterized.Parameter public StateBackendEnum stateBackendEnum;
+ @TempDir private static File temporaryFolder;
- enum StateBackendEnum {
+ private enum StateBackendEnum {
FILE,
ROCKSDB_FULLY_ASYNC,
ROCKSDB_INCREMENTAL
}
- @Parameterized.Parameters(name = "statebackend type ={0}")
- public static Collection<StateBackendEnum> parameter() {
- return Arrays.asList(StateBackendEnum.values());
- }
-
- @BeforeClass
- public static void beforeClass() throws IOException {
- temporaryFolder = new TemporaryFolder();
- temporaryFolder.create();
- }
-
- @AfterClass
- public static void afterClass() {
- temporaryFolder.delete();
- }
-
/** Test restoring an operator from a snapshot (local recovery
deactivated). */
- @Test
- public void testOperatorStatesSnapshotRestore() throws Exception {
- testOperatorStatesSnapshotRestoreInternal(ONLY_JM_RECOVERY);
+ @ParameterizedTest
+ @EnumSource(value = StateBackendEnum.class)
+ void testOperatorStatesSnapshotRestore(StateBackendEnum stateBackendEnum)
throws Exception {
+ testOperatorStatesSnapshotRestoreInternal(ONLY_JM_RECOVERY,
stateBackendEnum);
}
/** Test restoring an operator from a snapshot (local recovery activated).
*/
- @Test
- public void testOperatorStatesSnapshotRestoreWithLocalState() throws
Exception {
- testOperatorStatesSnapshotRestoreInternal(TM_AND_JM_RECOVERY);
+ @ParameterizedTest
+ @EnumSource(value = StateBackendEnum.class)
+ void testOperatorStatesSnapshotRestoreWithLocalState(StateBackendEnum
stateBackendEnum)
+ throws Exception {
+ testOperatorStatesSnapshotRestoreInternal(TM_AND_JM_RECOVERY,
stateBackendEnum);
}
/**
@@ -135,9 +118,11 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
* <p>This case does not really simulate a practical scenario, but we make
sure that restore
* happens from the local state here because we discard the JM state.
*/
- @Test
- public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM()
throws Exception {
- testOperatorStatesSnapshotRestoreInternal(TM_REMOVE_JM_RECOVERY);
+ @ParameterizedTest
+ @EnumSource(value = StateBackendEnum.class)
+ void
testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM(StateBackendEnum
stateBackendEnum)
+ throws Exception {
+ testOperatorStatesSnapshotRestoreInternal(TM_REMOVE_JM_RECOVERY,
stateBackendEnum);
}
/**
@@ -147,12 +132,15 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
* <p>This tests discards the local state, to simulate corruption and
checks that we still
* recover from the fallback JM state.
*/
- @Test
- public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM()
throws Exception {
- testOperatorStatesSnapshotRestoreInternal(JM_REMOVE_TM_RECOVERY);
+ @ParameterizedTest
+ @EnumSource(value = StateBackendEnum.class)
+ void
testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM(StateBackendEnum
stateBackendEnum)
+ throws Exception {
+ testOperatorStatesSnapshotRestoreInternal(JM_REMOVE_TM_RECOVERY,
stateBackendEnum);
}
- private void testOperatorStatesSnapshotRestoreInternal(final int mode)
throws Exception {
+ private void testOperatorStatesSnapshotRestoreInternal(
+ final int mode, StateBackendEnum stateBackendEnum) throws
Exception {
//
--------------------------------------------------------------------------
snapshot
@@ -183,7 +171,7 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
mode == ONLY_JM_RECOVERY
? null
: new LocalSnapshotDirectoryProviderImpl(
- temporaryFolder.newFolder(), jobID,
jobVertexID, subtaskIdx);
+ temporaryFolder, jobID, jobVertexID,
subtaskIdx);
LocalRecoveryConfig localRecoveryConfig =
(directoryProvider == null)
@@ -259,8 +247,8 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
OperatorSubtaskState taskLocalState =
snapshotWithLocalState.getTaskLocalState();
// We check if local state was created when we enabled local recovery
- Assert.assertTrue(
- mode > ONLY_JM_RECOVERY == (taskLocalState != null &&
taskLocalState.hasState()));
+ assertThat(taskLocalState != null && taskLocalState.hasState())
+ .isEqualTo(mode > ONLY_JM_RECOVERY);
if (mode == TM_REMOVE_JM_RECOVERY) {
jobManagerOwnedState.getManagedKeyedState().discardState();
@@ -302,7 +290,7 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
// check restored managed keyed state
long exp = element.getValue() + 1;
long act = keyedState.value();
- Assert.assertEquals(exp, act);
+ assertThat(act).isEqualTo(exp);
} else {
// write managed keyed state that goes into snapshot
keyedState.update(element.getValue() + 1);
@@ -328,7 +316,7 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
++count;
}
- Assert.assertEquals(MAX_PARALLELISM, count);
+ assertThat(count).isEqualTo(MAX_PARALLELISM);
// write raw operator state that goes into snapshot
OperatorStateCheckpointOutputStream outOp =
context.getRawOperatorStateOutput();
@@ -342,7 +330,7 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
- Assert.assertEquals(verifyRestore, context.isRestored());
+ assertThat(context.isRestored()).isEqualTo(verifyRestore);
keyedState =
context.getKeyedStateStore()
@@ -362,11 +350,11 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
context.getRawKeyedStateInputs()) {
try (InputStream in = streamProvider.getStream()) {
DataInputView div = new DataInputViewStreamWrapper(in);
- Assert.assertEquals(streamProvider.getKeyGroupId() +
2, div.readInt());
+
assertThat(div.readInt()).isEqualTo(streamProvider.getKeyGroupId() + 2);
++count;
}
}
- Assert.assertEquals(MAX_PARALLELISM, count);
+ assertThat(count).isEqualTo(MAX_PARALLELISM);
// check restored managed operator state
BitSet check = new BitSet(10);
@@ -374,7 +362,7 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
check.set(v);
}
- Assert.assertEquals(10, check.cardinality());
+ assertThat(check.cardinality()).isEqualTo(10);
// check restored raw operator state
check = new BitSet(13);
@@ -385,7 +373,7 @@ public class StreamOperatorSnapshotRestoreTest extends
TestLogger {
check.set(div.readInt() - 42);
}
}
- Assert.assertEquals(13, check.cardinality());
+ assertThat(check.cardinality()).isEqualTo(13);
}
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
index fdc85b298b5..be58b958964 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/AbstractKeyedOperatorRestoreTestBase.java
@@ -25,19 +25,11 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
/** Base class for all keyed operator restore tests. */
-@RunWith(Parameterized.class)
-public abstract class AbstractKeyedOperatorRestoreTestBase extends
AbstractOperatorRestoreTestBase {
-
- public AbstractKeyedOperatorRestoreTestBase(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+abstract class AbstractKeyedOperatorRestoreTestBase extends
AbstractOperatorRestoreTestBase {
@Override
- public void createMigrationJob(StreamExecutionEnvironment env) {
+ protected void createMigrationJob(StreamExecutionEnvironment env) {
/** Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2) */
SingleOutputStreamOperator<Tuple2<Integer, Integer>> source =
KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
index 98150aca70c..79be50b4f14 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -25,11 +25,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
/** Test state restoration for a keyed operator restore tests. */
-public class KeyedComplexChainTest extends
AbstractKeyedOperatorRestoreTestBase {
-
- public KeyedComplexChainTest(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+class KeyedComplexChainTest extends AbstractKeyedOperatorRestoreTestBase {
@SnapshotsGenerator
public void generateSnapshots(FlinkVersion targetVersion) throws Exception
{
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
index ea39fbc458b..58a47f5bafa 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -43,12 +43,12 @@ import
org.apache.flink.test.state.operator.restore.ExecutionMode;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ParameterTool;
-import org.junit.Assert;
-
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* Savepoint generator to create the savepoint used by the {@link
* AbstractKeyedOperatorRestoreTestBase}. Switch to specific version branches
and run this job to
@@ -206,15 +206,15 @@ public class KeyedJob {
while (input.hasNext() && restored.hasNext()) {
Tuple2<Integer, Integer> value = input.next();
Integer rValue = restored.next();
- Assert.assertEquals(rValue, value.f1);
+ assertThat(value.f1).isEqualTo(rValue);
}
- Assert.assertEquals(restored.hasNext(), input.hasNext());
+ assertThat(restored.hasNext()).isEqualTo(input.hasNext());
}
}
@Override
public void close() {
- Assert.assertTrue("Apply was never called.", applyCalled);
+ assertThat(applyCalled).as("Apply was never called.").isTrue();
}
}
@@ -248,17 +248,18 @@ public class KeyedJob {
break;
case MIGRATE:
case RESTORE:
- Assert.assertEquals(
- "Failed for "
- + valueToStore
- +
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- 1,
- state.size());
- String value = state.get(0);
- Assert.assertEquals(
- valueToStore
- +
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- value);
+ assertThat(state)
+ .as(
+ "Failed for "
+ + valueToStore
+ + getRuntimeContext()
+ .getTaskInfo()
+ .getIndexOfThisSubtask())
+ .containsExactly(
+ valueToStore
+ + getRuntimeContext()
+ .getTaskInfo()
+ .getIndexOfThisSubtask());
}
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
index eb5af92c2d4..998f6ead3c7 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -25,9 +25,6 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
@@ -35,16 +32,10 @@ import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
/** Base class for all non-keyed operator restore tests. */
-@RunWith(Parameterized.class)
-public abstract class AbstractNonKeyedOperatorRestoreTestBase
- extends AbstractOperatorRestoreTestBase {
-
- protected AbstractNonKeyedOperatorRestoreTestBase(FlinkVersion
flinkVersion) {
- super(flinkVersion);
- }
+abstract class AbstractNonKeyedOperatorRestoreTestBase extends
AbstractOperatorRestoreTestBase {
@Override
- public void createMigrationJob(StreamExecutionEnvironment env) {
+ protected void createMigrationJob(StreamExecutionEnvironment env) {
/** Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map ->
StatefulMap3) */
DataStream<Integer> source = createSource(env, ExecutionMode.MIGRATE);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
index f49805ca344..d9a6f85e2a5 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -31,11 +31,7 @@ import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
/** Verifies that the state of all operators is restored if a topology change
breaks up a chain. */
-public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
-
- public ChainBreakTest(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
@SnapshotsGenerator
public void generateSnapshots(FlinkVersion targetVersion) throws Exception
{
@@ -43,7 +39,7 @@ public class ChainBreakTest extends
AbstractNonKeyedOperatorRestoreTestBase {
}
@Override
- public void createRestoredJob(StreamExecutionEnvironment env) {
+ protected void createRestoredJob(StreamExecutionEnvironment env) {
/**
* Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map
-> StatefulMap3)
* Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map)
-> StatefulMap3
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
index 516450c54ed..d83091e3955 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.state.operator.restore.unkeyed;
-import org.apache.flink.FlinkVersion;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,14 +34,10 @@ import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*
* <p>The test shares the snapshots generated by {@link ChainBreakTest}.
*/
-public class ChainLengthDecreaseTest extends
AbstractNonKeyedOperatorRestoreTestBase {
-
- public ChainLengthDecreaseTest(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
@Override
- public void createRestoredJob(StreamExecutionEnvironment env) {
+ protected void createRestoredJob(StreamExecutionEnvironment env) {
/**
* Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map
-> StatefulMap3)
* Modified job: Source -> StatefulMap1 -> CHAIN(Map -> StatefulMap3)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
index 6239564975e..2cd02967488 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.state.operator.restore.unkeyed;
-import org.apache.flink.FlinkVersion;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,14 +35,10 @@ import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*
* <p>The test shares the snapshots generated by {@link ChainBreakTest}.
*/
-public class ChainLengthIncreaseTest extends
AbstractNonKeyedOperatorRestoreTestBase {
-
- public ChainLengthIncreaseTest(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
@Override
- public void createRestoredJob(StreamExecutionEnvironment env) {
+ protected void createRestoredJob(StreamExecutionEnvironment env) {
/**
* Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map
-> StatefulMap3)
* Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map
-> StatefulMap3 ->
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
index 771e3caa53b..93f4b4b4fbd 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthStatelessDecreaseTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.state.operator.restore.unkeyed;
-import org.apache.flink.FlinkVersion;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -38,14 +37,10 @@ import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*
* <p>The test shares the snapshots generated by {@link ChainBreakTest}.
*/
-public class ChainLengthStatelessDecreaseTest extends
AbstractNonKeyedOperatorRestoreTestBase {
-
- public ChainLengthStatelessDecreaseTest(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+class ChainLengthStatelessDecreaseTest extends
AbstractNonKeyedOperatorRestoreTestBase {
@Override
- public void createRestoredJob(StreamExecutionEnvironment env) {
+ protected void createRestoredJob(StreamExecutionEnvironment env) {
/*
* Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map
-> StatefulMap3)
* Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 ->
StatefulMap3)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
index f15b5217f4a..7c12d51eb23 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.state.operator.restore.unkeyed;
-import org.apache.flink.FlinkVersion;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -36,14 +35,10 @@ import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*
* <p>The test shares the snapshots generated by {@link ChainBreakTest}.
*/
-public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
-
- public ChainOrderTest(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
@Override
- public void createRestoredJob(StreamExecutionEnvironment env) {
+ protected void createRestoredJob(StreamExecutionEnvironment env) {
/**
* Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
* Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 -> Map -> StatefulMap2)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
index 769a07fc08c..c7072b40b67 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.state.operator.restore.unkeyed;
-import org.apache.flink.FlinkVersion;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -35,14 +34,10 @@ import static
org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.c
*
* <p>The test shares the snapshots generated by {@link ChainBreakTest}.
*/
-public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
-
- public ChainUnionTest(FlinkVersion flinkVersion) {
- super(flinkVersion);
- }
+class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
@Override
- public void createRestoredJob(StreamExecutionEnvironment env) {
+ protected void createRestoredJob(StreamExecutionEnvironment env) {
/**
* Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
* Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 -> Map -> StatefulMap3)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
index f7cf09604b6..c815349b8d8 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -34,11 +34,11 @@ import org.apache.flink.streaming.util.StateBackendUtils;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
import org.apache.flink.util.ParameterTool;
-import org.junit.Assert;
-
import java.util.Arrays;
import java.util.List;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* Savepoint generator to create the savepoint used by the {@link
* AbstractNonKeyedOperatorRestoreTestBase}. Switch to specific version branches and run this job to
@@ -148,17 +148,18 @@ public class NonKeyedJob {
break;
case MIGRATE:
case RESTORE:
- Assert.assertEquals(
- "Failed for "
- + valueToStore
- + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- 1,
- state.size());
- String value = state.get(0);
- Assert.assertEquals(
- valueToStore
- + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
- value);
+ assertThat(state)
+ .as(
+ "Failed for "
+ + valueToStore
+ + getRuntimeContext()
+ .getTaskInfo()
+ .getIndexOfThisSubtask())
+ .containsExactly(
+ valueToStore
+ + getRuntimeContext()
+ .getTaskInfo()
+ .getIndexOfThisSubtask());
}
}
}