This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 19a0a5f [BEAM-5267] Make Flink Runner compile compatible with Flink
1.6.0 (#6331)
19a0a5f is described below
commit 19a0a5f20e87aab33d1346eeec0485fe9e4f15bc
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Sep 18 14:20:39 2018 +0200
[BEAM-5267] Make Flink Runner compile compatible with Flink 1.6.0 (#6331)
---
.../beam/runners/flink/ReadSourceStreamingTest.java | 19 +++++++++++--------
.../runners/flink/streaming/GroupByNullKeyTest.java | 19 +++++++++++--------
.../flink/streaming/TopWikipediaSessionsTest.java | 19 +++++++++++--------
3 files changed, 33 insertions(+), 24 deletions(-)
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
index af95364..ded53af 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingTest.java
@@ -24,10 +24,13 @@ import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/** Reads from a bounded source in streaming. */
-public class ReadSourceStreamingTest extends StreamingProgramTestBase {
+public class ReadSourceStreamingTest extends AbstractTestBase {
protected String resultDir;
protected String resultPath;
@@ -37,8 +40,8 @@ public class ReadSourceStreamingTest extends
StreamingProgramTestBase {
private static final String[] EXPECTED_RESULT =
new String[] {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
- @Override
- protected void preSubmit() throws Exception {
+ @Before
+ public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
File resultParent = createAndRegisterTempFile("result");
@@ -46,13 +49,13 @@ public class ReadSourceStreamingTest extends
StreamingProgramTestBase {
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
- @Override
- protected void postSubmit() throws Exception {
+ @After
+ public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testProgram() throws Exception {
runProgram(resultPath);
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 58b301d..6660019 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -33,12 +33,15 @@ import
org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/** Test for GroupByNullKey. */
-public class GroupByNullKeyTest extends StreamingProgramTestBase implements
Serializable {
+public class GroupByNullKeyTest extends AbstractTestBase implements
Serializable {
protected String resultDir;
protected String resultPath;
@@ -48,8 +51,8 @@ public class GroupByNullKeyTest extends
StreamingProgramTestBase implements Seri
public GroupByNullKeyTest() {}
- @Override
- protected void preSubmit() throws Exception {
+ @Before
+ public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
File resultParent = createAndRegisterTempFile("result");
@@ -57,8 +60,8 @@ public class GroupByNullKeyTest extends
StreamingProgramTestBase implements Seri
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
- @Override
- protected void postSubmit() throws Exception {
+ @After
+ public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
@@ -78,8 +81,8 @@ public class GroupByNullKeyTest extends
StreamingProgramTestBase implements Seri
// suppress since toString() of Void is called and key is deliberately null
@SuppressWarnings("ObjectToString")
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testProgram() throws Exception {
Pipeline p = FlinkTestPipeline.createForStreaming();
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
index a6c0b16..cce44ba 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsTest.java
@@ -33,12 +33,15 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
/** Session window test. */
-public class TopWikipediaSessionsTest extends StreamingProgramTestBase
implements Serializable {
+public class TopWikipediaSessionsTest extends AbstractTestBase implements
Serializable {
protected String resultDir;
protected String resultPath;
@@ -55,8 +58,8 @@ public class TopWikipediaSessionsTest extends
StreamingProgramTestBase implement
"user: user3 value:2"
};
- @Override
- protected void preSubmit() throws Exception {
+ @Before
+ public void preSubmit() throws Exception {
// Beam Write will add shard suffix to fileName, see ShardNameTemplate.
// So tempFile need have a parent to compare.
File resultParent = createAndRegisterTempFile("result");
@@ -64,13 +67,13 @@ public class TopWikipediaSessionsTest extends
StreamingProgramTestBase implement
resultPath = new File(resultParent, "file.txt").getAbsolutePath();
}
- @Override
- protected void postSubmit() throws Exception {
+ @After
+ public void postSubmit() throws Exception {
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT),
resultDir);
}
- @Override
- protected void testProgram() throws Exception {
+ @Test
+ public void testProgram() throws Exception {
Pipeline p = FlinkTestPipeline.createForStreaming();