Repository: flink Updated Branches: refs/heads/master 9d3f127e4 -> 0182141d4
[FLINK-6294] Add close without input test for BucketingSink. An earlier version of the code was throwing an NPE if the sink was closed without ever seeing any input. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04aae306 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04aae306 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04aae306 Branch: refs/heads/master Commit: 04aae3061aa0184e3ad610cda454c661f872a45d Parents: 9d3f127 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Apr 18 15:00:41 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Apr 18 17:42:05 2017 +0200 ---------------------------------------------------------------------- .../connectors/fs/bucketing/BucketingSinkTest.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/04aae306/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index d671874..090c54a 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -151,6 +151,19 @@ public class BucketingSinkTest { } @Test + public void testClosingWithoutInput() throws Exception { + final File outDir = tempFolder.newFolder(); + + 
OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100); + testHarness.setup(); + testHarness.open(); + + // verify that we can close without ever having an input. An earlier version of the code + // was throwing an NPE because we never initialized some internal state + testHarness.close(); + } + + @Test public void testInactivityPeriodWithLateNotify() throws Exception { final File outDir = tempFolder.newFolder();