Repository: flink Updated Branches: refs/heads/master 360f02b1f -> 8fc7e7af2
[hotfix] Reduce the heavy sysout verbosity for certain tests Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f042e78 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f042e78 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f042e78 Branch: refs/heads/master Commit: 6f042e7894be388fa8e400a08002584c10781e60 Parents: 21a7158 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 1 16:46:03 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 2 16:55:44 2016 +0100 ---------------------------------------------------------------------- .../jar/CheckpointedStreamingProgram.java | 10 ---------- .../flink/test/recovery/FastFailuresITCase.java | 18 +++++++++++++++--- 2 files changed, 15 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6f042e78/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java index 47253da..cda5a7b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java @@ -27,8 +27,6 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.lang.RuntimeException; -import java.net.URL; -import java.net.URLClassLoader; /** * A simple streaming program, which is using the state checkpointing of Flink. @@ -40,14 +38,6 @@ public class CheckpointedStreamingProgram { private static final int CHECKPOINT_INTERVALL = 100; public static void main(String[] args) throws Exception { - ClassLoader cl = ClassLoader.getSystemClassLoader(); - URL[] urls = ((URLClassLoader)cl).getURLs(); - - for(URL url: urls){ - System.out.println(url.getFile()); - } - System.out.println("CheckpointedStreamingProgram classpath: "); - final String jarFile = args[0]; final String host = args[1]; final int port = Integer.parseInt(args[2]); http://git-wip-us.apache.org/repos/asf/flink/blob/6f042e78/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java index 0684fde..2a139c7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/FastFailuresITCase.java @@ -21,11 +21,13 @@ package org.apache.flink.test.recovery; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.junit.Test; @@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.fail; +@SuppressWarnings("serial") public class FastFailuresITCase { static final AtomicInteger FAILURES_SO_FAR = new AtomicInteger(); @@ -40,12 +43,21 @@ public class FastFailuresITCase { @Test public void testThis() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2); + + ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false); + cluster.start(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( + "localhost", cluster.getLeaderRPCPort()); + + env.getConfig().disableSysoutLogging(); env.getConfig().setExecutionRetryDelay(0); env.setParallelism(4); env.enableCheckpointing(1000); - + DataStream<Tuple2<Integer, Integer>> input = env.addSource(new RichSourceFunction<Tuple2<Integer, Integer>>() { @Override