[FLINK-8967][tests] Port NetworkStackThroughputITCase to flip6. This closes #5870.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f619f22d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f619f22d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f619f22d Branch: refs/heads/release-1.5 Commit: f619f22dc9febb5d94152c165d0a5bb8cb70049e Parents: 4af40d7 Author: zentol <ches...@apache.org> Authored: Tue Apr 17 15:24:22 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Apr 25 09:33:18 2018 +0200 ---------------------------------------------------------------------- .../runtime/NetworkStackThroughputITCase.java | 40 +++++++++++--------- 1 file changed, 23 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f619f22d/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index e6401c0..3b93ca2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -19,10 +19,11 @@ package org.apache.flink.test.runtime; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.reader.RecordReader; import 
org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -32,9 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.test.util.MiniClusterResource; import org.apache.flink.util.TestLogger; import org.junit.Ignore; @@ -42,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; import java.util.concurrent.TimeUnit; /** @@ -234,43 +232,51 @@ public class NetworkStackThroughputITCase extends TestLogger { final int numTaskManagers = parallelism / numSlotsPerTaskManager; - final LocalFlinkMiniCluster localFlinkMiniCluster = TestBaseUtils.startCluster( - numTaskManagers, - numSlotsPerTaskManager, - false, - false, - true); + final MiniClusterResource cluster = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + new Configuration(), + numTaskManagers, + numSlotsPerTaskManager + ), + true + ); + cluster.before(); try { - System.out.println(Arrays.toString(p)); + System.out.println(String.format("Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s", + dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism, numSlotsPerTaskManager)); testProgram( - localFlinkMiniCluster, + cluster, dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism); } finally { - TestBaseUtils.stopCluster(localFlinkMiniCluster, FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT())); + cluster.after(); } } } private void testProgram( - LocalFlinkMiniCluster localFlinkMiniCluster, + final 
MiniClusterResource cluster, final int dataVolumeGb, final boolean useForwarder, final boolean isSlowSender, final boolean isSlowReceiver, final int parallelism) throws Exception { - JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait( + ClusterClient<?> client = cluster.getClusterClient(); + client.setDetached(false); + client.setPrintStatusDuringExecution(false); + + JobExecutionResult jer = (JobExecutionResult) client.submitJob( createJobGraph( dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism), - false); + getClass().getClassLoader()); long dataVolumeMbit = dataVolumeGb * 8192; long runtimeSecs = jer.getNetRuntime(TimeUnit.SECONDS);