http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 7722d8a..b73f961 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -36,17 +36,26 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 public abstract class AbstractTestBase {
        protected static final int MINIMUM_HEAP_SIZE_MB = 192;
@@ -59,7 +68,7 @@ public abstract class AbstractTestBase {
 
        protected final Configuration config;
        
-       protected LocalFlinkMiniCluster executor;
+       protected TestingCluster executor;
 
        private final List<File> tempFiles;
 
@@ -67,10 +76,15 @@ public abstract class AbstractTestBase {
 
        protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
 
+       private final FiniteDuration timeout;
+
        public AbstractTestBase(Configuration config) {
                verifyJvmOptions();
                this.config = config;
                this.tempFiles = new ArrayList<File>();
+
+               timeout = new 
FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+                               ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), 
TimeUnit.SECONDS);
        }
 
        private void verifyJvmOptions() {
@@ -90,7 +104,7 @@ public abstract class AbstractTestBase {
                config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 
TASK_MANAGER_MEMORY_SIZE);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
taskManagerNumSlots);
                
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 
numTaskManagers);
-               this.executor = new LocalFlinkMiniCluster(config);
+               this.executor = new TestingCluster(config);
        }
        
        public void stopCluster() throws Exception {
@@ -101,13 +115,23 @@ public abstract class AbstractTestBase {
                        int numActiveConnections = 0;
 
                        {
-                               TaskManager[] tms = executor.getTaskManagers();
+                               List<ActorRef> tms = 
executor.getTaskManagersAsJava();
+                               List<Future<Object>> responseFutures = new 
ArrayList<Future<Object>>();
 
-                               if (tms != null) {
-                                       for (TaskManager tm : tms) {
-                                               numUnreleasedBCVars += 
tm.getBroadcastVariableManager().getNumberOfVariablesWithReferences();
-                                               numActiveConnections += 
tm.getChannelManager().getNetworkConnectionManager().getNumberOfActiveConnections();
-                                       }
+                               for(ActorRef tm: tms){
+                                       responseFutures.add(Patterns.ask(tm, 
TestingTaskManagerMessages
+                                                       
.RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout
+                                                       (timeout)));
+                               }
+
+                               Future<Iterable<Object>> futureResponses = 
Futures.sequence(
+                                               responseFutures, 
AkkaUtils.globalExecutionContext());
+
+                               Iterable<Object> responses = 
Await.result(futureResponses, timeout);
+
+                               for(Object response: responses) {
+                                       numUnreleasedBCVars += 
((TestingTaskManagerMessages
+                                                       
.ResponseBroadcastVariablesWithReferences) response).number();
                                }
                        }
                        

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index ca3d294..83dd73b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -31,7 +31,7 @@ import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.CollectionEnvironment;
@@ -194,12 +194,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
        
        private static final class TestEnvironment extends ExecutionEnvironment 
{
 
-               private final LocalFlinkMiniCluster executor;
+               private final FlinkMiniCluster executor;
 
                private JobExecutionResult latestResult;
                
                
-               private TestEnvironment(LocalFlinkMiniCluster executor, int 
degreeOfParallelism) {
+               private TestEnvironment(FlinkMiniCluster executor, int 
degreeOfParallelism) {
                        this.executor = executor;
                        setDegreeOfParallelism(degreeOfParallelism);
                }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index 0d27f8d..e8b716b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.test.util;
 
 import akka.actor.ActorRef;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 
 import org.apache.flink.runtime.client.JobClient;
@@ -120,7 +120,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
                // reference to the timeout thread
                private final Thread timeoutThread;
                // cluster to submit the job to.
-               private final LocalFlinkMiniCluster executor;
+               private final FlinkMiniCluster executor;
                // job graph of the failing job (submitted first)
                private final JobGraph failingJob;
                // job graph of the working job (submitted after return from 
failing job)
@@ -129,7 +129,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
                private volatile Exception error;
                
 
-               public SubmissionThread(Thread timeoutThread, 
LocalFlinkMiniCluster executor, JobGraph failingJob,
+               public SubmissionThread(Thread timeoutThread, FlinkMiniCluster 
executor, JobGraph failingJob,
                                                                JobGraph job) {
                        this.timeoutThread = timeoutThread;
                        this.executor = executor;

Reply via email to