This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d788f0a2af3e13ba2b56f6abd88531332cf62593
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Fri Jul 22 15:42:52 2022 +0200

    [FLINK-28644][tests] Migrate state-processing-api to new collectAsync()
---
 .../flink/state/api/SavepointWriterITCase.java     |  52 +++-----
 .../state/api/SavepointWriterWindowITCase.java     |  82 +++++-----
 .../flink/state/api/WritableSavepointITCase.java   |  50 +++----
 .../state/api/WritableSavepointWindowITCase.java   |  86 ++++++----
 .../flink/streaming/util/StreamCollector.java      | 146 ---------------------
 .../streaming/util/StreamCollectorExtension.java   | 146 ---------------------
 6 files changed, 104 insertions(+), 458 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index 28e9e1c3f2f..9c3febdd01e 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -27,10 +27,8 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -44,14 +42,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.util.StreamCollector;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -60,9 +57,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT test for writing savepoints. */
 public class SavepointWriterITCase extends AbstractTestBase {
@@ -82,8 +79,6 @@ public class SavepointWriterITCase extends AbstractTestBase {
     private static final Collection<CurrencyRate> currencyRates =
             Arrays.asList(new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     @Test
     public void testDefaultStateBackend() throws Exception {
         testStateBootstrapAndModification(null);
@@ -151,7 +146,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        final CloseableIterator<Account> results = stream.collectAsync();
 
         env.fromCollection(currencyRates)
                 .connect(env.fromCollection(currencyRates).broadcast(descriptor))
@@ -159,22 +154,14 @@ public class SavepointWriterITCase extends AbstractTestBase {
                 .uid(CURRENCY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        final StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
-
-        serializedThrowable.ifPresent(
-                t -> {
-                    throw new AssertionError("Unexpected exception during bootstrapping", t);
-                });
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        env.execute(streamGraph);
+
+        assertThat(results).toIterable().hasSize(3);
+        results.close();
     }
 
     private void modifySavepoint(StateBackend backend, String savepointPath, String modifyPath)
@@ -210,26 +197,21 @@ public class SavepointWriterITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        final CloseableIterator<Account> results = stream.collectAsync();
 
         stream.map(acc -> acc.id)
                 .map(new StatefulOperator())
                 .uid(MODIFY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        final StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
+        sEnv.execute(streamGraph);
 
-        Assert.assertFalse(serializedThrowable.isPresent());
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        assertThat(results).toIterable().hasSize(3);
+        results.close();
     }
 
     /** A simple pojo. */
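Note on the pattern: DataStream#collectAsync() registers a collecting sink when the job graph is built and hands back a CloseableIterator that is fed once the job runs, which is why the tests above call it before execute() and iterate afterwards. A minimal, self-contained sketch of the same flow (illustrative class name and values, not code from this commit; assumes flink-streaming-java and flink-clients on the classpath):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;

    public class CollectAsyncSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> stream = env.fromElements(1, 2, 3);

            // Call collectAsync() before execute() so the collecting sink is part of the job.
            try (CloseableIterator<Integer> results = stream.collectAsync()) {
                env.execute(); // blocks until the bounded job finishes
                results.forEachRemaining(System.out::println);
            }
        }
    }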
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
index 89e8edb7bc6..9d61ce14d9c 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
@@ -40,21 +38,17 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.StreamCollector;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -64,9 +58,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT Test for writing savepoints to the {@code WindowOperator}. */
 @SuppressWarnings("unchecked")
@@ -78,13 +71,11 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
     private static final Collection<String> WORDS =
             Arrays.asList("hello", "world", "hello", "everyone");
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> STANDARD_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> STANDARD_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> EVICTOR_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> EVICTOR_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
     private static final TypeInformation<Tuple2<String, Integer>> TUPLE_TYPE_INFO =
             new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo();
@@ -130,8 +121,6 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
         return parameterList;
     }
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     private final WindowBootstrap windowBootstrap;
 
     private final WindowStream windowStream;
@@ -182,12 +171,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                         .window(TumblingEventTimeWindows.of(Time.milliseconds(5)));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(STANDARD_MATCHER);
     }
 
     @Test
@@ -225,12 +216,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                         .evictor(CountEvictor.of(1));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(EVICTOR_MATCHER);
     }
 
     @Test
@@ -269,13 +262,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                         Time.milliseconds(5), Time.milliseconds(1)));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results =
-                future.get().stream().distinct().collect(Collectors.toList());
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(STANDARD_MATCHER);
     }
 
     @Test
@@ -317,30 +311,22 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                         .evictor(CountEvictor.of(1));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
        submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results =
-                future.get().stream().distinct().collect(Collectors.toList());
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(EVICTOR_MATCHER);
     }
 
-    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) {
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
-
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        try {
-            Optional<SerializedThrowable> serializedThrowable =
-                    client.submitJob(jobGraph)
-                            .thenCompose(client::requestJobResult)
-                            .get()
-                            .getSerializedThrowable();
-            Assert.assertFalse(serializedThrowable.isPresent());
-        } catch (Throwable t) {
-            Assert.fail("Failed to submit job");
-        }
+    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) throws Exception {
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
+                SavepointRestoreSettings.forPath(savepointPath, true));
+
+        sEnv.execute(streamGraph);
    }
 
     private static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
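The assertions also move from Hamcrest matchers to AssertJ: assertThat(iterator).toIterable() drains the iterator into an assertable iterable, after which collection assertions such as containsAll(...) apply. A self-contained sketch of just that assertion style (plain java.util collections, values illustrative; assumes AssertJ on the classpath):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.Arrays;
    import java.util.List;

    public class AssertJIteratorSketch {
        public static void main(String[] args) {
            List<Integer> expected = Arrays.asList(1, 2, 3);

            // toIterable() consumes the iterator so ordering-insensitive
            // collection assertions can run against its elements.
            assertThat(Arrays.asList(3, 1, 2, 1).iterator())
                    .toIterable()
                    .as("unexpected window contents")
                    .containsAll(expected);
        }
    }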
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
index c70787d0094..c881096b178 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
@@ -28,11 +28,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -47,14 +45,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.util.StreamCollector;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -63,9 +60,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT test for writing savepoints. */
 public class WritableSavepointITCase extends AbstractTestBase {
@@ -86,8 +83,6 @@ public class WritableSavepointITCase extends AbstractTestBase {
     private static final Collection<CurrencyRate> currencyRates =
             Arrays.asList(new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     @Test
     public void testFsStateBackend() throws Exception {
         testStateBootstrapAndModification(
@@ -161,7 +156,7 @@ public class WritableSavepointITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        CloseableIterator<Account> results = stream.collectAsync();
 
         sEnv.fromCollection(currencyRates)
                 .connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
@@ -169,22 +164,13 @@ public class WritableSavepointITCase extends AbstractTestBase {
                 .uid(CURRENCY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
-
-        serializedThrowable.ifPresent(
-                t -> {
-                    throw new AssertionError("Unexpected exception during bootstrapping", t);
-                });
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        sEnv.execute(streamGraph);
+
+        assertThat(results).toIterable().hasSize(3);
     }
 
     private void modifySavepoint(StateBackend backend, String savepointPath, String modifyPath)
@@ -214,26 +200,20 @@ public class WritableSavepointITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        CloseableIterator<Account> results = stream.collectAsync();
 
         stream.map(acc -> acc.id)
                 .map(new StatefulOperator())
                 .uid(MODIFY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
+        sEnv.execute(streamGraph);
 
-        Assert.assertFalse(serializedThrowable.isPresent());
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        assertThat(results).toIterable().hasSize(3);
     }
 
     /** A simple pojo. */
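With the ClusterClient plumbing gone, restoring from a savepoint reduces to attaching the restore settings to the StreamGraph and executing it, exactly as the hunks above do. A rough, self-contained sketch of that flow (the savepoint path is a made-up placeholder):

    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    public class RestoreAndExecuteSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();

            // Restore settings now go on the StreamGraph instead of a hand-built JobGraph.
            StreamGraph streamGraph = env.getStreamGraph();
            streamGraph.setSavepointRestoreSettings(
                    SavepointRestoreSettings.forPath("file:///tmp/savepoint-123", false));

            env.execute(streamGraph); // throws if the restored job fails
        }
    }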
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
index 7e16b891a00..4115b7645cd 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
@@ -27,10 +27,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
@@ -42,21 +40,17 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.StreamCollector;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -64,11 +58,10 @@ import org.junit.runners.Parameterized;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT Test for writing savepoints to the {@code WindowOperator}. */
 @SuppressWarnings("unchecked")
@@ -80,13 +73,11 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
     private static final Collection<String> WORDS =
             Arrays.asList("hello", "world", "hello", "everyone");
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> STANDARD_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> STANDARD_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> EVICTOR_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> EVICTOR_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
     private static final TypeInformation<Tuple2<String, Integer>> TUPLE_TYPE_INFO =
             new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo();
@@ -136,8 +127,6 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
         return parameterList;
     }
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     private final WindowBootstrap windowBootstrap;
 
     private final WindowStream windowStream;
@@ -185,12 +174,14 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                         .window(TumblingEventTimeWindows.of(Time.milliseconds(5)));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsExactlyInAnyOrderElementsOf(STANDARD_MATCHER);
     }
 
     @Test
@@ -223,12 +214,14 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                        .evictor(CountEvictor.of(1));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsExactlyInAnyOrderElementsOf(EVICTOR_MATCHER);
     }
 
     @Test
@@ -264,13 +257,16 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                         Time.milliseconds(5), Time.milliseconds(1)));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertEquals("Incorrect number of results", 15, results.size());
-        Assert.assertThat("Incorrect bootstrap state", new HashSet<>(results), STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect number of results")
+                .hasSize(15)
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(STANDARD_MATCHER);
     }
 
     @Test
@@ -308,30 +304,24 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                         .evictor(CountEvictor.of(1));
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
 
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertEquals("Incorrect number of results", 15, results.size());
-        Assert.assertThat("Incorrect bootstrap state", new HashSet<>(results), EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect number of results")
+                .hasSize(15)
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(EVICTOR_MATCHER);
     }
 
-    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) {
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
-
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        try {
-            Optional<SerializedThrowable> serializedThrowable =
-                    client.submitJob(jobGraph)
-                            .thenCompose(client::requestJobResult)
-                            .get()
-                            .getSerializedThrowable();
-            Assert.assertFalse(serializedThrowable.isPresent());
-        } catch (Throwable t) {
-            Assert.fail("Failed to submit job");
-        }
+    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) throws Exception {
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
+                SavepointRestoreSettings.forPath(savepointPath, true));
+
+        sEnv.execute(streamGraph);
     }
 
     private static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
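One behavioral consequence of the migration: execute() rethrows any job failure, so the Optional<SerializedThrowable> unwrapping and the assertFalse/fail checks become redundant. A test that still wants the old custom failure message could wrap the call, roughly like this (hypothetical helper, not part of the commit):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    final class ExecuteOrFail {
        private ExecuteOrFail() {}

        // Sketch: surface job failures as assertion errors with a custom message.
        static void executeOrFail(StreamExecutionEnvironment env, StreamGraph streamGraph) {
            try {
                env.execute(streamGraph);
            } catch (Exception e) {
                throw new AssertionError("Unexpected exception during bootstrapping", e);
            }
        }
    }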
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
deleted file mode 100644
index 1d096521e76..00000000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.junit.rules.ExternalResource;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A simple utility for collecting all the elements in a {@link DataStream}.
- *
- * <pre>{@code
- * public class DataStreamTest {
- *
- *     {@literal @}Rule
- *     public StreamCollector collector = new StreamCollector();
- *
- *     public void test() throws Exception {
- *         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *         DataStream<Integer> stream = env.fromElements(1, 2, 3);
- *
- *         CompletableFuture<Collection<Integer>> results = collector.collect(stream);
- *         Assert.assertThat(results.get(), hasItems(1, 2, 3));
- *     }
- * }
- * }</pre>
- *
- * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
- * in memory. 3) All tasks run within the same JVM.
- */
-@SuppressWarnings("rawtypes")
-public class StreamCollector extends ExternalResource {
-
-    private static final AtomicLong counter = new AtomicLong();
-
-    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
-
-    private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap<>();
-
-    private List<Long> ids;
-
-    @Override
-    protected void before() {
-        ids = new ArrayList<>();
-    }
-
-    /**
-     * @return A future that contains all the elements of the DataStream which completes when all
-     *     elements have been processed.
-     */
-    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
-        final long id = counter.getAndIncrement();
-        ids.add(id);
-
-        int parallelism = stream.getParallelism();
-        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-            parallelism = stream.getExecutionEnvironment().getParallelism();
-        }
-
-        CountDownLatch latch = new CountDownLatch(parallelism);
-        latches.put(id, latch);
-
-        Queue<IN> results = new ConcurrentLinkedDeque<>();
-        resultQueues.put(id, results);
-
-        stream.addSink(new CollectingSink<>(id));
-
-        return CompletableFuture.runAsync(
-                        () -> {
-                            try {
-                                latch.await();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException("Failed to collect results");
-                            }
-                        })
-                .thenApply(ignore -> results);
-    }
-
-    @Override
-    protected void after() {
-        for (Long id : ids) {
-            latches.remove(id);
-            resultQueues.remove(id);
-        }
-    }
-
-    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
-
-        private final long id;
-
-        private transient CountDownLatch latch;
-
-        private transient Queue<IN> results;
-
-        private CollectingSink(long id) {
-            this.id = id;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void open(Configuration parameters) throws Exception {
-            latch = StreamCollector.latches.get(id);
-            results = (Queue<IN>) StreamCollector.resultQueues.get(id);
-        }
-
-        @Override
-        public void invoke(IN value, Context context) throws Exception {
-            results.add(value);
-        }
-
-        @Override
-        public void close() throws Exception {
-            latch.countDown();
-        }
-    }
-}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
deleted file mode 100644
index d7be3bf41c1..00000000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A simple utility for collecting all the elements in a {@link DataStream}.
- *
- * <pre>{@code
- * public class DataStreamTest {
- *
- *     {@literal @}RegisterExtension
- *     public StreamCollectorExtension collector = new StreamCollectorExtension();
- *
- *     public void test() throws Exception {
- *         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *         DataStream<Integer> stream = env.fromElements(1, 2, 3);
- *
- *         CompletableFuture<Collection<Integer>> results = collector.collect(stream);
- *         Assert.assertThat(results.get(), hasItems(1, 2, 3));
- *     }
- * }
- * }</pre>
- *
- * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
- * in memory. 3) All tasks run within the same JVM.
- */
-public class StreamCollectorExtension implements BeforeEachCallback, AfterEachCallback {
-    private static final AtomicLong counter = new AtomicLong();
-
-    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
-
-    private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap<>();
-
-    private List<Long> ids;
-
-    @Override
-    public void beforeEach(ExtensionContext context) throws Exception {
-        ids = new ArrayList<>();
-    }
-
-    /**
-     * @return A future that contains all the elements of the DataStream which completes when all
-     *     elements have been processed.
-     */
-    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
-        final long id = counter.getAndIncrement();
-        ids.add(id);
-
-        int parallelism = stream.getParallelism();
-        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-            parallelism = stream.getExecutionEnvironment().getParallelism();
-        }
-
-        CountDownLatch latch = new CountDownLatch(parallelism);
-        latches.put(id, latch);
-
-        Queue<IN> results = new ConcurrentLinkedDeque<>();
-        resultQueues.put(id, results);
-
-        stream.addSink(new StreamCollectorExtension.CollectingSink<>(id));
-
-        return CompletableFuture.runAsync(
-                        () -> {
-                            try {
-                                latch.await();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException("Failed to collect results");
-                            }
-                        })
-                .thenApply(ignore -> results);
-    }
-
-    @Override
-    public void afterEach(ExtensionContext context) throws Exception {
-        for (Long id : ids) {
-            latches.remove(id);
-            resultQueues.remove(id);
-        }
-    }
-
-    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
-
-        private final long id;
-
-        private transient CountDownLatch latch;
-
-        private transient Queue<IN> results;
-
-        private CollectingSink(long id) {
-            this.id = id;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void open(Configuration parameters) throws Exception {
-            latch = StreamCollectorExtension.latches.get(id);
-            results = (Queue<IN>) StreamCollectorExtension.resultQueues.get(id);
-        }
-
-        @Override
-        public void invoke(IN value, Context context) throws Exception {
-            results.add(value);
-        }
-
-        @Override
-        public void close() throws Exception {
-            latch.countDown();
-        }
-    }
-}
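For comparison, the usage example from the javadoc of the deleted utilities, rewritten against collectAsync(), would look roughly as follows (a sketch under the same assumptions as above: bounded stream, AssertJ and the Flink test dependencies on the classpath):

    import static org.assertj.core.api.Assertions.assertThat;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;

    import org.junit.Test;

    public class DataStreamTest {

        @Test
        public void test() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Integer> stream = env.fromElements(1, 2, 3);

            // Replaces the @Rule StreamCollector + CompletableFuture pattern.
            try (CloseableIterator<Integer> results = stream.collectAsync()) {
                env.execute();
                assertThat(results).toIterable().containsExactlyInAnyOrder(1, 2, 3);
            }
        }
    }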