[FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6

This closes #5870.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f619f22d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f619f22d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f619f22d

Branch: refs/heads/release-1.5
Commit: f619f22dc9febb5d94152c165d0a5bb8cb70049e
Parents: 4af40d7
Author: zentol <ches...@apache.org>
Authored: Tue Apr 17 15:24:22 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 25 09:33:18 2018 +0200

----------------------------------------------------------------------
 .../runtime/NetworkStackThroughputITCase.java   | 40 +++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f619f22d/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index e6401c0..3b93ca2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -19,10 +19,11 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.RecordReader;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -32,9 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
@@ -42,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -234,43 +232,51 @@ public class NetworkStackThroughputITCase extends 
TestLogger {
 
                        final int numTaskManagers = parallelism / 
numSlotsPerTaskManager;
 
-                       final LocalFlinkMiniCluster localFlinkMiniCluster = 
TestBaseUtils.startCluster(
-                               numTaskManagers,
-                               numSlotsPerTaskManager,
-                               false,
-                               false,
-                               true);
+                       final MiniClusterResource cluster = new 
MiniClusterResource(
+                               new 
MiniClusterResource.MiniClusterResourceConfiguration(
+                                       new Configuration(),
+                                       numTaskManagers,
+                                       numSlotsPerTaskManager
+                               ),
+                               true
+                       );
+                       cluster.before();
 
                        try {
-                               System.out.println(Arrays.toString(p));
+                               System.out.println(String.format("Running test 
with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, 
isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
+                                       dataVolumeGb, useForwarder, 
isSlowSender, isSlowReceiver, parallelism, numSlotsPerTaskManager));
                                testProgram(
-                                       localFlinkMiniCluster,
+                                       cluster,
                                        dataVolumeGb,
                                        useForwarder,
                                        isSlowSender,
                                        isSlowReceiver,
                                        parallelism);
                        } finally {
-                               
TestBaseUtils.stopCluster(localFlinkMiniCluster, 
FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT()));
+                               cluster.after();
                        }
                }
        }
 
        private void testProgram(
-                       LocalFlinkMiniCluster localFlinkMiniCluster,
+                       final MiniClusterResource cluster,
                        final int dataVolumeGb,
                        final boolean useForwarder,
                        final boolean isSlowSender,
                        final boolean isSlowReceiver,
                        final int parallelism) throws Exception {
-               JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
+               ClusterClient<?> client = cluster.getClusterClient();
+               client.setDetached(false);
+               client.setPrintStatusDuringExecution(false);
+
+               JobExecutionResult jer = (JobExecutionResult) client.submitJob(
                        createJobGraph(
                                dataVolumeGb,
                                useForwarder,
                                isSlowSender,
                                isSlowReceiver,
                                parallelism),
-                       false);
+                       getClass().getClassLoader());
 
                long dataVolumeMbit = dataVolumeGb * 8192;
                long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);

Reply via email to