http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index 7722d8a..b73f961 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -36,17 +36,26 @@ import java.util.Collections; import java.util.Comparator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; +import akka.actor.ActorRef; +import akka.dispatch.Futures; +import akka.pattern.Patterns; +import akka.util.Timeout; import org.apache.commons.io.FileUtils; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.testingUtils.TestingCluster; +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.hadoop.fs.FileSystem; import org.junit.Assert; import com.google.common.base.Charsets; import com.google.common.io.Files; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; public abstract class AbstractTestBase { protected static final int MINIMUM_HEAP_SIZE_MB = 192; @@ -59,7 +68,7 @@ public abstract class AbstractTestBase { protected final Configuration config; - protected LocalFlinkMiniCluster executor; + protected TestingCluster executor; private final List<File> tempFiles; @@ -67,10 +76,15 @@ public abstract class AbstractTestBase { protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS; + private final FiniteDuration timeout; + public AbstractTestBase(Configuration config) { verifyJvmOptions(); this.config = config; this.tempFiles = new ArrayList<File>(); + + timeout = new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, + ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); } private void verifyJvmOptions() { @@ -90,7 +104,7 @@ public abstract class AbstractTestBase { config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers); - this.executor = new LocalFlinkMiniCluster(config); + this.executor = new TestingCluster(config); } public void stopCluster() throws Exception { @@ -101,13 +115,23 @@ public abstract class AbstractTestBase { int numActiveConnections = 0; { - TaskManager[] tms = executor.getTaskManagers(); + List<ActorRef> tms = executor.getTaskManagersAsJava(); + List<Future<Object>> responseFutures = new ArrayList<Future<Object>>(); - if (tms != null) { - for (TaskManager tm : tms) { - numUnreleasedBCVars += tm.getBroadcastVariableManager().getNumberOfVariablesWithReferences(); - numActiveConnections += tm.getChannelManager().getNetworkConnectionManager().getNumberOfActiveConnections(); - } + for(ActorRef tm: tms){ + responseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages + .RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout + (timeout))); + } + + Future<Iterable<Object>> futureResponses = Futures.sequence( + responseFutures, AkkaUtils.globalExecutionContext()); + + Iterable<Object> responses = Await.result(futureResponses, timeout); + + for(Object response: responses) { + numUnreleasedBCVars += ((TestingTaskManagerMessages + .ResponseBroadcastVariablesWithReferences) response).number(); } }
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java index ca3d294..83dd73b 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java @@ -31,7 +31,7 @@ import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; import org.junit.Assert; import org.junit.Test; import org.apache.flink.api.java.CollectionEnvironment; @@ -194,12 +194,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase { private static final class TestEnvironment extends ExecutionEnvironment { - private final LocalFlinkMiniCluster executor; + private final FlinkMiniCluster executor; private JobExecutionResult latestResult; - private TestEnvironment(LocalFlinkMiniCluster executor, int degreeOfParallelism) { + private TestEnvironment(FlinkMiniCluster executor, int degreeOfParallelism) { this.executor = executor; setDegreeOfParallelism(degreeOfParallelism); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java index 0d27f8d..e8b716b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java @@ -19,7 +19,7 @@ package org.apache.flink.test.util; import akka.actor.ActorRef; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; import org.junit.Assert; import org.apache.flink.runtime.client.JobClient; @@ -120,7 +120,7 @@ public abstract class FailingTestBase extends RecordAPITestBase { // reference to the timeout thread private final Thread timeoutThread; // cluster to submit the job to. - private final LocalFlinkMiniCluster executor; + private final FlinkMiniCluster executor; // job graph of the failing job (submitted first) private final JobGraph failingJob; // job graph of the working job (submitted after return from failing job) @@ -129,7 +129,7 @@ public abstract class FailingTestBase extends RecordAPITestBase { private volatile Exception error; - public SubmissionThread(Thread timeoutThread, LocalFlinkMiniCluster executor, JobGraph failingJob, + public SubmissionThread(Thread timeoutThread, FlinkMiniCluster executor, JobGraph failingJob, JobGraph job) { this.timeoutThread = timeoutThread; this.executor = executor;
