This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 583ce7140c6 [FLINK-38488][tests] Use 'throws Exception' instead of try-catch-fail in tests in flink-tests (#27092)
583ce7140c6 is described below

commit 583ce7140c669ce6537d6ee76ed1ba331892aced
Author: Mingliang Liu <[email protected]>
AuthorDate: Thu Nov 6 00:51:25 2025 -0800

    [FLINK-38488][tests] Use 'throws Exception' instead of try-catch-fail in tests in flink-tests (#27092)
---
 .../EventTimeAllWindowCheckpointingITCase.java     | 549 ++++++++-------
 .../EventTimeWindowCheckpointingITCase.java        | 733 ++++++++++-----------
 .../ProcessingTimeWindowCheckpointingITCase.java   | 352 +++++-----
 .../test/checkpointing/RegionFailoverITCase.java   |  15 +-
 .../StreamCheckpointNotifierITCase.java            |  98 ++-
 .../StreamFaultToleranceTestBase.java              |  32 +-
 .../TimestampedFileInputSplitTest.java             |  12 +-
 .../test/example/client/LocalExecutorITCase.java   |  43 +-
 .../apache/flink/test/io/InputOutputITCase.java    |  11 +-
 .../flink/test/misc/CustomSerializationITCase.java |  21 +-
 .../flink/test/misc/MiscellaneousIssuesITCase.java | 123 ++--
 ...tractTaskManagerProcessFailureRecoveryTest.java |   4 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java    |  82 +--
 .../test/state/StateHandleSerializationTest.java   |  21 +-
 .../test/streaming/runtime/PartitionerITCase.java  |   9 +-
 15 files changed, 968 insertions(+), 1137 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 003308e9a6c..5ad625971d5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -47,7 +47,6 @@ import java.time.Duration;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This verifies that checkpointing works correctly with event time windows.
@@ -79,320 +78,300 @@ public class EventTimeAllWindowCheckpointingITCase 
extends TestLogger {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testTumblingTimeWindow() {
+    public void testTumblingTimeWindow() throws Exception {
         final int numElementsPerKey = 3000;
         final int windowSize = 100;
         final int numKeys = 1;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-
-            env.addSource(
-                            new FailingSource(
-                                    new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
-                                            numKeys, windowSize),
-                                    numElementsPerKey))
-                    .rebalance()
-                    
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
-                    .apply(
-                            new RichAllWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            1,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+        env.addSource(
+                        new FailingSource(
+                                new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+                                        numKeys, windowSize),
+                                numElementsPerKey))
+                .rebalance()
+                
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+                .apply(
+                        new RichAllWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        1,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> values,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                int sum = 0;
+                                long key = -1;
+
+                                for (Tuple2<Long, IntType> value : values) {
+                                    sum += value.f1.value;
+                                    key = value.f0;
                                 }
-
-                                @Override
-                                public void apply(
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> values,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    int sum = 0;
-                                    long key = -1;
-
-                                    for (Tuple2<Long, IntType> value : values) 
{
-                                        sum += value.f1.value;
-                                        key = value.f0;
-                                    }
-                                    out.collect(
-                                            new Tuple4<>(
-                                                    key,
-                                                    window.getStart(),
-                                                    window.getEnd(),
-                                                    new IntType(sum)));
-                                }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
-                                            numElementsPerKey),
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSize)))
-                    .setParallelism(1);
-
-            env.execute("Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                                out.collect(
+                                        new Tuple4<>(
+                                                key,
+                                                window.getStart(),
+                                                window.getEnd(),
+                                                new IntType(sum)));
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+                                        numElementsPerKey),
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+                                        numKeys, numElementsPerKey, 
windowSize)))
+                .setParallelism(1);
+
+        env.execute("Tumbling Window Test");
     }
 
     @Test
-    public void testSlidingTimeWindow() {
+    public void testSlidingTimeWindow() throws Exception {
         final int numElementsPerKey = 3000;
         final int windowSize = 1000;
         final int windowSlide = 100;
         final int numKeys = 1;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-
-            env.addSource(
-                            new FailingSource(
-                                    new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
-                                            numKeys, windowSlide),
-                                    numElementsPerKey))
-                    .rebalance()
-                    .windowAll(
-                            SlidingEventTimeWindows.of(
-                                    Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
-                    .apply(
-                            new RichAllWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            1,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
-                                }
-
-                                @Override
-                                public void apply(
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> values,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    int sum = 0;
-                                    long key = -1;
-
-                                    for (Tuple2<Long, IntType> value : values) 
{
-                                        sum += value.f1.value;
-                                        key = value.f0;
-                                    }
-                                    out.collect(
-                                            new Tuple4<>(
-                                                    key,
-                                                    window.getStart(),
-                                                    window.getEnd(),
-                                                    new IntType(sum)));
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+        env.addSource(
+                        new FailingSource(
+                                new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+                                        numKeys, windowSlide),
+                                numElementsPerKey))
+                .rebalance()
+                .windowAll(
+                        SlidingEventTimeWindows.of(
+                                Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
+                .apply(
+                        new RichAllWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        1,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> values,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                int sum = 0;
+                                long key = -1;
+
+                                for (Tuple2<Long, IntType> value : values) {
+                                    sum += value.f1.value;
+                                    key = value.f0;
                                 }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
-                                            numElementsPerKey),
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSlide)))
-                    .setParallelism(1);
-
-            env.execute("Sliding Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                                out.collect(
+                                        new Tuple4<>(
+                                                key,
+                                                window.getStart(),
+                                                window.getEnd(),
+                                                new IntType(sum)));
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+                                        numElementsPerKey),
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+                                        numKeys, numElementsPerKey, 
windowSlide)))
+                .setParallelism(1);
+
+        env.execute("Sliding Window Test");
     }
 
     @Test
-    public void testPreAggregatedTumblingTimeWindow() {
+    public void testPreAggregatedTumblingTimeWindow() throws Exception {
         final int numElementsPerKey = 3000;
         final int windowSize = 100;
         final int numKeys = 1;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-
-            env.addSource(
-                            new FailingSource(
-                                    new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
-                                            numKeys, windowSize),
-                                    numElementsPerKey))
-                    .rebalance()
-                    
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
-                    .reduce(
-                            new ReduceFunction<Tuple2<Long, IntType>>() {
-
-                                @Override
-                                public Tuple2<Long, IntType> reduce(
-                                        Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
-
-                                    return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
-                                }
-                            },
-                            new RichAllWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            1,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
-                                }
-
-                                @Override
-                                public void apply(
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> input,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    for (Tuple2<Long, IntType> in : input) {
-                                        out.collect(
-                                                new Tuple4<>(
-                                                        in.f0,
-                                                        window.getStart(),
-                                                        window.getEnd(),
-                                                        in.f1));
-                                    }
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+        env.addSource(
+                        new FailingSource(
+                                new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+                                        numKeys, windowSize),
+                                numElementsPerKey))
+                .rebalance()
+                
.windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+                .reduce(
+                        new ReduceFunction<Tuple2<Long, IntType>>() {
+
+                            @Override
+                            public Tuple2<Long, IntType> reduce(
+                                    Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
+
+                                return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
+                            }
+                        },
+                        new RichAllWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        1,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> input,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                for (Tuple2<Long, IntType> in : input) {
+                                    out.collect(
+                                            new Tuple4<>(
+                                                    in.f0,
+                                                    window.getStart(),
+                                                    window.getEnd(),
+                                                    in.f1));
                                 }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
-                                            numElementsPerKey),
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSize)))
-                    .setParallelism(1);
-
-            env.execute("PreAggregated Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+                                        numElementsPerKey),
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+                                        numKeys, numElementsPerKey, 
windowSize)))
+                .setParallelism(1);
+
+        env.execute("PreAggregated Tumbling Window Test");
     }
 
     @Test
