http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 9bd8cc3..2738d22 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.runtime.leaderelection;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.PoisonPill;
-import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -35,14 +35,19 @@ import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -52,7 +57,6 @@ import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
 import java.io.File;
-import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -61,22 +65,20 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
 
        private static final FiniteDuration timeout = 
TestingUtils.TESTING_DURATION();
 
-       private static final File tempDirectory;
+       private static TestingServer zkServer;
 
-       static {
-               try {
-                       tempDirectory = org.apache.flink.runtime.testutils
-                                       .CommonTestUtils.createTempDirectory();
-               }
-               catch (IOException e) {
-                       throw new RuntimeException("Test setup failed", e);
-               }
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               zkServer = new TestingServer(true);
        }
 
        @AfterClass
        public static void tearDown() throws Exception {
-               if (tempDirectory != null) {
-                       FileUtils.deleteDirectory(tempDirectory);
+               if (zkServer != null) {
+                       zkServer.close();
                }
        }
 
@@ -86,18 +88,19 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
         */
        @Test
        public void testTaskManagerRegistrationAtReelectedLeader() throws 
Exception {
-               Configuration configuration = new Configuration();
+               File rootFolder = tempFolder.getRoot();
+
+               Configuration configuration = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
+                       zkServer.getConnectString(),
+                       rootFolder.getPath());
 
                int numJMs = 10;
                int numTMs = 3;
 
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-               configuration.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, 
tempDirectory.getAbsoluteFile().toURI().toString());
 
-               ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(configuration);
+               TestingCluster cluster = new TestingCluster(configuration);
 
                try {
                        cluster.start();
@@ -137,14 +140,15 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                int numSlotsPerTM = 3;
                int parallelism = numTMs * numSlotsPerTM;
 
-               Configuration configuration = new Configuration();
+               File rootFolder = tempFolder.getRoot();
+
+               Configuration configuration = 
ZooKeeperTestUtils.createZooKeeperHAConfig(
+                       zkServer.getConnectString(),
+                       rootFolder.getPath());
 
-               configuration.setString(ConfigConstants.HA_MODE, "zookeeper");
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
-               configuration.setString(ConfigConstants.STATE_BACKEND, 
"filesystem");
-               
configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, 
tempDirectory.getAbsoluteFile().toURI().toString());
 
                 // we "effectively" disable the automatic RecoverAllJobs 
message and send it manually to make
                 // sure that all TMs have registered to the JM prior to 
issuing the RecoverAllJobs message
@@ -169,7 +173,7 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
 
                final JobGraph graph = new JobGraph("Blocking test job", 
sender, receiver);
 
-               final ForkableFlinkMiniCluster cluster = new 
ForkableFlinkMiniCluster(configuration);
+               final TestingCluster cluster = new 
TestingCluster(configuration);
 
                ActorSystem clientActorSystem = null;
 
@@ -250,14 +254,14 @@ public class ZooKeeperLeaderElectionITCase extends 
TestLogger {
                boolean finished = false;
 
                final ActorSystem clientActorSystem;
-               final ForkableFlinkMiniCluster cluster;
+               final LocalFlinkMiniCluster cluster;
                final JobGraph graph;
 
                final Promise<JobExecutionResult> resultPromise = new 
Promise.DefaultPromise<>();
 
                public JobSubmitterRunnable(
                                ActorSystem actorSystem,
-                               ForkableFlinkMiniCluster cluster,
+                               LocalFlinkMiniCluster cluster,
                                JobGraph graph) {
                        this.clientActorSystem = actorSystem;
                        this.cluster = cluster;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index d693aaa..2ed759d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.util.TestLogger;
@@ -75,7 +75,7 @@ public class TimestampITCase extends TestLogger {
        static MultiShotLatch latch;
 
 
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        @Before
        public void setupLatch() {
@@ -92,7 +92,7 @@ public class TimestampITCase extends TestLogger {
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
 
-                       cluster = new ForkableFlinkMiniCluster(config, false);
+                       cluster = new LocalFlinkMiniCluster(config, false);
 
                        cluster.start();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java 
b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index fc90994..a8482ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -29,11 +29,11 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.files.MimeTypes;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
 import org.apache.flink.util.TestLogger;
@@ -62,7 +62,7 @@ public class WebFrontendITCase extends TestLogger {
        private static final int NUM_TASK_MANAGERS = 2;
        private static final int NUM_SLOTS = 4;
        
-       private static ForkableFlinkMiniCluster cluster;
+       private static LocalFlinkMiniCluster cluster;
 
        private static int port = -1;
        
@@ -86,7 +86,7 @@ public class WebFrontendITCase extends TestLogger {
                config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
logFile.getAbsolutePath());
                config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
logFile.getAbsolutePath());
 
-               cluster = new ForkableFlinkMiniCluster(config, false);
+               cluster = new LocalFlinkMiniCluster(config, false);
                cluster.start();
                
                port = cluster.webMonitor().get().getServerPort();

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index ac661f3..1b2838d 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -21,7 +21,6 @@ package org.apache.flink.api.scala.runtime.jobmanager
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, 
NoOpInvokable}
@@ -30,8 +29,7 @@ import org.apache.flink.runtime.messages.Messages.Acknowledge
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated,
 NotifyWhenJobManagerTerminated}
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, 
TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -140,12 +138,12 @@ class JobManagerFailsITCase(_system: ActorSystem)
     }
   }
 
-  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): 
ForkableFlinkMiniCluster = {
+  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): 
TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskmanagers)
 
-    val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = 
false)
+    val cluster = new TestingCluster(config, singleActorSystem = false)
 
     cluster.start()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 258f6df..3b39b3f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -20,7 +20,6 @@ package org.apache.flink.api.scala.runtime.taskmanager
 
 import akka.actor.{ActorSystem, Kill, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest}
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -31,8 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager,
 RegisteredAtJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.test.util.ForkableFlinkMiniCluster
+import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, 
TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -100,7 +98,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
-      val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
 
       val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
 
@@ -152,7 +150,7 @@ class TaskManagerFailsITCase(_system: ActorSystem)
       val jobGraph = new JobGraph("Pointwise Job", sender, receiver)
       val jobID = jobGraph.getJobID
 
-      val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2)
+      val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
 
       val taskManagers = cluster.getTaskManagers
       val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
@@ -239,11 +237,11 @@ class TaskManagerFailsITCase(_system: ActorSystem)
     }
   }
 
-  def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): 
ForkableFlinkMiniCluster = {
+  def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): 
TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
numTaskmanagers)
 
-    new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+    new TestingCluster(config, singleActorSystem = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 8c211ef..ffdca36 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -48,6 +48,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
                <!-- Needed for the streaming wordcount example -->
                <dependency>
                        <groupId>org.apache.flink</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 0243012..31a3d98 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -48,7 +48,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;

http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/tools/maven/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/tools/maven/scalastyle-config.xml 
b/tools/maven/scalastyle-config.xml
index f7bb0d4..0f7f6bb 100644
--- a/tools/maven/scalastyle-config.xml
+++ b/tools/maven/scalastyle-config.xml
@@ -86,7 +86,7 @@
  <!-- </check> -->
  <check level="error" 
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
   <parameters>
-   <parameter name="maxParameters"><![CDATA[10]]></parameter>
+   <parameter name="maxParameters"><![CDATA[15]]></parameter>
   </parameters>
  </check>
  <!-- <check level="error" 
class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->

Reply via email to