Repository: flink
Updated Branches:
  refs/heads/master 9d3f127e4 -> 0182141d4


[FLINK-6294] Add close without input test for BucketingSink

An earlier version of the code was throwing an NPE if the sink was
closed without ever seeing any input.
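
The failure mode described above can be sketched in isolation. This is a minimal illustration, not the BucketingSink code: the class LazyStateSink, its fields, and its methods are hypothetical names chosen for the example. It assumes the internal state is created lazily on the first record, so close() has to tolerate state that was never initialized.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sink (not Flink's BucketingSink) whose state is created lazily. */
class LazyStateSink {
    // Stays null until the first record arrives.
    private Map<String, StringBuilder> buckets;
    private boolean closed;

    /** Buffers a value under the given bucket, creating the state on first use. */
    void invoke(String bucket, String value) {
        if (buckets == null) {
            buckets = new HashMap<>();
        }
        buckets.computeIfAbsent(bucket, k -> new StringBuilder()).append(value);
    }

    /** Releases buffered state; safe to call even if invoke() was never called. */
    void close() {
        // Without this null check, calling a method on the uninitialized map
        // would throw a NullPointerException when no input was ever seen.
        if (buckets != null) {
            buckets.clear();
        }
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Calling close() directly after construction exercises the same path as the new test: no record is ever passed in, and the call is expected to return normally.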


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04aae306
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04aae306
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04aae306

Branch: refs/heads/master
Commit: 04aae3061aa0184e3ad610cda454c661f872a45d
Parents: 9d3f127
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue Apr 18 15:00:41 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Apr 18 17:42:05 2017 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSinkTest.java     | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/04aae306/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index d671874..090c54a 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -151,6 +151,19 @@ public class BucketingSinkTest {
        }
 
        @Test
+       public void testClosingWithoutInput() throws Exception {
+               final File outDir = tempFolder.newFolder();
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0, 100);
+               testHarness.setup();
+               testHarness.open();
+
+               // verify that we can close without ever having an input. An earlier version of the code
+               // was throwing an NPE because we never initialized some internal state
+               testHarness.close();
+       }
+
+       @Test
        public void testInactivityPeriodWithLateNotify() throws Exception {
                final File outDir = tempFolder.newFolder();
 