-    public void testPreAggregatedSlidingTimeWindow() {
+    public void testPreAggregatedSlidingTimeWindow() throws Exception {
         final int numElementsPerKey = 3000;
         final int windowSize = 1000;
         final int windowSlide = 100;
         final int numKeys = 1;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-
-            env.addSource(
-                            new FailingSource(
-                                    new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
-                                            numKeys, windowSlide),
-                                    numElementsPerKey))
-                    .rebalance()
-                    .windowAll(
-                            SlidingEventTimeWindows.of(
-                                    Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
-                    .reduce(
-                            new ReduceFunction<Tuple2<Long, IntType>>() {
-
-                                @Override
-                                public Tuple2<Long, IntType> reduce(
-                                        Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
-
-                                    return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
-                                }
-                            },
-                            new RichAllWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            1,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
-                                }
-
-                                @Override
-                                public void apply(
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> input,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    for (Tuple2<Long, IntType> in : input) {
-                                        out.collect(
-                                                new Tuple4<>(
-                                                        in.f0,
-                                                        window.getStart(),
-                                                        window.getEnd(),
-                                                        in.f1));
-                                    }
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+        env.addSource(
+                        new FailingSource(
+                                new 
EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(
+                                        numKeys, windowSlide),
+                                numElementsPerKey))
+                .rebalance()
+                .windowAll(
+                        SlidingEventTimeWindows.of(
+                                Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
+                .reduce(
+                        new ReduceFunction<Tuple2<Long, IntType>>() {
+
+                            @Override
+                            public Tuple2<Long, IntType> reduce(
+                                    Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
+
+                                return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
+                            }
+                        },
+                        new RichAllWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        1,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> input,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                for (Tuple2<Long, IntType> in : input) {
+                                    out.collect(
+                                            new Tuple4<>(
+                                                    in.f0,
+                                                    window.getStart(),
+                                                    window.getEnd(),
+                                                    in.f1));
                                 }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
-                                            numElementsPerKey),
-                                    new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSlide)))
-                    .setParallelism(1);
-
-            env.execute("PreAggregated Sliding Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(
+                                        numElementsPerKey),
+                                new 
EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(
+                                        numKeys, numElementsPerKey, 
windowSlide)))
+                .setParallelism(1);
+
+        env.execute("PreAggregated Sliding Window Test");
     }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index ca07e3a6ab2..161062e6826 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -76,7 +76,6 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * This verifies that checkpointing works correctly with event time windows. This is more strict
@@ -308,435 +307,405 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testTumblingTimeWindow() {
+    public void testTumblingTimeWindow() throws Exception {
         final int numElementsPerKey = numElementsPerKey();
         final int windowSize = windowSize();
         final int numKeys = numKeys();
 
-        try {
-            StreamExecutionEnvironment env =
-                    
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            env.getConfig().setUseSnapshotCompression(true);
-
-            env.addSource(
-                            new FailingSource(
-                                    new KeyedEventTimeGenerator(numKeys, 
windowSize),
-                                    numElementsPerKey))
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
-                    .apply(
-                            new RichWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    Long,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            PARALLELISM,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        env.getConfig().setUseSnapshotCompression(true);
+
+        env.addSource(
+                        new FailingSource(
+                                new KeyedEventTimeGenerator(numKeys, 
windowSize),
+                                numElementsPerKey))
+                .rebalance()
+                .keyBy(x -> x.f0)
+                
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+                .apply(
+                        new RichWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                Long,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        PARALLELISM,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    Long l,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> values,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                int sum = 0;
+                                long key = -1;
+
+                                for (Tuple2<Long, IntType> value : values) {
+                                    sum += value.f1.value;
+                                    key = value.f0;
                                 }
 
-                                @Override
-                                public void apply(
-                                        Long l,
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> values,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    int sum = 0;
-                                    long key = -1;
-
-                                    for (Tuple2<Long, IntType> value : values) 
{
-                                        sum += value.f1.value;
-                                        key = value.f0;
-                                    }
-
-                                    final Tuple4<Long, Long, Long, IntType> 
result =
-                                            new Tuple4<>(
-                                                    key,
-                                                    window.getStart(),
-                                                    window.getEnd(),
-                                                    new IntType(sum));
-                                    out.collect(result);
-                                }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
SinkValidatorUpdateFun(numElementsPerKey),
-                                    new SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSize)))
-                    .setParallelism(1);
-
-            env.execute("Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                                final Tuple4<Long, Long, Long, IntType> result 
=
+                                        new Tuple4<>(
+                                                key,
+                                                window.getStart(),
+                                                window.getEnd(),
+                                                new IntType(sum));
+                                out.collect(result);
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new SinkValidatorUpdateFun(numElementsPerKey),
+                                new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize)))
+                .setParallelism(1);
+
+        env.execute("Tumbling Window Test");
     }
 
     @Test
-    public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
+    public void testTumblingTimeWindowWithKVStateMinMaxParallelism() throws 
Exception {
         doTestTumblingTimeWindowWithKVState(PARALLELISM);
     }
 
     @Test
-    public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
+    public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() throws 
Exception {
         doTestTumblingTimeWindowWithKVState(1 << 15);
     }
 
