Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5193#discussion_r158398663

    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
    @@ -823,39 +839,44 @@ public void testPointSessions() throws Exception {
         0, null /* late data output tag */);
    - OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
    -     new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
      ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
    + OperatorStateHandles snapshot;
    - testHarness.open();
    + try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
    +         createTestHarness(operator)) {
    +     testHarness.open();
    - // add elements out-of-order
    - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    - testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 1000));
    +     // add elements out-of-order
    +     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
    +     testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 33), 1000));
    - // do a snapshot, close and restore again
    - OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
    - testHarness.close();
    - testHarness.setup();
    - testHarness.initializeState(snapshot);
    - testHarness.open();
    +     // do a snapshot, close and restore again
    +     snapshot = testHarness.snapshot(0L, 0L);
    + }
    + try (OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
    --- End diff --

    nit: empty line above here.
---