This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 583ce7140c6 [FLINK-38488][tests] Use 'throws Exception' instead of try-catch-fail in tests in flink-tests (#27092)
583ce7140c6 is described below
commit 583ce7140c669ce6537d6ee76ed1ba331892aced
Author: Mingliang Liu <[email protected]>
AuthorDate: Thu Nov 6 00:51:25 2025 -0800
[FLINK-38488][tests] Use 'throws Exception' instead of try-catch-fail in tests in flink-tests (#27092)
---
.../EventTimeAllWindowCheckpointingITCase.java | 549 ++++++++-------
.../EventTimeWindowCheckpointingITCase.java | 733 ++++++++++-----------
.../ProcessingTimeWindowCheckpointingITCase.java | 352 +++++-----
.../test/checkpointing/RegionFailoverITCase.java | 15 +-
.../StreamCheckpointNotifierITCase.java | 98 ++-
.../StreamFaultToleranceTestBase.java | 32 +-
.../TimestampedFileInputSplitTest.java | 12 +-
.../test/example/client/LocalExecutorITCase.java | 43 +-
.../apache/flink/test/io/InputOutputITCase.java | 11 +-
.../flink/test/misc/CustomSerializationITCase.java | 21 +-
.../flink/test/misc/MiscellaneousIssuesITCase.java | 123 ++--
...tractTaskManagerProcessFailureRecoveryTest.java | 4 +-
.../flink/test/runtime/IPv6HostnamesITCase.java | 82 +--
.../test/state/StateHandleSerializationTest.java | 21 +-
.../test/streaming/runtime/PartitionerITCase.java | 9 +-
15 files changed, 968 insertions(+), 1137 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 003308e9a6c..5ad625971d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -47,7 +47,6 @@ import java.time.Duration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* This verifies that checkpointing works correctly with event time windows.
@@ -79,320 +78,300 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
// ------------------------------------------------------------------------
@Test
- public void testTumblingTimeWindow() {
+ public void testTumblingTimeWindow() throws Exception {
final int numElementsPerKey = 3000;
final int windowSize = 100;
final int numKeys = 1;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
-
- env.addSource(
- new FailingSource(
- new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
- numKeys, windowSize),
- numElementsPerKey))
- .rebalance()
-
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
- .apply(
- new RichAllWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- 1,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+ env.addSource(
+ new FailingSource(
+ new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+ numKeys, windowSize),
+ numElementsPerKey))
+ .rebalance()
+
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+ .apply(
+ new RichAllWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ 1,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
}
-
- @Override
- public void apply(
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values)
{
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(
- new Tuple4<>(
- key,
- window.getStart(),
- window.getEnd(),
- new IntType(sum)));
- }
- })
- .addSink(
- new ValidatingSink<>(
- new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
- numElementsPerKey),
- new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSize)))
- .setParallelism(1);
-
- env.execute("Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ out.collect(
+ new Tuple4<>(
+ key,
+ window.getStart(),
+ window.getEnd(),
+ new IntType(sum)));
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+ numElementsPerKey),
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+ numKeys, numElementsPerKey,
windowSize)))
+ .setParallelism(1);
+
+ env.execute("Tumbling Window Test");
}
@Test
- public void testSlidingTimeWindow() {
+ public void testSlidingTimeWindow() throws Exception {
final int numElementsPerKey = 3000;
final int windowSize = 1000;
final int windowSlide = 100;
final int numKeys = 1;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
-
- env.addSource(
- new FailingSource(
- new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
- numKeys, windowSlide),
- numElementsPerKey))
- .rebalance()
- .windowAll(
- SlidingEventTimeWindows.of(
- Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
- .apply(
- new RichAllWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- 1,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values)
{
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(
- new Tuple4<>(
- key,
- window.getStart(),
- window.getEnd(),
- new IntType(sum)));
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+ env.addSource(
+ new FailingSource(
+ new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+ numKeys, windowSlide),
+ numElementsPerKey))
+ .rebalance()
+ .windowAll(
+ SlidingEventTimeWindows.of(
+ Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
+ .apply(
+ new RichAllWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ 1,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
}
- })
- .addSink(
- new ValidatingSink<>(
- new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
- numElementsPerKey),
- new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSlide)))
- .setParallelism(1);
-
- env.execute("Sliding Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ out.collect(
+ new Tuple4<>(
+ key,
+ window.getStart(),
+ window.getEnd(),
+ new IntType(sum)));
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+ numElementsPerKey),
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+ numKeys, numElementsPerKey,
windowSlide)))
+ .setParallelism(1);
+
+ env.execute("Sliding Window Test");
}
@Test
- public void testPreAggregatedTumblingTimeWindow() {
+ public void testPreAggregatedTumblingTimeWindow() throws Exception {
final int numElementsPerKey = 3000;
final int windowSize = 100;
final int numKeys = 1;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
-
- env.addSource(
- new FailingSource(
- new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
- numKeys, windowSize),
- numElementsPerKey))
- .rebalance()
-
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
-
- return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
- }
- },
- new RichAllWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- 1,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in : input) {
- out.collect(
- new Tuple4<>(
- in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+ env.addSource(
+ new FailingSource(
+ new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+ numKeys, windowSize),
+ numElementsPerKey))
+ .rebalance()
+
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
+
+ return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichAllWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ 1,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in : input) {
+ out.collect(
+ new Tuple4<>(
+ in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
}
- })
- .addSink(
- new ValidatingSink<>(
- new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
- numElementsPerKey),
- new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSize)))
- .setParallelism(1);
-
- env.execute("PreAggregated Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+ numElementsPerKey),
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+ numKeys, numElementsPerKey,
windowSize)))
+ .setParallelism(1);
+
+ env.execute("PreAggregated Tumbling Window Test");
}
@Test
- public void testPreAggregatedSlidingTimeWindow() {
+ public void testPreAggregatedSlidingTimeWindow() throws Exception {
final int numElementsPerKey = 3000;
final int windowSize = 1000;
final int windowSlide = 100;
final int numKeys = 1;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
-
- env.addSource(
- new FailingSource(
- new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
- numKeys, windowSlide),
- numElementsPerKey))
- .rebalance()
- .windowAll(
- SlidingEventTimeWindows.of(
- Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
-
- return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
- }
- },
- new RichAllWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- 1,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in : input) {
- out.collect(
- new Tuple4<>(
- in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+ env.addSource(
+ new FailingSource(
+ new
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+ numKeys, windowSlide),
+ numElementsPerKey))
+ .rebalance()
+ .windowAll(
+ SlidingEventTimeWindows.of(
+ Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
+
+ return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichAllWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ 1,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in : input) {
+ out.collect(
+ new Tuple4<>(
+ in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
}
- })
- .addSink(
- new ValidatingSink<>(
- new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
- numElementsPerKey),
- new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSlide)))
- .setParallelism(1);
-
- env.execute("PreAggregated Sliding Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+ numElementsPerKey),
+ new
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+ numKeys, numElementsPerKey,
windowSlide)))
+ .setParallelism(1);
+
+ env.execute("PreAggregated Sliding Window Test");
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index ca07e3a6ab2..161062e6826 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -76,7 +76,6 @@ import java.util.stream.Collectors;
import static
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* This verifies that checkpointing works correctly with event time windows.
This is more strict
@@ -308,435 +307,405 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
// ------------------------------------------------------------------------
@Test
- public void testTumblingTimeWindow() {
+ public void testTumblingTimeWindow() throws Exception {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int numKeys = numKeys();
- try {
- StreamExecutionEnvironment env =
-
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- env.getConfig().setUseSnapshotCompression(true);
-
- env.addSource(
- new FailingSource(
- new KeyedEventTimeGenerator(numKeys,
windowSize),
- numElementsPerKey))
- .rebalance()
- .keyBy(x -> x.f0)
-
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
- .apply(
- new RichWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- Long,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- PARALLELISM,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env.addSource(
+ new FailingSource(
+ new KeyedEventTimeGenerator(numKeys,
windowSize),
+ numElementsPerKey))
+ .rebalance()
+ .keyBy(x -> x.f0)
+
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+ .apply(
+ new RichWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ Long,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ PARALLELISM,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Long l,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
}
- @Override
- public void apply(
- Long l,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values)
{
- sum += value.f1.value;
- key = value.f0;
- }
-
- final Tuple4<Long, Long, Long, IntType>
result =
- new Tuple4<>(
- key,
- window.getStart(),
- window.getEnd(),
- new IntType(sum));
- out.collect(result);
- }
- })
- .addSink(
- new ValidatingSink<>(
- new
SinkValidatorUpdateFun(numElementsPerKey),
- new SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSize)))
- .setParallelism(1);
-
- env.execute("Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ final Tuple4<Long, Long, Long, IntType> result
=
+ new Tuple4<>(
+ key,
+ window.getStart(),
+ window.getEnd(),
+ new IntType(sum));
+ out.collect(result);
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys,
numElementsPerKey, windowSize)))
+ .setParallelism(1);
+
+ env.execute("Tumbling Window Test");
}
@Test
- public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
+ public void testTumblingTimeWindowWithKVStateMinMaxParallelism() throws
Exception {
doTestTumblingTimeWindowWithKVState(PARALLELISM);
}
@Test
- public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
+ public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() throws
Exception {
doTestTumblingTimeWindowWithKVState(1 << 15);
}
- public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
+ public void doTestTumblingTimeWindowWithKVState(int maxParallelism) throws
Exception {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int numKeys = numKeys();
- try {
- StreamExecutionEnvironment env =
-
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
- env.setParallelism(PARALLELISM);
- env.setMaxParallelism(maxParallelism);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- env.getConfig().setUseSnapshotCompression(true);
-
- env.addSource(
- new FailingSource(
- new KeyedEventTimeGenerator(numKeys,
windowSize),
- numElementsPerKey))
- .rebalance()
- .keyBy(x -> x.f0)
-
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
- .apply(
- new RichWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- Long,
- TimeWindow>() {
-
- private boolean open = false;
-
- private ValueState<Integer> count;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- PARALLELISM,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
- count =
- getRuntimeContext()
- .getState(
- new
ValueStateDescriptor<>(
- "count",
Integer.class, 0));
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setParallelism(PARALLELISM);
+ env.setMaxParallelism(maxParallelism);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env.addSource(
+ new FailingSource(
+ new KeyedEventTimeGenerator(numKeys,
windowSize),
+ numElementsPerKey))
+ .rebalance()
+ .keyBy(x -> x.f0)
+
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+ .apply(
+ new RichWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ Long,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ private ValueState<Integer> count;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ PARALLELISM,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ count =
+ getRuntimeContext()
+ .getState(
+ new
ValueStateDescriptor<>(
+ "count",
Integer.class, 0));
+ }
+
+ @Override
+ public void apply(
+ Long l,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out)
+ throws Exception {
+
+ // the window count state starts with the key,
so that we get
+ // different count results for each key
+ if (count.value() == 0) {
+ count.update(l.intValue());
}
- @Override
- public void apply(
- Long l,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long,
IntType>> out)
- throws Exception {
-
- // the window count state starts with the
key, so that we get
- // different count results for each key
- if (count.value() == 0) {
- count.update(l.intValue());
- }
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- count.update(count.value() + 1);
- out.collect(
- new Tuple4<>(
- l,
- window.getStart(),
- window.getEnd(),
- new
IntType(count.value())));
- }
- })
- .addSink(
- new ValidatingSink<>(
- new CountingSinkValidatorUpdateFun(),
- new SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSize)))
- .setParallelism(1);
-
- env.execute("Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ count.update(count.value() + 1);
+ out.collect(
+ new Tuple4<>(
+ l,
+ window.getStart(),
+ window.getEnd(),
+ new IntType(count.value())));
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new CountingSinkValidatorUpdateFun(),
+ new SinkValidatorCheckFun(numKeys,
numElementsPerKey, windowSize)))
+ .setParallelism(1);
+
+ env.execute("Tumbling Window Test");
}
@Test
- public void testSlidingTimeWindow() {
+ public void testSlidingTimeWindow() throws Exception {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int windowSlide = windowSlide();
final int numKeys = numKeys();
- try {
- StreamExecutionEnvironment env =
-
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
- env.setMaxParallelism(2 * PARALLELISM);
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- env.getConfig().setUseSnapshotCompression(true);
-
- env.addSource(
- new FailingSource(
- new KeyedEventTimeGenerator(numKeys,
windowSlide),
- numElementsPerKey))
- .rebalance()
- .keyBy(x -> x.f0)
- .window(
- SlidingEventTimeWindows.of(
- Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
- .apply(
- new RichWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- Long,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- PARALLELISM,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Long l,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values)
{
- sum += value.f1.value;
- key = value.f0;
- }
- final Tuple4<Long, Long, Long, IntType>
output =
- new Tuple4<>(
- key,
- window.getStart(),
- window.getEnd(),
- new IntType(sum));
- out.collect(output);
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setMaxParallelism(2 * PARALLELISM);
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env.addSource(
+ new FailingSource(
+ new KeyedEventTimeGenerator(numKeys,
windowSlide),
+ numElementsPerKey))
+ .rebalance()
+ .keyBy(x -> x.f0)
+ .window(
+ SlidingEventTimeWindows.of(
+ Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
+ .apply(
+ new RichWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ Long,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ PARALLELISM,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Long l,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
}
- })
- .addSink(
- new ValidatingSink<>(
- new
SinkValidatorUpdateFun(numElementsPerKey),
- new SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSlide)))
- .setParallelism(1);
-
- env.execute("Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ final Tuple4<Long, Long, Long, IntType> output
=
+ new Tuple4<>(
+ key,
+ window.getStart(),
+ window.getEnd(),
+ new IntType(sum));
+ out.collect(output);
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys,
numElementsPerKey, windowSlide)))
+ .setParallelism(1);
+
+ env.execute("Tumbling Window Test");
}
@Test
- public void testPreAggregatedTumblingTimeWindow() {
+ public void testPreAggregatedTumblingTimeWindow() throws Exception {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int numKeys = numKeys();
- try {
- StreamExecutionEnvironment env =
-
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- env.getConfig().setUseSnapshotCompression(true);
-
- env.addSource(
- new FailingSource(
- new KeyedEventTimeGenerator(numKeys,
windowSize),
- numElementsPerKey))
- .rebalance()
- .keyBy(x -> x.f0)
-
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
- return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- Long,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- PARALLELISM,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Long l,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in : input) {
- final Tuple4<Long, Long, Long,
IntType> output =
- new Tuple4<>(
- in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1);
- out.collect(output);
- }
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env.addSource(
+ new FailingSource(
+ new KeyedEventTimeGenerator(numKeys,
windowSize),
+ numElementsPerKey))
+ .rebalance()
+ .keyBy(x -> x.f0)
+
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
+ return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ Long,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ PARALLELISM,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Long l,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in : input) {
+ final Tuple4<Long, Long, Long, IntType>
output =
+ new Tuple4<>(
+ in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1);
+ out.collect(output);
}
- })
- .addSink(
- new ValidatingSink<>(
- new
SinkValidatorUpdateFun(numElementsPerKey),
- new SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSize)))
- .setParallelism(1);
-
- env.execute("Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys,
numElementsPerKey, windowSize)))
+ .setParallelism(1);
+
+ env.execute("Tumbling Window Test");
}
@Test
- public void testPreAggregatedSlidingTimeWindow() {
+ public void testPreAggregatedSlidingTimeWindow() throws Exception {
final int numElementsPerKey = numElementsPerKey();
final int windowSize = windowSize();
final int windowSlide = windowSlide();
final int numKeys = numKeys();
- try {
- StreamExecutionEnvironment env =
-
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- env.getConfig().setUseSnapshotCompression(true);
-
- env.addSource(
- new FailingSource(
- new KeyedEventTimeGenerator(numKeys,
windowSlide),
- numElementsPerKey))
- .rebalance()
- .keyBy(x -> x.f0)
- .window(
- SlidingEventTimeWindows.of(
- Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
-
- // validate that the function has been
opened properly
- return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<
- Tuple2<Long, IntType>,
- Tuple4<Long, Long, Long, IntType>,
- Long,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- PARALLELISM,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Long l,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long,
IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in : input) {
- out.collect(
- new Tuple4<>(
- in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env.addSource(
+ new FailingSource(
+ new KeyedEventTimeGenerator(numKeys,
windowSlide),
+ numElementsPerKey))
+ .rebalance()
+ .keyBy(x -> x.f0)
+ .window(
+ SlidingEventTimeWindows.of(
+ Duration.ofMillis(windowSize),
Duration.ofMillis(windowSlide)))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
+
+ // validate that the function has been opened
properly
+ return new Tuple2<>(a.f0, new
IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<
+ Tuple2<Long, IntType>,
+ Tuple4<Long, Long, Long, IntType>,
+ Long,
+ TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ PARALLELISM,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Long l,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long,
IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in : input) {
+ out.collect(
+ new Tuple4<>(
+ in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
}
- })
- .addSink(
- new ValidatingSink<>(
- new
SinkValidatorUpdateFun(numElementsPerKey),
- new SinkValidatorCheckFun(
- numKeys, numElementsPerKey,
windowSlide)))
- .setParallelism(1);
-
- env.execute("Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+ })
+ .addSink(
+ new ValidatingSink<>(
+ new SinkValidatorUpdateFun(numElementsPerKey),
+ new SinkValidatorCheckFun(numKeys,
numElementsPerKey, windowSlide)))
+ .setParallelism(1);
+
+ env.execute("Tumbling Window Test");
}
// ------------------------------------------------------------------------
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
index 1ebbbd35f0a..d58905e1ef6 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
@@ -78,216 +78,190 @@ public class ProcessingTimeWindowCheckpointingITCase
extends TestLogger {
// ------------------------------------------------------------------------
@Test
- public void testTumblingProcessingTimeWindow() {
+ public void testTumblingProcessingTimeWindow() throws Exception {
final int numElements = 3000;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.getConfig().setAutoWatermarkInterval(10);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
-
- SinkValidatorUpdaterAndChecker updaterAndChecker =
- new SinkValidatorUpdaterAndChecker(numElements, 1);
-
- env.addSource(new FailingSource(new Generator(), numElements,
true))
- .rebalance()
- .keyBy(x -> x.f0)
-
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
- .apply(
- new RichWindowFunction<
- Tuple2<Long, IntType>,
- Tuple2<Long, IntType>,
- Long,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- PARALLELISM,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.getConfig().setAutoWatermarkInterval(10);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 1);
+
+ env.addSource(new FailingSource(new Generator(), numElements, true))
+ .rebalance()
+ .keyBy(x -> x.f0)
+
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
+ .apply(
+ new RichWindowFunction<
+ Tuple2<Long, IntType>, Tuple2<Long, IntType>,
Long, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ PARALLELISM,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Long l,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple2<Long, IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> value : values) {
+ assertEquals(value.f0.intValue(),
value.f1.value);
+ out.collect(new Tuple2<>(value.f0, new
IntType(1)));
}
+ }
+ })
+ .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
+ .setParallelism(1);
- @Override
- public void apply(
- Long l,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple2<Long, IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> value : values)
{
- assertEquals(value.f0.intValue(),
value.f1.value);
- out.collect(new Tuple2<>(value.f0, new
IntType(1)));
- }
- }
- })
- .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
- .setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ tryExecute(env, "Tumbling Window Test");
}
@Test
- public void testSlidingProcessingTimeWindow() {
+ public void testSlidingProcessingTimeWindow() throws Exception {
final int numElements = 3000;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.getConfig().setAutoWatermarkInterval(10);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- SinkValidatorUpdaterAndChecker updaterAndChecker =
- new SinkValidatorUpdaterAndChecker(numElements, 3);
- env.addSource(new FailingSource(new Generator(), numElements,
true))
- .rebalance()
- .keyBy(x -> x.f0)
- .window(
- SlidingProcessingTimeWindows.of(
- Duration.ofMillis(150),
Duration.ofMillis(50)))
- .apply(
- new RichWindowFunction<
- Tuple2<Long, IntType>,
- Tuple2<Long, IntType>,
- Long,
- TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(OpenContext openContext) {
- assertEquals(
- PARALLELISM,
- getRuntimeContext()
- .getTaskInfo()
-
.getNumberOfParallelSubtasks());
- open = true;
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.getConfig().setAutoWatermarkInterval(10);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 3);
+ env.addSource(new FailingSource(new Generator(), numElements, true))
+ .rebalance()
+ .keyBy(x -> x.f0)
+ .window(
+ SlidingProcessingTimeWindows.of(
+ Duration.ofMillis(150), Duration.ofMillis(50)))
+ .apply(
+ new RichWindowFunction<
+ Tuple2<Long, IntType>, Tuple2<Long, IntType>,
Long, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(OpenContext openContext) {
+ assertEquals(
+ PARALLELISM,
+ getRuntimeContext()
+ .getTaskInfo()
+
.getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Long l,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple2<Long, IntType>> out) {
+
+ // validate that the function has been opened
properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> value : values) {
+ assertEquals(value.f0.intValue(),
value.f1.value);
+ out.collect(new Tuple2<>(value.f0, new
IntType(1)));
}
+ }
+ })
+ .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
+ .setParallelism(1);
- @Override
- public void apply(
- Long l,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple2<Long, IntType>> out) {
-
- // validate that the function has been
opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> value : values)
{
- assertEquals(value.f0.intValue(),
value.f1.value);
- out.collect(new Tuple2<>(value.f0, new
IntType(1)));
- }
- }
- })
- .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
- .setParallelism(1);
-
- tryExecute(env, "Sliding Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ tryExecute(env, "Sliding Window Test");
}
@Test
- public void testAggregatingTumblingProcessingTimeWindow() {
+ public void testAggregatingTumblingProcessingTimeWindow() throws Exception
{
final int numElements = 3000;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.getConfig().setAutoWatermarkInterval(10);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- SinkValidatorUpdaterAndChecker updaterAndChecker =
- new SinkValidatorUpdaterAndChecker(numElements, 1);
- env.addSource(new FailingSource(new Generator(), numElements,
true))
- .map(
- new MapFunction<Tuple2<Long, IntType>,
Tuple2<Long, IntType>>() {
- @Override
- public Tuple2<Long, IntType> map(Tuple2<Long,
IntType> value) {
- value.f1.value = 1;
- return value;
- }
- })
- .rebalance()
- .keyBy(x -> x.f0)
-
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
- return new Tuple2<>(a.f0, new IntType(1));
- }
- })
- .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
- .setParallelism(1);
-
- tryExecute(env, "Aggregating Tumbling Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.getConfig().setAutoWatermarkInterval(10);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 1);
+ env.addSource(new FailingSource(new Generator(), numElements, true))
+ .map(
+ new MapFunction<Tuple2<Long, IntType>, Tuple2<Long,
IntType>>() {
+ @Override
+ public Tuple2<Long, IntType> map(Tuple2<Long,
IntType> value) {
+ value.f1.value = 1;
+ return value;
+ }
+ })
+ .rebalance()
+ .keyBy(x -> x.f0)
+
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
+ return new Tuple2<>(a.f0, new IntType(1));
+ }
+ })
+ .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
+ .setParallelism(1);
+
+ tryExecute(env, "Aggregating Tumbling Window Test");
}
@Test
- public void testAggregatingSlidingProcessingTimeWindow() {
+ public void testAggregatingSlidingProcessingTimeWindow() throws Exception {
final int numElements = 3000;
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.getConfig().setAutoWatermarkInterval(10);
- env.enableCheckpointing(100);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1,
0L);
- SinkValidatorUpdaterAndChecker updaterAndChecker =
- new SinkValidatorUpdaterAndChecker(numElements, 3);
- env.addSource(new FailingSource(new Generator(), numElements,
true))
- .map(
- new MapFunction<Tuple2<Long, IntType>,
Tuple2<Long, IntType>>() {
- @Override
- public Tuple2<Long, IntType> map(Tuple2<Long,
IntType> value) {
- value.f1.value = 1;
- return value;
- }
- })
- .rebalance()
- .keyBy(x -> x.f0)
- .window(
- SlidingProcessingTimeWindows.of(
- Duration.ofMillis(150),
Duration.ofMillis(50)))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
- return new Tuple2<>(a.f0, new IntType(1));
- }
- })
- .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
- .setParallelism(1);
-
- tryExecute(env, "Aggregating Sliding Window Test");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.getConfig().setAutoWatermarkInterval(10);
+ env.enableCheckpointing(100);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+ SinkValidatorUpdaterAndChecker updaterAndChecker =
+ new SinkValidatorUpdaterAndChecker(numElements, 3);
+ env.addSource(new FailingSource(new Generator(), numElements, true))
+ .map(
+ new MapFunction<Tuple2<Long, IntType>, Tuple2<Long,
IntType>>() {
+ @Override
+ public Tuple2<Long, IntType> map(Tuple2<Long,
IntType> value) {
+ value.f1.value = 1;
+ return value;
+ }
+ })
+ .rebalance()
+ .keyBy(x -> x.f0)
+ .window(
+ SlidingProcessingTimeWindows.of(
+ Duration.ofMillis(150), Duration.ofMillis(50)))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a, Tuple2<Long,
IntType> b) {
+ return new Tuple2<>(a.f0, new IntType(1));
+ }
+ })
+ .addSink(new ValidatingSink<>(updaterAndChecker,
updaterAndChecker, true))
+ .setParallelism(1);
+
+ tryExecute(env, "Aggregating Sliding Window Test");
}
// ------------------------------------------------------------------------
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 39bf862c0e2..89658ec82e7 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -141,16 +141,11 @@ public class RegionFailoverITCase extends TestLogger {
* completed checkpoint's.
*/
@Test(timeout = 60000)
- public void testMultiRegionFailover() {
- try {
- JobGraph jobGraph = createJobGraph();
- ClusterClient<?> client = cluster.getClusterClient();
- submitJobAndWaitForResult(client, jobGraph,
getClass().getClassLoader());
- verifyAfterJobExecuted();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
+ public void testMultiRegionFailover() throws Exception {
+ JobGraph jobGraph = createJobGraph();
+ ClusterClient<?> client = cluster.getClusterClient();
+ submitJobAndWaitForResult(client, jobGraph,
getClass().getClassLoader());
+ verifyAfterJobExecuted();
}
private void verifyAfterJobExecuted() {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 62451cd39fb..57871604682 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -53,7 +53,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* Integration test for the {@link CheckpointListener} interface. The test
ensures that {@link
@@ -83,74 +82,67 @@ public class StreamCheckpointNotifierITCase extends
AbstractTestBaseJUnit4 {
* </pre>
*/
@Test
- public void testProgram() {
- try {
- final StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- assertEquals("test setup broken", PARALLELISM,
env.getParallelism());
+ public void testProgram() throws Exception {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ assertEquals("test setup broken", PARALLELISM, env.getParallelism());
- env.enableCheckpointing(500);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env,
Integer.MAX_VALUE, 0L);
+ env.enableCheckpointing(500);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env,
Integer.MAX_VALUE, 0L);
- final int numElements = 10000;
- final int numTaskTotal = PARALLELISM * 5;
+ final int numElements = 10000;
+ final int numTaskTotal = PARALLELISM * 5;
- DataStream<Long> stream =
- env.addSource(new GeneratingSourceFunction(numElements,
numTaskTotal));
+ DataStream<Long> stream =
+ env.addSource(new GeneratingSourceFunction(numElements,
numTaskTotal));
- stream
- // -------------- first vertex, chained to the src
----------------
- .filter(new LongRichFilterFunction())
+ stream
+ // -------------- first vertex, chained to the src
----------------
+ .filter(new LongRichFilterFunction())
- // -------------- second vertex, applying the co-map
----------------
- .connect(stream)
- .flatMap(new LeftIdentityCoRichFlatMapFunction())
+ // -------------- second vertex, applying the co-map
----------------
+ .connect(stream)
+ .flatMap(new LeftIdentityCoRichFlatMapFunction())
- // -------------- third vertex - the stateful one that
also fails
- // ----------------
- .map(new IdentityMapFunction())
- .startNewChain()
+ // -------------- third vertex - the stateful one that also
fails
+ // ----------------
+ .map(new IdentityMapFunction())
+ .startNewChain()
- // -------------- fourth vertex - reducer and the sink
----------------
- .keyBy(x -> x.f0)
- .reduce(new OnceFailingReducer(numElements))
- .sinkTo(new DiscardingSink<>());
+ // -------------- fourth vertex - reducer and the sink
----------------
+ .keyBy(x -> x.f0)
+ .reduce(new OnceFailingReducer(numElements))
+ .sinkTo(new DiscardingSink<>());
- env.execute();
+ env.execute();
- final long failureCheckpointID =
OnceFailingReducer.failureCheckpointID;
- assertNotEquals(0L, failureCheckpointID);
+ final long failureCheckpointID =
OnceFailingReducer.failureCheckpointID;
+ assertNotEquals(0L, failureCheckpointID);
- List<List<Long>[]> allLists =
- Arrays.asList(
- GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
- LongRichFilterFunction.COMPLETED_CHECKPOINTS,
-
LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
- IdentityMapFunction.COMPLETED_CHECKPOINTS,
- OnceFailingReducer.COMPLETED_CHECKPOINTS);
+ List<List<Long>[]> allLists =
+ Arrays.asList(
+ GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
+ LongRichFilterFunction.COMPLETED_CHECKPOINTS,
+
LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
+ IdentityMapFunction.COMPLETED_CHECKPOINTS,
+ OnceFailingReducer.COMPLETED_CHECKPOINTS);
- for (List<Long>[] parallelNotifications : allLists) {
- for (List<Long> notifications : parallelNotifications) {
+ for (List<Long>[] parallelNotifications : allLists) {
+ for (List<Long> notifications : parallelNotifications) {
- assertTrue(
- "No checkpoint notification was received.",
notifications.size() > 0);
+ assertTrue("No checkpoint notification was received.",
notifications.size() > 0);
- assertFalse(
- "Failure checkpoint was marked as completed.",
- notifications.contains(failureCheckpointID));
+ assertFalse(
+ "Failure checkpoint was marked as completed.",
+ notifications.contains(failureCheckpointID));
- assertFalse(
- "No checkpoint received after failure.",
- notifications.get(notifications.size() - 1) ==
failureCheckpointID);
+ assertFalse(
+ "No checkpoint received after failure.",
+ notifications.get(notifications.size() - 1) ==
failureCheckpointID);
- assertTrue(
- "Checkpoint notification was received multiple
times",
- notifications.size() == new
HashSet<Long>(notifications).size());
- }
+ assertTrue(
+ "Checkpoint notification was received multiple times",
+ notifications.size() == new
HashSet<Long>(notifications).size());
}
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index a0e13ef9d50..61de82d4139 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -123,28 +123,22 @@ public abstract class StreamFaultToleranceTestBase
extends TestLogger {
*/
@Test
public void runCheckpointedProgram() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.enableCheckpointing(500);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env,
Integer.MAX_VALUE, 0L);
+
+ testProgram(env);
+
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.enableCheckpointing(500);
- RestartStrategyUtils.configureFixedDelayRestartStrategy(env,
Integer.MAX_VALUE, 0L);
-
- testProgram(env);
-
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- try {
- submitJobAndWaitForResult(
- cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
- } catch (Exception e) {
- Assert.assertTrue(
- ExceptionUtils.findThrowable(e,
SuccessException.class).isPresent());
- }
-
- postSubmit();
+ submitJobAndWaitForResult(
+ cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
} catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
+ Assert.assertTrue(ExceptionUtils.findThrowable(e,
SuccessException.class).isPresent());
}
+
+ postSubmit();
}
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index cb93a298e04..e6917f21a8f 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -87,16 +87,10 @@ public class TimestampedFileInputSplitTest extends
TestLogger {
Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
}
- @Test
+ @Test(expected = IllegalArgumentException.class)
public void testIllegalArgument() {
- try {
- new TimestampedFileInputSplit(
- -10, 2, new Path("test"), 0, 100, null); // invalid
modification time
- } catch (Exception e) {
- if (!(e instanceof IllegalArgumentException)) {
- Assert.fail(e.getMessage());
- }
- }
+ new TimestampedFileInputSplit(
+ -10, 2, new Path("test"), 0, 100, null); // invalid
modification time
}
@Test
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
index 7cfab00d5e3..e005e39cff3 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
@@ -36,7 +36,6 @@ import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.testfunctions.Tokenizer;
import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -67,32 +66,26 @@ public class LocalExecutorITCase extends TestLogger {
}
@Test(timeout = 60_000)
- public void testLocalExecutorWithWordCount() throws InterruptedException {
- try {
- // set up the files
- File inFile = File.createTempFile("wctext", ".in");
- File outFile = File.createTempFile("wctext", ".out");
- inFile.deleteOnExit();
- outFile.deleteOnExit();
-
- try (FileWriter fw = new FileWriter(inFile)) {
- fw.write(WordCountData.TEXT);
- }
-
- final Configuration config = new Configuration();
- config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
- config.set(DeploymentOptions.ATTACHED, true);
-
- StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile,
outFile, parallelism);
- JobClient jobClient =
- executor.execute(wcStreamGraph, config,
ClassLoader.getSystemClassLoader())
- .get();
- jobClient.getJobExecutionResult().get();
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
+ public void testLocalExecutorWithWordCount() throws Exception {
+ // set up the files
+ File inFile = File.createTempFile("wctext", ".in");
+ File outFile = File.createTempFile("wctext", ".out");
+ inFile.deleteOnExit();
+ outFile.deleteOnExit();
+
+ try (FileWriter fw = new FileWriter(inFile)) {
+ fw.write(WordCountData.TEXT);
}
+ final Configuration config = new Configuration();
+ config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
+ config.set(DeploymentOptions.ATTACHED, true);
+
+ StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile,
parallelism);
+ JobClient jobClient =
+ executor.execute(wcStreamGraph, config,
ClassLoader.getSystemClassLoader()).get();
+ jobClient.getJobExecutionResult().get();
+
assertThat(miniCluster.isRunning(), is(false));
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
index 124348ac5a4..c1ebd8fea0d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
@@ -24,8 +24,6 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
-import static org.junit.Assert.fail;
-
/**
* Tests for non rich DataSource and DataSink input output formats being
correctly used at runtime.
*/
@@ -38,12 +36,7 @@ public class InputOutputITCase extends
JavaProgramTestBaseJUnit4 {
TestNonRichOutputFormat output = new TestNonRichOutputFormat();
env.createInput(new TestNonRichInputFormat())
.addSink(new OutputFormatSinkFunction<>(output));
- try {
- env.execute();
- } catch (Exception e) {
- // we didn't break anything by making everything rich.
- e.printStackTrace();
- fail(e.getMessage());
- }
+ env.execute();
+ // we didn't break anything by making everything rich.
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 720b99cb5cd..32281cdcfb3 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -40,7 +40,6 @@ import java.io.IOException;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* Test for proper error messages in case user-defined serialization is broken
and detected in the
@@ -67,7 +66,7 @@ public class CustomSerializationITCase extends TestLogger {
}
@Test
- public void testIncorrectSerializer1() {
+ public void testIncorrectSerializer1() throws Exception {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
@@ -93,14 +92,11 @@ public class CustomSerializationITCase extends TestLogger {
.getMessage()
.contains("broken
serialization."))
.isPresent());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
}
@Test
- public void testIncorrectSerializer2() {
+ public void testIncorrectSerializer2() throws Exception {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
@@ -126,14 +122,11 @@ public class CustomSerializationITCase extends TestLogger
{
.getMessage()
.contains("broken
serialization."))
.isPresent());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
}
@Test
- public void testIncorrectSerializer3() {
+ public void testIncorrectSerializer3() throws Exception {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
@@ -159,14 +152,11 @@ public class CustomSerializationITCase extends TestLogger
{
.getMessage()
.contains("broken
serialization."))
.isPresent());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
}
@Test
- public void testIncorrectSerializer4() {
+ public void testIncorrectSerializer4() throws Exception {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARLLELISM);
@@ -192,9 +182,6 @@ public class CustomSerializationITCase extends TestLogger {
.getMessage()
.contains("broken
serialization."))
.isPresent());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 6febe2ea473..c8835648486 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -63,87 +63,70 @@ public class MiscellaneousIssuesITCase extends TestLogger {
.build());
@Test
- public void testNullValues() {
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<String> data =
- env.fromData("hallo")
- .map(
- new MapFunction<String, String>() {
- @Override
- public String map(String value) throws
Exception {
- return null;
- }
- });
- data.sinkTo(
- FileSink.forRowFormat(
- new Path("/tmp/myTest"), new
SimpleStringEncoder<String>())
- .build());
+ public void testNullValues() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<String> data =
+ env.fromData("hallo")
+ .map(
+ new MapFunction<String, String>() {
+ @Override
+ public String map(String value) throws
Exception {
+ return null;
+ }
+ });
+ data.sinkTo(
+ FileSink.forRowFormat(new Path("/tmp/myTest"), new
SimpleStringEncoder<String>())
+ .build());
- try {
- env.execute();
- fail("this should fail due to null values.");
- } catch (JobExecutionException e) {
- assertTrue(findThrowable(e,
NullPointerException.class).isPresent());
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ try {
+ env.execute();
+ fail("this should fail due to null values.");
+ } catch (JobExecutionException e) {
+ assertTrue(findThrowable(e,
NullPointerException.class).isPresent());
}
}
@Test
- public void testDisjointDataflows() {
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(5);
+ public void testDisjointDataflows() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(5);
- // generate two different flows
- env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
- env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
-
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // generate two different flows
+ env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
+ env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
}
@Test
- public void testAccumulatorsAfterNoOp() {
+ public void testAccumulatorsAfterNoOp() throws Exception {
final String accName = "test_accumulator";
- try {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(6);
-
- env.fromSequence(1, 1000000)
- .rebalance()
- .flatMap(
- new RichFlatMapFunction<Long, Long>() {
-
- private LongCounter counter;
-
- @Override
- public void open(OpenContext openContext) {
- counter =
getRuntimeContext().getLongCounter(accName);
- }
-
- @Override
- public void flatMap(Long value,
Collector<Long> out) {
- counter.add(1L);
- }
- })
- .sinkTo(new DiscardingSink<>());
-
- JobExecutionResult result = env.execute();
-
- assertEquals(1000000L,
result.getAllAccumulatorResults().get(accName));
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(6);
+
+ env.fromSequence(1, 1000000)
+ .rebalance()
+ .flatMap(
+ new RichFlatMapFunction<Long, Long>() {
+
+ private LongCounter counter;
+
+ @Override
+ public void open(OpenContext openContext) {
+ counter =
getRuntimeContext().getLongCounter(accName);
+ }
+
+ @Override
+ public void flatMap(Long value, Collector<Long>
out) {
+ counter.add(1L);
+ }
+ })
+ .sinkTo(new DiscardingSink<>());
+
+ JobExecutionResult result = env.execute();
+
+ assertEquals(1000000L, result.getAllAccumulatorResults().get(accName));
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 7a6e353d9d6..b5389bf03a2 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -226,13 +226,11 @@ abstract class
AbstractTaskManagerProcessFailureRecoveryTest {
// all seems well :-)
} catch (Exception e) {
- e.printStackTrace();
printProcessLog("TaskManager 1", taskManagerProcess1);
printProcessLog("TaskManager 2", taskManagerProcess2);
printProcessLog("TaskManager 3", taskManagerProcess3);
- fail(e.getMessage());
+ throw e;
} catch (Error e) {
- e.printStackTrace();
printProcessLog("TaskManager 1", taskManagerProcess1);
printProcessLog("TaskManager 2", taskManagerProcess2);
printProcessLog("TaskManager 3", taskManagerProcess3);
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index a5d0215a959..99eb787731e 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -51,8 +51,6 @@ import java.net.ServerSocket;
import java.util.Enumeration;
import java.util.List;
-import static org.junit.Assert.fail;
-
/** Test proper handling of IPv6 address literals in URLs. */
@SuppressWarnings("serial")
public class IPv6HostnamesITCase extends TestLogger {
@@ -83,52 +81,44 @@ public class IPv6HostnamesITCase extends TestLogger {
}
@Test
- public void testClusterWithIPv6host() {
- try {
-
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- // get input data
- DataStream<String> text =
env.fromData(WordCountData.TEXT.split("\n"));
-
- DataStream<Tuple2<String, Integer>> counts =
- text.flatMap(
- new FlatMapFunction<String, Tuple2<String,
Integer>>() {
- @Override
- public void flatMap(
- String value,
- Collector<Tuple2<String,
Integer>> out)
- throws Exception {
- for (String token :
value.toLowerCase().split("\\W+")) {
- if (token.length() > 0) {
- out.collect(
- new Tuple2<String,
Integer>(token, 1));
- }
+ public void testClusterWithIPv6host() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+
+ // get input data
+ DataStream<String> text = env.fromData(WordCountData.TEXT.split("\n"));
+
+ DataStream<Tuple2<String, Integer>> counts =
+ text.flatMap(
+ new FlatMapFunction<String, Tuple2<String,
Integer>>() {
+ @Override
+ public void flatMap(
+ String value,
Collector<Tuple2<String, Integer>> out)
+ throws Exception {
+ for (String token :
value.toLowerCase().split("\\W+")) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String,
Integer>(token, 1));
}
}
- })
- .keyBy(x -> x.f0)
-
.window(GlobalWindows.createWithEndOfStreamTrigger())
- .reduce(
- new ReduceFunction<Tuple2<String,
Integer>>() {
- @Override
- public Tuple2<String, Integer> reduce(
- Tuple2<String, Integer> value1,
- Tuple2<String, Integer> value2)
- throws Exception {
- return Tuple2.of(value1.f0,
value1.f1 + value2.f1);
- }
- });
-
- List<Tuple2<String, Integer>> result =
- CollectionUtil.iteratorToList(counts.executeAndCollect());
-
- TestBaseUtils.compareResultAsText(result,
WordCountData.COUNTS_AS_TUPLES);
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ }
+ })
+ .keyBy(x -> x.f0)
+ .window(GlobalWindows.createWithEndOfStreamTrigger())
+ .reduce(
+ new ReduceFunction<Tuple2<String, Integer>>() {
+ @Override
+ public Tuple2<String, Integer> reduce(
+ Tuple2<String, Integer> value1,
+ Tuple2<String, Integer> value2)
+ throws Exception {
+ return Tuple2.of(value1.f0, value1.f1
+ value2.f1);
+ }
+ });
+
+ List<Tuple2<String, Integer>> result =
+ CollectionUtil.iteratorToList(counts.executeAndCollect());
+
+ TestBaseUtils.compareResultAsText(result,
WordCountData.COUNTS_AS_TUPLES);
}
private Inet6Address getLocalIPv6Address() {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
index f31dc9e7370..7019f775044 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
@@ -36,22 +36,17 @@ import static org.junit.Assert.fail;
public class StateHandleSerializationTest {
@Test
- public void ensureStateHandlesHaveSerialVersionUID() {
- try {
- Reflections reflections = new Reflections("org.apache.flink");
+ public void ensureStateHandlesHaveSerialVersionUID() throws Exception {
+ Reflections reflections = new Reflections("org.apache.flink");
- // check all state handles
+ // check all state handles
- @SuppressWarnings("unchecked")
- Set<Class<?>> stateHandleImplementations =
- (Set<Class<?>>) (Set<?>)
reflections.getSubTypesOf(StateObject.class);
+ @SuppressWarnings("unchecked")
+ Set<Class<?>> stateHandleImplementations =
+ (Set<Class<?>>) (Set<?>)
reflections.getSubTypesOf(StateObject.class);
- for (Class<?> clazz : stateHandleImplementations) {
- validataSerialVersionUID(clazz);
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ for (Class<?> clazz : stateHandleImplementations) {
+ validataSerialVersionUID(clazz);
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
index 6c60f12b705..736670f6283 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
@@ -80,7 +80,7 @@ public class PartitionerITCase extends AbstractTestBaseJUnit4
{
}
@Test
- public void partitionerTest() {
+ public void partitionerTest() throws Exception {
TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink =
new TestListResultSink<Tuple2<Integer, String>>();
@@ -145,12 +145,7 @@ public class PartitionerITCase extends
AbstractTestBaseJUnit4 {
// partition global
src.global().map(new
SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
- try {
- env.execute();
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ env.execute();
List<Tuple2<Integer, String>> hashPartitionResult =
hashPartitionResultSink.getResult();
List<Tuple2<Integer, String>> customPartitionResult =
customPartitionResultSink.getResult();