-    public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
+    public void doTestTumblingTimeWindowWithKVState(int maxParallelism) throws 
Exception {
         final int numElementsPerKey = numElementsPerKey();
         final int windowSize = windowSize();
         final int numKeys = numKeys();
 
-        try {
-            StreamExecutionEnvironment env =
-                    
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-            env.setParallelism(PARALLELISM);
-            env.setMaxParallelism(maxParallelism);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            env.getConfig().setUseSnapshotCompression(true);
-
-            env.addSource(
-                            new FailingSource(
-                                    new KeyedEventTimeGenerator(numKeys, 
windowSize),
-                                    numElementsPerKey))
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
-                    .apply(
-                            new RichWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    Long,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                private ValueState<Integer> count;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            PARALLELISM,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
-                                    count =
-                                            getRuntimeContext()
-                                                    .getState(
-                                                            new 
ValueStateDescriptor<>(
-                                                                    "count", 
Integer.class, 0));
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setParallelism(PARALLELISM);
+        env.setMaxParallelism(maxParallelism);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        env.getConfig().setUseSnapshotCompression(true);
+
+        env.addSource(
+                        new FailingSource(
+                                new KeyedEventTimeGenerator(numKeys, 
windowSize),
+                                numElementsPerKey))
+                .rebalance()
+                .keyBy(x -> x.f0)
+                
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+                .apply(
+                        new RichWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                Long,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            private ValueState<Integer> count;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        PARALLELISM,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                                count =
+                                        getRuntimeContext()
+                                                .getState(
+                                                        new 
ValueStateDescriptor<>(
+                                                                "count", 
Integer.class, 0));
+                            }
+
+                            @Override
+                            public void apply(
+                                    Long l,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> values,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out)
+                                    throws Exception {
+
+                                // the window count state starts with the key, 
so that we get
+                                // different count results for each key
+                                if (count.value() == 0) {
+                                    count.update(l.intValue());
                                 }
 
-                                @Override
-                                public void apply(
-                                        Long l,
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> values,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out)
-                                        throws Exception {
-
-                                    // the window count state starts with the 
key, so that we get
-                                    // different count results for each key
-                                    if (count.value() == 0) {
-                                        count.update(l.intValue());
-                                    }
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    count.update(count.value() + 1);
-                                    out.collect(
-                                            new Tuple4<>(
-                                                    l,
-                                                    window.getStart(),
-                                                    window.getEnd(),
-                                                    new 
IntType(count.value())));
-                                }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new CountingSinkValidatorUpdateFun(),
-                                    new SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSize)))
-                    .setParallelism(1);
-
-            env.execute("Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                count.update(count.value() + 1);
+                                out.collect(
+                                        new Tuple4<>(
+                                                l,
+                                                window.getStart(),
+                                                window.getEnd(),
+                                                new IntType(count.value())));
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new CountingSinkValidatorUpdateFun(),
+                                new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize)))
+                .setParallelism(1);
+
+        env.execute("Tumbling Window Test");
     }
 
     @Test
