This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 19a0a5f  [BEAM-5267] Make Flink Runner compile compatible with Flink 1.6.0 (#6331)
19a0a5f is described below

commit 19a0a5f20e87aab33d1346eeec0485fe9e4f15bc
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Sep 18 14:20:39 2018 +0200

    [BEAM-5267] Make Flink Runner compile compatible with Flink 1.6.0 (#6331)
---
 .../beam/runners/flink/ReadSourceStreamingTest.java   | 19 +++++++++++--------
 .../runners/flink/streaming/GroupByNullKeyTest.java   | 19 +++++++++++--------
 .../flink/streaming/TopWikipediaSessionsTest.java     | 19 +++++++++++--------
 3 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index af95364..ded53af 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -24,10 +24,13 @@ import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /** Reads from a bounded source in streaming. */
-public class ReadSourceStreamingTest extends StreamingProgramTestBase {
+public class ReadSourceStreamingTest extends AbstractTestBase {
 
   protected String resultDir;
   protected String resultPath;
@@ -37,8 +40,8 @@ public class ReadSourceStreamingTest extends StreamingProgramTestBase {
   private static final String[] EXPECTED_RESULT =
       new String[] {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
 
-  @Override
-  protected void preSubmit() throws Exception {
+  @Before
+  public void preSubmit() throws Exception {
     // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
     // So tempFile need have a parent to compare.
     File resultParent = createAndRegisterTempFile("result");
@@ -46,13 +49,13 @@ public class ReadSourceStreamingTest extends StreamingProgramTestBase {
     resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
 
-  @Override
-  protected void postSubmit() throws Exception {
+  @After
+  public void postSubmit() throws Exception {
     compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
   }
 
-  @Override
-  protected void testProgram() throws Exception {
+  @Test
+  public void testProgram() throws Exception {
     runProgram(resultPath);
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 58b301d..6660019 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -33,12 +33,15 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /** Test for GroupByNullKey. */
-public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
+public class GroupByNullKeyTest extends AbstractTestBase implements Serializable {
 
   protected String resultDir;
   protected String resultPath;
@@ -48,8 +51,8 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
 
   public GroupByNullKeyTest() {}
 
-  @Override
-  protected void preSubmit() throws Exception {
+  @Before
+  public void preSubmit() throws Exception {
     // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
     // So tempFile need have a parent to compare.
     File resultParent = createAndRegisterTempFile("result");
@@ -57,8 +60,8 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
     resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
 
-  @Override
-  protected void postSubmit() throws Exception {
+  @After
+  public void postSubmit() throws Exception {
     compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
   }
 
@@ -78,8 +81,8 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
 
   // suppress since toString() of Void is called and key is deliberately null
   @SuppressWarnings("ObjectToString")
-  @Override
-  protected void testProgram() throws Exception {
+  @Test
+  public void testProgram() throws Exception {
 
     Pipeline p = FlinkTestPipeline.createForStreaming();
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
index a6c0b16..cce44ba 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
@@ -33,12 +33,15 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /** Session window test. */
-public class TopWikipediaSessionsTest extends StreamingProgramTestBase implements Serializable {
+public class TopWikipediaSessionsTest extends AbstractTestBase implements Serializable {
 
   protected String resultDir;
   protected String resultPath;
@@ -55,8 +58,8 @@ public class TopWikipediaSessionsTest extends StreamingProgramTestBase implement
         "user: user3 value:2"
       };
 
-  @Override
-  protected void preSubmit() throws Exception {
+  @Before
+  public void preSubmit() throws Exception {
     // Beam Write will add shard suffix to fileName, see ShardNameTemplate.
     // So tempFile need have a parent to compare.
     File resultParent = createAndRegisterTempFile("result");
@@ -64,13 +67,13 @@ public class TopWikipediaSessionsTest extends StreamingProgramTestBase implement
     resultPath = new File(resultParent, "file.txt").getAbsolutePath();
   }
 
-  @Override
-  protected void postSubmit() throws Exception {
+  @After
+  public void postSubmit() throws Exception {
     compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultDir);
   }
 
-  @Override
-  protected void testProgram() throws Exception {
+  @Test
+  public void testProgram() throws Exception {
 
     Pipeline p = FlinkTestPipeline.createForStreaming();
 

Reply via email to