http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java index 9bd8cc3..2738d22 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java @@ -21,7 +21,7 @@ package org.apache.flink.test.runtime.leaderelection; import akka.actor.ActorSystem; import akka.actor.Kill; import akka.actor.PoisonPill; -import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -35,14 +35,19 @@ import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -52,7 +57,6 @@ import scala.concurrent.duration.FiniteDuration; import scala.concurrent.impl.Promise; import java.io.File; -import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -61,22 +65,20 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION(); - private static final File tempDirectory; + private static TestingServer zkServer; - static { - try { - tempDirectory = org.apache.flink.runtime.testutils - .CommonTestUtils.createTempDirectory(); - } - catch (IOException e) { - throw new RuntimeException("Test setup failed", e); - } + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(true); } @AfterClass public static void tearDown() throws Exception { - if (tempDirectory != null) { - FileUtils.deleteDirectory(tempDirectory); + if (zkServer != null) { + zkServer.close(); } } @@ -86,18 +88,19 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { */ @Test public void testTaskManagerRegistrationAtReelectedLeader() throws Exception { - Configuration configuration = new Configuration(); + File rootFolder = tempFolder.getRoot(); + + Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig( + zkServer.getConnectString(), + rootFolder.getPath()); int numJMs = 10; int numTMs = 3; - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); - configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); - ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); + TestingCluster cluster = new TestingCluster(configuration); try { cluster.start(); @@ -137,14 +140,15 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { int numSlotsPerTM = 3; int parallelism = numTMs * numSlotsPerTM; - Configuration configuration = new Configuration(); + File rootFolder = tempFolder.getRoot(); + + Configuration configuration = ZooKeeperTestUtils.createZooKeeperHAConfig( + zkServer.getConnectString(), + rootFolder.getPath()); - configuration.setString(ConfigConstants.HA_MODE, "zookeeper"); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); - configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - configuration.setString(ConfigConstants.HA_ZOOKEEPER_STORAGE_PATH, tempDirectory.getAbsoluteFile().toURI().toString()); // we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make // sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message @@ -169,7 +173,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { final JobGraph graph = new JobGraph("Blocking test job", sender, receiver); - final ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration); + final TestingCluster cluster = new TestingCluster(configuration); ActorSystem clientActorSystem = null; @@ -250,14 +254,14 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger { boolean finished = false; final ActorSystem clientActorSystem; - final ForkableFlinkMiniCluster cluster; + final LocalFlinkMiniCluster cluster; final JobGraph graph; final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise<>(); public JobSubmitterRunnable( ActorSystem actorSystem, - ForkableFlinkMiniCluster cluster, + LocalFlinkMiniCluster cluster, JobGraph graph) { this.clientActorSystem = actorSystem; this.cluster = cluster;
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index d693aaa..2ed759d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -43,7 +44,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.TestLogger; import org.apache.flink.util.TestLogger; @@ -75,7 +75,7 @@ public class TimestampITCase extends TestLogger { static MultiShotLatch latch; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; @Before public void setupLatch() { @@ -92,7 +92,7 @@ public class TimestampITCase extends TestLogger { config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS); config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java index fc90994..a8482ac 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java @@ -29,11 +29,11 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.runtime.webmonitor.files.MimeTypes; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; @@ -62,7 +62,7 @@ public class WebFrontendITCase extends TestLogger { private static final int NUM_TASK_MANAGERS = 2; private static final int NUM_SLOTS = 4; - private static ForkableFlinkMiniCluster cluster; + private static LocalFlinkMiniCluster cluster; private static int port = -1; @@ -86,7 +86,7 @@ public class WebFrontendITCase extends TestLogger { config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.getAbsolutePath()); config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath()); - cluster = new ForkableFlinkMiniCluster(config, false); + cluster = new LocalFlinkMiniCluster(config, false); cluster.start(); port = cluster.webMonitor().get().getServerPort(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index ac661f3..1b2838d 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -21,7 +21,6 @@ package org.apache.flink.api.scala.runtime.jobmanager import akka.actor.{ActorSystem, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable} @@ -30,8 +29,7 @@ import org.apache.flink.runtime.messages.Messages.Acknowledge import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.{JobManagerTerminated, NotifyWhenJobManagerTerminated} -import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.apache.flink.test.util.ForkableFlinkMiniCluster +import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -140,12 +138,12 @@ class JobManagerFailsITCase(_system: ActorSystem) } } - def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = { + def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) - val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false) + val cluster = new TestingCluster(config, singleActorSystem = false) cluster.start() http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index 258f6df..3b39b3f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -20,7 +20,6 @@ package org.apache.flink.api.scala.runtime.taskmanager import akka.actor.{ActorSystem, Kill, PoisonPill} import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.api.common.{ExecutionConfig, ExecutionConfigTest} import org.apache.flink.configuration.ConfigConstants import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} @@ -31,8 +30,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect -import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} -import org.apache.flink.test.util.ForkableFlinkMiniCluster +import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -100,7 +98,7 @@ class TaskManagerFailsITCase(_system: ActorSystem) val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val jobID = jobGraph.getJobID - val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2) + val cluster = TestingUtils.startTestingCluster(num_tasks, 2) val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) @@ -152,7 +150,7 @@ class TaskManagerFailsITCase(_system: ActorSystem) val jobGraph = new JobGraph("Pointwise Job", sender, receiver) val jobID = jobGraph.getJobID - val cluster = ForkableFlinkMiniCluster.startCluster(num_tasks, 2) + val cluster = TestingUtils.startTestingCluster(num_tasks, 2) val taskManagers = cluster.getTaskManagers val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION) @@ -239,11 +237,11 @@ class TaskManagerFailsITCase(_system: ActorSystem) } } - def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = { + def createDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): TestingCluster = { val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) - new ForkableFlinkMiniCluster(config, singleActorSystem = false) + new TestingCluster(config, singleActorSystem = false) } } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index 8c211ef..ffdca36 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -48,6 +48,14 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <!-- Needed for the streaming wordcount example --> <dependency> <groupId>org.apache.flink</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 0243012..31a3d98 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -48,7 +48,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.rules.TemporaryFolder; import org.mockito.Mockito; -import org.mockito.verification.VerificationMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/tools/maven/scalastyle-config.xml ---------------------------------------------------------------------- diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml index f7bb0d4..0f7f6bb 100644 --- a/tools/maven/scalastyle-config.xml +++ b/tools/maven/scalastyle-config.xml @@ -86,7 +86,7 @@ <!-- </check> --> <check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true"> <parameters> - <parameter name="maxParameters"><![CDATA[10]]></parameter> + <parameter name="maxParameters"><![CDATA[15]]></parameter> </parameters> </check> <!-- <check level="error" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true"> -->