-    public void testSlidingTimeWindow() {
+    public void testSlidingTimeWindow() throws Exception {
         final int numElementsPerKey = numElementsPerKey();
         final int windowSize = windowSize();
         final int windowSlide = windowSlide();
         final int numKeys = numKeys();
 
-        try {
-            StreamExecutionEnvironment env =
-                    
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-            env.setMaxParallelism(2 * PARALLELISM);
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            env.getConfig().setUseSnapshotCompression(true);
-
-            env.addSource(
-                            new FailingSource(
-                                    new KeyedEventTimeGenerator(numKeys, 
windowSlide),
-                                    numElementsPerKey))
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    .window(
-                            SlidingEventTimeWindows.of(
-                                    Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
-                    .apply(
-                            new RichWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    Long,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            PARALLELISM,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
-                                }
-
-                                @Override
-                                public void apply(
-                                        Long l,
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> values,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    int sum = 0;
-                                    long key = -1;
-
-                                    for (Tuple2<Long, IntType> value : values) 
{
-                                        sum += value.f1.value;
-                                        key = value.f0;
-                                    }
-                                    final Tuple4<Long, Long, Long, IntType> 
output =
-                                            new Tuple4<>(
-                                                    key,
-                                                    window.getStart(),
-                                                    window.getEnd(),
-                                                    new IntType(sum));
-                                    out.collect(output);
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setMaxParallelism(2 * PARALLELISM);
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        env.getConfig().setUseSnapshotCompression(true);
+
+        env.addSource(
+                        new FailingSource(
+                                new KeyedEventTimeGenerator(numKeys, 
windowSlide),
+                                numElementsPerKey))
+                .rebalance()
+                .keyBy(x -> x.f0)
+                .window(
+                        SlidingEventTimeWindows.of(
+                                Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
+                .apply(
+                        new RichWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                Long,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        PARALLELISM,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    Long l,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> values,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                int sum = 0;
+                                long key = -1;
+
+                                for (Tuple2<Long, IntType> value : values) {
+                                    sum += value.f1.value;
+                                    key = value.f0;
                                 }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
SinkValidatorUpdateFun(numElementsPerKey),
-                                    new SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSlide)))
-                    .setParallelism(1);
-
-            env.execute("Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                                final Tuple4<Long, Long, Long, IntType> output 
=
+                                        new Tuple4<>(
+                                                key,
+                                                window.getStart(),
+                                                window.getEnd(),
+                                                new IntType(sum));
+                                out.collect(output);
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new SinkValidatorUpdateFun(numElementsPerKey),
+                                new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSlide)))
+                .setParallelism(1);
+
+        env.execute("Tumbling Window Test");
     }
 
     @Test
-    public void testPreAggregatedTumblingTimeWindow() {
+    public void testPreAggregatedTumblingTimeWindow() throws Exception {
         final int numElementsPerKey = numElementsPerKey();
         final int windowSize = windowSize();
         final int numKeys = numKeys();
 
-        try {
-            StreamExecutionEnvironment env =
-                    
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            env.getConfig().setUseSnapshotCompression(true);
-
-            env.addSource(
-                            new FailingSource(
-                                    new KeyedEventTimeGenerator(numKeys, 
windowSize),
-                                    numElementsPerKey))
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
-                    .reduce(
-                            new ReduceFunction<Tuple2<Long, IntType>>() {
-
-                                @Override
-                                public Tuple2<Long, IntType> reduce(
-                                        Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
-                                    return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
-                                }
-                            },
-                            new RichWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    Long,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            PARALLELISM,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
-                                }
-
-                                @Override
-                                public void apply(
-                                        Long l,
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> input,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    for (Tuple2<Long, IntType> in : input) {
-                                        final Tuple4<Long, Long, Long, 
IntType> output =
-                                                new Tuple4<>(
-                                                        in.f0,
-                                                        window.getStart(),
-                                                        window.getEnd(),
-                                                        in.f1);
-                                        out.collect(output);
-                                    }
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        env.getConfig().setUseSnapshotCompression(true);
+
+        env.addSource(
+                        new FailingSource(
+                                new KeyedEventTimeGenerator(numKeys, 
windowSize),
+                                numElementsPerKey))
+                .rebalance()
+                .keyBy(x -> x.f0)
+                
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
+                .reduce(
+                        new ReduceFunction<Tuple2<Long, IntType>>() {
+
+                            @Override
+                            public Tuple2<Long, IntType> reduce(
+                                    Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
+                                return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
+                            }
+                        },
+                        new RichWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                Long,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        PARALLELISM,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    Long l,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> input,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                for (Tuple2<Long, IntType> in : input) {
+                                    final Tuple4<Long, Long, Long, IntType> 
output =
+                                            new Tuple4<>(
+                                                    in.f0,
+                                                    window.getStart(),
+                                                    window.getEnd(),
+                                                    in.f1);
+                                    out.collect(output);
                                 }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
SinkValidatorUpdateFun(numElementsPerKey),
-                                    new SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSize)))
-                    .setParallelism(1);
-
-            env.execute("Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new SinkValidatorUpdateFun(numElementsPerKey),
+                                new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSize)))
+                .setParallelism(1);
+
+        env.execute("Tumbling Window Test");
     }
 
     @Test
-    public void testPreAggregatedSlidingTimeWindow() {
+    public void testPreAggregatedSlidingTimeWindow() throws Exception {
         final int numElementsPerKey = numElementsPerKey();
         final int windowSize = windowSize();
         final int windowSlide = windowSlide();
         final int numKeys = numKeys();
 
-        try {
-            StreamExecutionEnvironment env =
-                    
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            env.getConfig().setUseSnapshotCompression(true);
-
-            env.addSource(
-                            new FailingSource(
-                                    new KeyedEventTimeGenerator(numKeys, 
windowSlide),
-                                    numElementsPerKey))
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    .window(
-                            SlidingEventTimeWindows.of(
-                                    Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
-                    .reduce(
-                            new ReduceFunction<Tuple2<Long, IntType>>() {
-
-                                @Override
-                                public Tuple2<Long, IntType> reduce(
-                                        Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
-
-                                    // validate that the function has been 
opened properly
-                                    return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
-                                }
-                            },
-                            new RichWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple4<Long, Long, Long, IntType>,
-                                    Long,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            PARALLELISM,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
-                                }
-
-                                @Override
-                                public void apply(
-                                        Long l,
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> input,
-                                        Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    for (Tuple2<Long, IntType> in : input) {
-                                        out.collect(
-                                                new Tuple4<>(
-                                                        in.f0,
-                                                        window.getStart(),
-                                                        window.getEnd(),
-                                                        in.f1));
-                                    }
+        StreamExecutionEnvironment env =
+                
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        env.getConfig().setUseSnapshotCompression(true);
+
+        env.addSource(
+                        new FailingSource(
+                                new KeyedEventTimeGenerator(numKeys, 
windowSlide),
+                                numElementsPerKey))
+                .rebalance()
+                .keyBy(x -> x.f0)
+                .window(
+                        SlidingEventTimeWindows.of(
+                                Duration.ofMillis(windowSize), 
Duration.ofMillis(windowSlide)))
+                .reduce(
+                        new ReduceFunction<Tuple2<Long, IntType>>() {
+
+                            @Override
+                            public Tuple2<Long, IntType> reduce(
+                                    Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
+
+                                // validate that the function has been opened 
properly
+                                return new Tuple2<>(a.f0, new 
IntType(a.f1.value + b.f1.value));
+                            }
+                        },
+                        new RichWindowFunction<
+                                Tuple2<Long, IntType>,
+                                Tuple4<Long, Long, Long, IntType>,
+                                Long,
+                                TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        PARALLELISM,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    Long l,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> input,
+                                    Collector<Tuple4<Long, Long, Long, 
IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                for (Tuple2<Long, IntType> in : input) {
+                                    out.collect(
+                                            new Tuple4<>(
+                                                    in.f0,
+                                                    window.getStart(),
+                                                    window.getEnd(),
+                                                    in.f1));
                                 }
-                            })
-                    .addSink(
-                            new ValidatingSink<>(
-                                    new 
SinkValidatorUpdateFun(numElementsPerKey),
-                                    new SinkValidatorCheckFun(
-                                            numKeys, numElementsPerKey, 
windowSlide)))
-                    .setParallelism(1);
-
-            env.execute("Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                            }
+                        })
+                .addSink(
+                        new ValidatingSink<>(
+                                new SinkValidatorUpdateFun(numElementsPerKey),
+                                new SinkValidatorCheckFun(numKeys, 
numElementsPerKey, windowSlide)))
+                .setParallelism(1);
+
+        env.execute("Tumbling Window Test");
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
index 1ebbbd35f0a..d58905e1ef6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java
@@ -78,216 +78,190 @@ public class ProcessingTimeWindowCheckpointingITCase 
extends TestLogger {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testTumblingProcessingTimeWindow() {
+    public void testTumblingProcessingTimeWindow() throws Exception {
         final int numElements = 3000;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.getConfig().setAutoWatermarkInterval(10);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-
-            SinkValidatorUpdaterAndChecker updaterAndChecker =
-                    new SinkValidatorUpdaterAndChecker(numElements, 1);
-
-            env.addSource(new FailingSource(new Generator(), numElements, 
true))
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
-                    .apply(
-                            new RichWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple2<Long, IntType>,
-                                    Long,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            PARALLELISM,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.getConfig().setAutoWatermarkInterval(10);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+
+        SinkValidatorUpdaterAndChecker updaterAndChecker =
+                new SinkValidatorUpdaterAndChecker(numElements, 1);
+
+        env.addSource(new FailingSource(new Generator(), numElements, true))
+                .rebalance()
+                .keyBy(x -> x.f0)
+                
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
+                .apply(
+                        new RichWindowFunction<
+                                Tuple2<Long, IntType>, Tuple2<Long, IntType>, 
Long, TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        PARALLELISM,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    Long l,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> values,
+                                    Collector<Tuple2<Long, IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                for (Tuple2<Long, IntType> value : values) {
+                                    assertEquals(value.f0.intValue(), 
value.f1.value);
+                                    out.collect(new Tuple2<>(value.f0, new 
IntType(1)));
                                 }
+                            }
+                        })
+                .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
+                .setParallelism(1);
 
-                                @Override
-                                public void apply(
-                                        Long l,
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> values,
-                                        Collector<Tuple2<Long, IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    for (Tuple2<Long, IntType> value : values) 
{
-                                        assertEquals(value.f0.intValue(), 
value.f1.value);
-                                        out.collect(new Tuple2<>(value.f0, new 
IntType(1)));
-                                    }
-                                }
-                            })
-                    .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
-                    .setParallelism(1);
-
-            tryExecute(env, "Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        tryExecute(env, "Tumbling Window Test");
     }
 
     @Test
-    public void testSlidingProcessingTimeWindow() {
+    public void testSlidingProcessingTimeWindow() throws Exception {
         final int numElements = 3000;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.getConfig().setAutoWatermarkInterval(10);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            SinkValidatorUpdaterAndChecker updaterAndChecker =
-                    new SinkValidatorUpdaterAndChecker(numElements, 3);
-            env.addSource(new FailingSource(new Generator(), numElements, 
true))
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    .window(
-                            SlidingProcessingTimeWindows.of(
-                                    Duration.ofMillis(150), 
Duration.ofMillis(50)))
-                    .apply(
-                            new RichWindowFunction<
-                                    Tuple2<Long, IntType>,
-                                    Tuple2<Long, IntType>,
-                                    Long,
-                                    TimeWindow>() {
-
-                                private boolean open = false;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    assertEquals(
-                                            PARALLELISM,
-                                            getRuntimeContext()
-                                                    .getTaskInfo()
-                                                    
.getNumberOfParallelSubtasks());
-                                    open = true;
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.getConfig().setAutoWatermarkInterval(10);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        SinkValidatorUpdaterAndChecker updaterAndChecker =
+                new SinkValidatorUpdaterAndChecker(numElements, 3);
+        env.addSource(new FailingSource(new Generator(), numElements, true))
+                .rebalance()
+                .keyBy(x -> x.f0)
+                .window(
+                        SlidingProcessingTimeWindows.of(
+                                Duration.ofMillis(150), Duration.ofMillis(50)))
+                .apply(
+                        new RichWindowFunction<
+                                Tuple2<Long, IntType>, Tuple2<Long, IntType>, 
Long, TimeWindow>() {
+
+                            private boolean open = false;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                assertEquals(
+                                        PARALLELISM,
+                                        getRuntimeContext()
+                                                .getTaskInfo()
+                                                
.getNumberOfParallelSubtasks());
+                                open = true;
+                            }
+
+                            @Override
+                            public void apply(
+                                    Long l,
+                                    TimeWindow window,
+                                    Iterable<Tuple2<Long, IntType>> values,
+                                    Collector<Tuple2<Long, IntType>> out) {
+
+                                // validate that the function has been opened 
properly
+                                assertTrue(open);
+
+                                for (Tuple2<Long, IntType> value : values) {
+                                    assertEquals(value.f0.intValue(), 
value.f1.value);
+                                    out.collect(new Tuple2<>(value.f0, new 
IntType(1)));
                                 }
+                            }
+                        })
+                .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
+                .setParallelism(1);
 
-                                @Override
-                                public void apply(
-                                        Long l,
-                                        TimeWindow window,
-                                        Iterable<Tuple2<Long, IntType>> values,
-                                        Collector<Tuple2<Long, IntType>> out) {
-
-                                    // validate that the function has been 
opened properly
-                                    assertTrue(open);
-
-                                    for (Tuple2<Long, IntType> value : values) 
{
-                                        assertEquals(value.f0.intValue(), 
value.f1.value);
-                                        out.collect(new Tuple2<>(value.f0, new 
IntType(1)));
-                                    }
-                                }
-                            })
-                    .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
-                    .setParallelism(1);
-
-            tryExecute(env, "Sliding Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        tryExecute(env, "Sliding Window Test");
     }
 
     @Test
-    public void testAggregatingTumblingProcessingTimeWindow() {
+    public void testAggregatingTumblingProcessingTimeWindow() throws Exception 
{
         final int numElements = 3000;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.getConfig().setAutoWatermarkInterval(10);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            SinkValidatorUpdaterAndChecker updaterAndChecker =
-                    new SinkValidatorUpdaterAndChecker(numElements, 1);
-            env.addSource(new FailingSource(new Generator(), numElements, 
true))
-                    .map(
-                            new MapFunction<Tuple2<Long, IntType>, 
Tuple2<Long, IntType>>() {
-                                @Override
-                                public Tuple2<Long, IntType> map(Tuple2<Long, 
IntType> value) {
-                                    value.f1.value = 1;
-                                    return value;
-                                }
-                            })
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
-                    .reduce(
-                            new ReduceFunction<Tuple2<Long, IntType>>() {
-
-                                @Override
-                                public Tuple2<Long, IntType> reduce(
-                                        Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
-                                    return new Tuple2<>(a.f0, new IntType(1));
-                                }
-                            })
-                    .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
-                    .setParallelism(1);
-
-            tryExecute(env, "Aggregating Tumbling Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.getConfig().setAutoWatermarkInterval(10);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        SinkValidatorUpdaterAndChecker updaterAndChecker =
+                new SinkValidatorUpdaterAndChecker(numElements, 1);
+        env.addSource(new FailingSource(new Generator(), numElements, true))
+                .map(
+                        new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, 
IntType>>() {
+                            @Override
+                            public Tuple2<Long, IntType> map(Tuple2<Long, 
IntType> value) {
+                                value.f1.value = 1;
+                                return value;
+                            }
+                        })
+                .rebalance()
+                .keyBy(x -> x.f0)
+                
.window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100)))
+                .reduce(
+                        new ReduceFunction<Tuple2<Long, IntType>>() {
+
+                            @Override
+                            public Tuple2<Long, IntType> reduce(
+                                    Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
+                                return new Tuple2<>(a.f0, new IntType(1));
+                            }
+                        })
+                .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
+                .setParallelism(1);
+
+        tryExecute(env, "Aggregating Tumbling Window Test");
     }
 
     @Test
-    public void testAggregatingSlidingProcessingTimeWindow() {
+    public void testAggregatingSlidingProcessingTimeWindow() throws Exception {
         final int numElements = 3000;
 
-        try {
-            StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.getConfig().setAutoWatermarkInterval(10);
-            env.enableCheckpointing(100);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 
0L);
-            SinkValidatorUpdaterAndChecker updaterAndChecker =
-                    new SinkValidatorUpdaterAndChecker(numElements, 3);
-            env.addSource(new FailingSource(new Generator(), numElements, 
true))
-                    .map(
-                            new MapFunction<Tuple2<Long, IntType>, 
Tuple2<Long, IntType>>() {
-                                @Override
-                                public Tuple2<Long, IntType> map(Tuple2<Long, 
IntType> value) {
-                                    value.f1.value = 1;
-                                    return value;
-                                }
-                            })
-                    .rebalance()
-                    .keyBy(x -> x.f0)
-                    .window(
-                            SlidingProcessingTimeWindows.of(
-                                    Duration.ofMillis(150), 
Duration.ofMillis(50)))
-                    .reduce(
-                            new ReduceFunction<Tuple2<Long, IntType>>() {
-                                @Override
-                                public Tuple2<Long, IntType> reduce(
-                                        Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
-                                    return new Tuple2<>(a.f0, new IntType(1));
-                                }
-                            })
-                    .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
-                    .setParallelism(1);
-
-            tryExecute(env, "Aggregating Sliding Window Test");
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.getConfig().setAutoWatermarkInterval(10);
+        env.enableCheckpointing(100);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L);
+        SinkValidatorUpdaterAndChecker updaterAndChecker =
+                new SinkValidatorUpdaterAndChecker(numElements, 3);
+        env.addSource(new FailingSource(new Generator(), numElements, true))
+                .map(
+                        new MapFunction<Tuple2<Long, IntType>, Tuple2<Long, 
IntType>>() {
+                            @Override
+                            public Tuple2<Long, IntType> map(Tuple2<Long, 
IntType> value) {
+                                value.f1.value = 1;
+                                return value;
+                            }
+                        })
+                .rebalance()
+                .keyBy(x -> x.f0)
+                .window(
+                        SlidingProcessingTimeWindows.of(
+                                Duration.ofMillis(150), Duration.ofMillis(50)))
+                .reduce(
+                        new ReduceFunction<Tuple2<Long, IntType>>() {
+                            @Override
+                            public Tuple2<Long, IntType> reduce(
+                                    Tuple2<Long, IntType> a, Tuple2<Long, 
IntType> b) {
+                                return new Tuple2<>(a.f0, new IntType(1));
+                            }
+                        })
+                .addSink(new ValidatingSink<>(updaterAndChecker, 
updaterAndChecker, true))
+                .setParallelism(1);
+
+        tryExecute(env, "Aggregating Sliding Window Test");
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 39bf862c0e2..89658ec82e7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -141,16 +141,11 @@ public class RegionFailoverITCase extends TestLogger {
      * completed checkpoint's.
      */
     @Test(timeout = 60000)
-    public void testMultiRegionFailover() {
-        try {
-            JobGraph jobGraph = createJobGraph();
-            ClusterClient<?> client = cluster.getClusterClient();
-            submitJobAndWaitForResult(client, jobGraph, 
getClass().getClassLoader());
-            verifyAfterJobExecuted();
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
-        }
+    public void testMultiRegionFailover() throws Exception {
+        JobGraph jobGraph = createJobGraph();
+        ClusterClient<?> client = cluster.getClusterClient();
+        submitJobAndWaitForResult(client, jobGraph, 
getClass().getClassLoader());
+        verifyAfterJobExecuted();
     }
 
     private void verifyAfterJobExecuted() {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 62451cd39fb..57871604682 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -53,7 +53,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Integration test for the {@link CheckpointListener} interface. The test 
ensures that {@link
@@ -83,74 +82,67 @@ public class StreamCheckpointNotifierITCase extends 
AbstractTestBaseJUnit4 {
      * </pre>
      */
     @Test
-    public void testProgram() {
-        try {
-            final StreamExecutionEnvironment env =
-                    StreamExecutionEnvironment.getExecutionEnvironment();
-            assertEquals("test setup broken", PARALLELISM, 
env.getParallelism());
+    public void testProgram() throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        assertEquals("test setup broken", PARALLELISM, env.getParallelism());
 
-            env.enableCheckpointing(500);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 
Integer.MAX_VALUE, 0L);
+        env.enableCheckpointing(500);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 
Integer.MAX_VALUE, 0L);
 
-            final int numElements = 10000;
-            final int numTaskTotal = PARALLELISM * 5;
+        final int numElements = 10000;
+        final int numTaskTotal = PARALLELISM * 5;
 
-            DataStream<Long> stream =
-                    env.addSource(new GeneratingSourceFunction(numElements, 
numTaskTotal));
+        DataStream<Long> stream =
+                env.addSource(new GeneratingSourceFunction(numElements, 
numTaskTotal));
 
-            stream
-                    // -------------- first vertex, chained to the src 
----------------
-                    .filter(new LongRichFilterFunction())
+        stream
+                // -------------- first vertex, chained to the src 
----------------
+                .filter(new LongRichFilterFunction())
 
-                    // -------------- second vertex, applying the co-map 
----------------
-                    .connect(stream)
-                    .flatMap(new LeftIdentityCoRichFlatMapFunction())
+                // -------------- second vertex, applying the co-map 
----------------
+                .connect(stream)
+                .flatMap(new LeftIdentityCoRichFlatMapFunction())
 
-                    // -------------- third vertex - the stateful one that 
also fails
-                    // ----------------
-                    .map(new IdentityMapFunction())
-                    .startNewChain()
+                // -------------- third vertex - the stateful one that also 
fails
+                // ----------------
+                .map(new IdentityMapFunction())
+                .startNewChain()
 
-                    // -------------- fourth vertex - reducer and the sink 
----------------
-                    .keyBy(x -> x.f0)
-                    .reduce(new OnceFailingReducer(numElements))
-                    .sinkTo(new DiscardingSink<>());
+                // -------------- fourth vertex - reducer and the sink 
----------------
+                .keyBy(x -> x.f0)
+                .reduce(new OnceFailingReducer(numElements))
+                .sinkTo(new DiscardingSink<>());
 
-            env.execute();
+        env.execute();
 
-            final long failureCheckpointID = 
OnceFailingReducer.failureCheckpointID;
-            assertNotEquals(0L, failureCheckpointID);
+        final long failureCheckpointID = 
OnceFailingReducer.failureCheckpointID;
+        assertNotEquals(0L, failureCheckpointID);
 
-            List<List<Long>[]> allLists =
-                    Arrays.asList(
-                            GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
-                            LongRichFilterFunction.COMPLETED_CHECKPOINTS,
-                            
LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
-                            IdentityMapFunction.COMPLETED_CHECKPOINTS,
-                            OnceFailingReducer.COMPLETED_CHECKPOINTS);
+        List<List<Long>[]> allLists =
+                Arrays.asList(
+                        GeneratingSourceFunction.COMPLETED_CHECKPOINTS,
+                        LongRichFilterFunction.COMPLETED_CHECKPOINTS,
+                        
LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS,
+                        IdentityMapFunction.COMPLETED_CHECKPOINTS,
+                        OnceFailingReducer.COMPLETED_CHECKPOINTS);
 
-            for (List<Long>[] parallelNotifications : allLists) {
-                for (List<Long> notifications : parallelNotifications) {
+        for (List<Long>[] parallelNotifications : allLists) {
+            for (List<Long> notifications : parallelNotifications) {
 
-                    assertTrue(
-                            "No checkpoint notification was received.", 
notifications.size() > 0);
+                assertTrue("No checkpoint notification was received.", 
notifications.size() > 0);
 
-                    assertFalse(
-                            "Failure checkpoint was marked as completed.",
-                            notifications.contains(failureCheckpointID));
+                assertFalse(
+                        "Failure checkpoint was marked as completed.",
+                        notifications.contains(failureCheckpointID));
 
-                    assertFalse(
-                            "No checkpoint received after failure.",
-                            notifications.get(notifications.size() - 1) == 
failureCheckpointID);
+                assertFalse(
+                        "No checkpoint received after failure.",
+                        notifications.get(notifications.size() - 1) == 
failureCheckpointID);
 
-                    assertTrue(
-                            "Checkpoint notification was received multiple 
times",
-                            notifications.size() == new 
HashSet<Long>(notifications).size());
-                }
+                assertTrue(
+                        "Checkpoint notification was received multiple times",
+                        notifications.size() == new 
HashSet<Long>(notifications).size());
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index a0e13ef9d50..61de82d4139 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -123,28 +123,22 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
      */
     @Test
     public void runCheckpointedProgram() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.enableCheckpointing(500);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);
+
+        testProgram(env);
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         try {
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(PARALLELISM);
-            env.enableCheckpointing(500);
-            RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L);
-
-            testProgram(env);
-
-            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-            try {
-                submitJobAndWaitForResult(
-                        cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
-            } catch (Exception e) {
-                Assert.assertTrue(
-                        ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
-            }
-
-            postSubmit();
+            submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
         } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
+            Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent());
         }
+
+        postSubmit();
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
index cb93a298e04..e6917f21a8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -87,16 +87,10 @@ public class TimestampedFileInputSplitTest extends TestLogger {
         Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
     }
 
-    @Test
+    @Test(expected = IllegalArgumentException.class)
     public void testIllegalArgument() {
-        try {
-            new TimestampedFileInputSplit(
-                    -10, 2, new Path("test"), 0, 100, null); // invalid modification time
-        } catch (Exception e) {
-            if (!(e instanceof IllegalArgumentException)) {
-                Assert.fail(e.getMessage());
-            }
-        }
+        new TimestampedFileInputSplit(
+                -10, 2, new Path("test"), 0, 100, null); // invalid modification time
     }
 
     @Test
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
index 7cfab00d5e3..e005e39cff3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java
@@ -36,7 +36,6 @@ import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -67,32 +66,26 @@ public class LocalExecutorITCase extends TestLogger {
     }
 
     @Test(timeout = 60_000)
-    public void testLocalExecutorWithWordCount() throws InterruptedException {
-        try {
-            // set up the files
-            File inFile = File.createTempFile("wctext", ".in");
-            File outFile = File.createTempFile("wctext", ".out");
-            inFile.deleteOnExit();
-            outFile.deleteOnExit();
-
-            try (FileWriter fw = new FileWriter(inFile)) {
-                fw.write(WordCountData.TEXT);
-            }
-
-            final Configuration config = new Configuration();
-            config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
-            config.set(DeploymentOptions.ATTACHED, true);
-
-            StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism);
-            JobClient jobClient =
-                    executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader())
-                            .get();
-            jobClient.getJobExecutionResult().get();
-        } catch (Exception e) {
-            e.printStackTrace();
-            Assert.fail(e.getMessage());
+    public void testLocalExecutorWithWordCount() throws Exception {
+        // set up the files
+        File inFile = File.createTempFile("wctext", ".in");
+        File outFile = File.createTempFile("wctext", ".out");
+        inFile.deleteOnExit();
+        outFile.deleteOnExit();
+
+        try (FileWriter fw = new FileWriter(inFile)) {
+            fw.write(WordCountData.TEXT);
         }
 
+        final Configuration config = new Configuration();
+        config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
+        config.set(DeploymentOptions.ATTACHED, true);
+
+        StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism);
+        JobClient jobClient =
+                executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader()).get();
+        jobClient.getJobExecutionResult().get();
+
         assertThat(miniCluster.isRunning(), is(false));
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
index 124348ac5a4..c1ebd8fea0d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java
@@ -24,8 +24,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
 import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
 
-import static org.junit.Assert.fail;
-
 /**
  * Tests for non rich DataSource and DataSink input output formats being correctly used at runtime.
  */
@@ -38,12 +36,7 @@ public class InputOutputITCase extends JavaProgramTestBaseJUnit4 {
         TestNonRichOutputFormat output = new TestNonRichOutputFormat();
         env.createInput(new TestNonRichInputFormat())
                 .addSink(new OutputFormatSinkFunction<>(output));
-        try {
-            env.execute();
-        } catch (Exception e) {
-            // we didn't break anything by making everything rich.
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        env.execute();
+        // we didn't break anything by making everything rich.
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 720b99cb5cd..32281cdcfb3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -40,7 +40,6 @@ import java.io.IOException;
 
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Test for proper error messages in case user-defined serialization is broken and detected in the
@@ -67,7 +66,7 @@ public class CustomSerializationITCase extends TestLogger {
     }
 
     @Test
-    public void testIncorrectSerializer1() {
+    public void testIncorrectSerializer1() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -93,14 +92,11 @@ public class CustomSerializationITCase extends TestLogger {
                                                     .getMessage()
                                                     .contains("broken serialization."))
                             .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }
 
     @Test
-    public void testIncorrectSerializer2() {
+    public void testIncorrectSerializer2() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -126,14 +122,11 @@ public class CustomSerializationITCase extends TestLogger {
                                                     .getMessage()
                                                     .contains("broken serialization."))
                             .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }
 
     @Test
-    public void testIncorrectSerializer3() {
+    public void testIncorrectSerializer3() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -159,14 +152,11 @@ public class CustomSerializationITCase extends TestLogger {
                                                     .getMessage()
                                                     .contains("broken serialization."))
                             .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }
 
     @Test
-    public void testIncorrectSerializer4() {
+    public void testIncorrectSerializer4() throws Exception {
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARLLELISM);
@@ -192,9 +182,6 @@ public class CustomSerializationITCase extends TestLogger {
                                                     .getMessage()
                                                     .contains("broken serialization."))
                             .isPresent());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 6febe2ea473..c8835648486 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -63,87 +63,70 @@ public class MiscellaneousIssuesITCase extends TestLogger {
                             .build());
 
     @Test
-    public void testNullValues() {
-        try {
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(1);
-
-            DataStream<String> data =
-                    env.fromData("hallo")
-                            .map(
-                                    new MapFunction<String, String>() {
-                                        @Override
-                                        public String map(String value) throws Exception {
-                                            return null;
-                                        }
-                                    });
-            data.sinkTo(
-                    FileSink.forRowFormat(
-                                    new Path("/tmp/myTest"), new SimpleStringEncoder<String>())
-                            .build());
+    public void testNullValues() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<String> data =
+                env.fromData("hallo")
+                        .map(
+                                new MapFunction<String, String>() {
+                                    @Override
+                                    public String map(String value) throws Exception {
+                                        return null;
+                                    }
+                                });
+        data.sinkTo(
+                FileSink.forRowFormat(new Path("/tmp/myTest"), new SimpleStringEncoder<String>())
+                        .build());
 
-            try {
-                env.execute();
-                fail("this should fail due to null values.");
-            } catch (JobExecutionException e) {
-                assertTrue(findThrowable(e, NullPointerException.class).isPresent());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        try {
+            env.execute();
+            fail("this should fail due to null values.");
+        } catch (JobExecutionException e) {
+            assertTrue(findThrowable(e, NullPointerException.class).isPresent());
         }
     }
 
     @Test
-    public void testDisjointDataflows() {
-        try {
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(5);
+    public void testDisjointDataflows() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(5);
 
-            // generate two different flows
-            env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
-            env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        // generate two different flows
+        env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
+        env.fromSequence(1, 10).sinkTo(new DiscardingSink<>());
     }
 
     @Test
-    public void testAccumulatorsAfterNoOp() {
+    public void testAccumulatorsAfterNoOp() throws Exception {
 
         final String accName = "test_accumulator";
 
-        try {
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(6);
-
-            env.fromSequence(1, 1000000)
-                    .rebalance()
-                    .flatMap(
-                            new RichFlatMapFunction<Long, Long>() {
-
-                                private LongCounter counter;
-
-                                @Override
-                                public void open(OpenContext openContext) {
-                                    counter = getRuntimeContext().getLongCounter(accName);
-                                }
-
-                                @Override
-                                public void flatMap(Long value, Collector<Long> out) {
-                                    counter.add(1L);
-                                }
-                            })
-                    .sinkTo(new DiscardingSink<>());
-
-            JobExecutionResult result = env.execute();
-
-            assertEquals(1000000L, result.getAllAccumulatorResults().get(accName));
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(6);
+
+        env.fromSequence(1, 1000000)
+                .rebalance()
+                .flatMap(
+                        new RichFlatMapFunction<Long, Long>() {
+
+                            private LongCounter counter;
+
+                            @Override
+                            public void open(OpenContext openContext) {
+                                counter = getRuntimeContext().getLongCounter(accName);
+                            }
+
+                            @Override
+                            public void flatMap(Long value, Collector<Long> out) {
+                                counter.add(1L);
+                            }
+                        })
+                .sinkTo(new DiscardingSink<>());
+
+        JobExecutionResult result = env.execute();
+
+        assertEquals(1000000L, result.getAllAccumulatorResults().get(accName));
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 7a6e353d9d6..b5389bf03a2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -226,13 +226,11 @@ abstract class AbstractTaskManagerProcessFailureRecoveryTest {
 
             // all seems well :-)
         } catch (Exception e) {
-            e.printStackTrace();
             printProcessLog("TaskManager 1", taskManagerProcess1);
             printProcessLog("TaskManager 2", taskManagerProcess2);
             printProcessLog("TaskManager 3", taskManagerProcess3);
-            fail(e.getMessage());
+            throw e;
         } catch (Error e) {
-            e.printStackTrace();
             printProcessLog("TaskManager 1", taskManagerProcess1);
             printProcessLog("TaskManager 2", taskManagerProcess2);
             printProcessLog("TaskManager 3", taskManagerProcess3);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index a5d0215a959..99eb787731e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -51,8 +51,6 @@ import java.net.ServerSocket;
 import java.util.Enumeration;
 import java.util.List;
 
-import static org.junit.Assert.fail;
-
 /** Test proper handling of IPv6 address literals in URLs. */
 @SuppressWarnings("serial")
 public class IPv6HostnamesITCase extends TestLogger {
@@ -83,52 +81,44 @@ public class IPv6HostnamesITCase extends TestLogger {
     }
 
     @Test
-    public void testClusterWithIPv6host() {
-        try {
-
-            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-            env.setParallelism(4);
-
-            // get input data
-            DataStream<String> text = env.fromData(WordCountData.TEXT.split("\n"));
-
-            DataStream<Tuple2<String, Integer>> counts =
-                    text.flatMap(
-                                    new FlatMapFunction<String, Tuple2<String, Integer>>() {
-                                        @Override
-                                        public void flatMap(
-                                                String value,
-                                                Collector<Tuple2<String, Integer>> out)
-                                                throws Exception {
-                                            for (String token : value.toLowerCase().split("\\W+")) {
-                                                if (token.length() > 0) {
-                                                    out.collect(
-                                                            new Tuple2<String, Integer>(token, 1));
-                                                }
+    public void testClusterWithIPv6host() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+
+        // get input data
+        DataStream<String> text = env.fromData(WordCountData.TEXT.split("\n"));
+
+        DataStream<Tuple2<String, Integer>> counts =
+                text.flatMap(
+                                new FlatMapFunction<String, Tuple2<String, Integer>>() {
+                                    @Override
+                                    public void flatMap(
+                                            String value, Collector<Tuple2<String, Integer>> out)
+                                            throws Exception {
+                                        for (String token : value.toLowerCase().split("\\W+")) {
+                                            if (token.length() > 0) {
+                                                out.collect(new Tuple2<String, Integer>(token, 1));
                                             }
                                         }
-                                    })
-                            .keyBy(x -> x.f0)
-                            .window(GlobalWindows.createWithEndOfStreamTrigger())
-                            .reduce(
-                                    new ReduceFunction<Tuple2<String, Integer>>() {
-                                        @Override
-                                        public Tuple2<String, Integer> reduce(
-                                                Tuple2<String, Integer> value1,
-                                                Tuple2<String, Integer> value2)
-                                                throws Exception {
-                                            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
-                                        }
-                                    });
-
-            List<Tuple2<String, Integer>> result =
-                    CollectionUtil.iteratorToList(counts.executeAndCollect());
-
-            TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES);
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+                                    }
+                                })
+                        .keyBy(x -> x.f0)
+                        .window(GlobalWindows.createWithEndOfStreamTrigger())
+                        .reduce(
+                                new ReduceFunction<Tuple2<String, Integer>>() {
+                                    @Override
+                                    public Tuple2<String, Integer> reduce(
+                                            Tuple2<String, Integer> value1,
+                                            Tuple2<String, Integer> value2)
+                                            throws Exception {
+                                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+                                    }
+                                });
+
+        List<Tuple2<String, Integer>> result =
+                CollectionUtil.iteratorToList(counts.executeAndCollect());
+
+        TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES);
     }
 
     private Inet6Address getLocalIPv6Address() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
index f31dc9e7370..7019f775044 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java
@@ -36,22 +36,17 @@ import static org.junit.Assert.fail;
 public class StateHandleSerializationTest {
 
     @Test
-    public void ensureStateHandlesHaveSerialVersionUID() {
-        try {
-            Reflections reflections = new Reflections("org.apache.flink");
+    public void ensureStateHandlesHaveSerialVersionUID() throws Exception {
+        Reflections reflections = new Reflections("org.apache.flink");
 
-            // check all state handles
+        // check all state handles
 
-            @SuppressWarnings("unchecked")
-            Set<Class<?>> stateHandleImplementations =
-                    (Set<Class<?>>) (Set<?>) reflections.getSubTypesOf(StateObject.class);
+        @SuppressWarnings("unchecked")
+        Set<Class<?>> stateHandleImplementations =
+                (Set<Class<?>>) (Set<?>) reflections.getSubTypesOf(StateObject.class);
 
-            for (Class<?> clazz : stateHandleImplementations) {
-                validataSerialVersionUID(clazz);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        for (Class<?> clazz : stateHandleImplementations) {
+            validataSerialVersionUID(clazz);
         }
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
index 6c60f12b705..736670f6283 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
@@ -80,7 +80,7 @@ public class PartitionerITCase extends AbstractTestBaseJUnit4 {
     }
 
     @Test
-    public void partitionerTest() {
+    public void partitionerTest() throws Exception {
 
         TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink =
                 new TestListResultSink<Tuple2<Integer, String>>();
@@ -145,12 +145,7 @@ public class PartitionerITCase extends AbstractTestBaseJUnit4 {
         // partition global
         src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
 
-        try {
-            env.execute();
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        env.execute();
 
         List<Tuple2<Integer, String>> hashPartitionResult = hashPartitionResultSink.getResult();
         List<Tuple2<Integer, String>> customPartitionResult = customPartitionResultSink.getResult();

Reply via email to