[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f4de57b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f4de57b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f4de57b

Branch: refs/heads/master
Commit: 3f4de57b1e2dfd532ed7b95805365eec340ebe64
Parents: 87e5b8b
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Thu Aug 17 15:46:47 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Aug 24 11:33:34 2017 +0200

----------------------------------------------------------------------
 .../sink/TwoPhaseCommitSinkFunctionTest.java    | 123 +++++++++++--------
 1 file changed, 75 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3f4de57b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
index e5bb630..9d01e74 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.BufferedWriter;
@@ -31,7 +33,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.Writer;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.util.ArrayList;
@@ -43,56 +44,52 @@ import java.util.UUID;
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link TwoPhaseCommitSinkFunction}.
  */
 public class TwoPhaseCommitSinkFunctionTest {
-       @Test
-       public void testNotifyOfCompletedCheckpoint() throws Exception {
-               File tmpDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_tmp").toFile();
-               File targetDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_target").toFile();
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestHarness(tmpDirectory, targetDirectory);
-
-               testHarness.setup();
-               testHarness.open();
-               testHarness.processElement("42", 0);
-               testHarness.snapshot(0, 1);
-               testHarness.processElement("43", 2);
-               testHarness.snapshot(1, 3);
-               testHarness.processElement("44", 4);
-               testHarness.snapshot(2, 5);
-               testHarness.notifyOfCompletedCheckpoint(1);
-
-               assertExactlyOnceForDirectory(targetDirectory, Arrays.asList("42", "43"));
-               assertEquals(2, tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
-               testHarness.close();
+       TestContext context;
+
+       @Before
+       public void setUp() throws Exception {
+               context = new TestContext();
        }
 
-       public OneInputStreamOperatorTestHarness<String, Object> createTestHarness(File tmpDirectory, File targetDirectory) throws Exception {
-               tmpDirectory.deleteOnExit();
-               targetDirectory.deleteOnExit();
-               FileBasedSinkFunction sinkFunction = new FileBasedSinkFunction(tmpDirectory, targetDirectory);
-               return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
+       @After
+       public void tearDown() throws Exception {
+               context.close();
+       }
+
+       @Test
+       public void testNotifyOfCompletedCheckpoint() throws Exception {
+               context.harness.open();
+               context.harness.processElement("42", 0);
+               context.harness.snapshot(0, 1);
+               context.harness.processElement("43", 2);
+               context.harness.snapshot(1, 3);
+               context.harness.processElement("44", 4);
+               context.harness.snapshot(2, 5);
+               context.harness.notifyOfCompletedCheckpoint(1);
+
+               assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43"));
+               assertEquals(2, context.tmpDirectory.listFiles().length); // one for checkpointId 2 and second for the currentTransaction
        }
 
        @Test
        public void testFailBeforeNotify() throws Exception {
-               File tmpDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_tmp").toFile();
-               File targetDirectory = Files.createTempDirectory(this.getClass().getSimpleName() + "_target").toFile();
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestHarness(tmpDirectory, targetDirectory);
-
-               testHarness.setup();
-               testHarness.open();
-               testHarness.processElement("42", 0);
-               testHarness.snapshot(0, 1);
-               testHarness.processElement("43", 2);
-               OperatorStateHandles snapshot = testHarness.snapshot(1, 3);
-
-               assertTrue(tmpDirectory.setWritable(false));
+               context.harness.open();
+               context.harness.processElement("42", 0);
+               context.harness.snapshot(0, 1);
+               context.harness.processElement("43", 2);
+               OperatorStateHandles snapshot = context.harness.snapshot(1, 3);
+
+               assertTrue(context.tmpDirectory.setWritable(false));
                try {
-                       testHarness.processElement("44", 4);
-                       testHarness.snapshot(2, 5);
+                       context.harness.processElement("44", 4);
+                       context.harness.snapshot(2, 5);
+                       fail("something should fail");
                }
                catch (Exception ex) {
                        if (!(ex.getCause() instanceof FileNotFoundException)) {
@@ -100,17 +97,17 @@ public class TwoPhaseCommitSinkFunctionTest {
                        }
                        // ignore
                }
-               testHarness.close();
+               context.close();
+
+               assertTrue(context.tmpDirectory.setWritable(true));
 
-               assertTrue(tmpDirectory.setWritable(true));
+               context.open();
+               context.harness.initializeState(snapshot);
 
-               testHarness = createTestHarness(tmpDirectory, targetDirectory);
-               testHarness.setup();
-               testHarness.initializeState(snapshot);
-               testHarness.close();
+               assertExactlyOnceForDirectory(context.targetDirectory, Arrays.asList("42", "43"));
+               context.close();
 
-               assertExactlyOnceForDirectory(targetDirectory, Arrays.asList("42", "43"));
-               assertEquals(0, tmpDirectory.listFiles().length);
+               assertEquals(0, context.tmpDirectory.listFiles().length);
        }
 
        private void assertExactlyOnceForDirectory(File targetDirectory, List<String> expectedValues) throws IOException {
@@ -184,11 +181,41 @@ public class TwoPhaseCommitSinkFunctionTest {
 
        private static class FileTransaction {
                private final File tmpFile;
-               private final transient Writer writer;
+               private final transient BufferedWriter writer;
 
                public FileTransaction(File tmpFile) throws IOException {
                        this.tmpFile = tmpFile;
                        this.writer = new BufferedWriter(new FileWriter(tmpFile));
                }
+
+               @Override
+               public String toString() {
+                       return String.format("FileTransaction[%s]", tmpFile.getName());
+               }
+       }
+
+       private static class TestContext implements AutoCloseable {
+               public final File tmpDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_tmp").toFile();
+               public final File targetDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_target").toFile();
+
+               public FileBasedSinkFunction sinkFunction;
+               public OneInputStreamOperatorTestHarness<String, Object> harness;
+
+               private TestContext() throws Exception {
+                       tmpDirectory.deleteOnExit();
+                       targetDirectory.deleteOnExit();
+                       open();
+               }
+
+               @Override
+               public void close() throws Exception {
+                       harness.close();
+               }
+
+               public void open() throws Exception {
+                       sinkFunction = new FileBasedSinkFunction(tmpDirectory, targetDirectory);
+                       harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
+                       harness.setup();
+               }
        }
 }

Reply via email to