Repository: flink Updated Branches: refs/heads/master 95765b6d8 -> 665c7e399
http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 12dab93..20260c7 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -18,8 +18,9 @@ package org.apache.flink.runtime.testingUtils -import java.util.UUID -import java.util.concurrent.Executor +import java.util +import java.util.{Collections, UUID} +import java.util.concurrent._ import akka.actor.{ActorRef, ActorSystem, Kill, Props} import akka.pattern.ask @@ -42,8 +43,9 @@ import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.util.LeaderRetrievalUtils import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} +import scala.concurrent.duration.TimeUnit import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.{ExecutionContextExecutor, Await, ExecutionContext} import scala.language.postfixOps /** @@ -51,8 +53,10 @@ import scala.language.postfixOps */ object TestingUtils { - val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString) + private var sharedExecutorInstance: ScheduledExecutorService = _ + val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString) + val TESTING_DURATION = 2 minute val DEFAULT_AKKA_ASK_TIMEOUT = "200 s" @@ -87,12 +91,25 @@ object TestingUtils { cluster } - /** Returns the global [[ExecutionContext]] which is a [[scala.concurrent.forkjoin.ForkJoinPool]] - * with a default parallelism equal to the number of available cores. - * - * @return ExecutionContext.global + /** + * Gets the shared global testing execution context */ - def defaultExecutionContext = ExecutionContext.global + def defaultExecutionContext: ExecutionContextExecutor = { + ExecutionContext.fromExecutor(defaultExecutor) + } + + /** + * Gets the shared global testing scheduled executor + */ + def defaultExecutor: ScheduledExecutorService = { + synchronized { + if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown) { + sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor() + } + + sharedExecutorInstance + } + } /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable. * @@ -108,11 +125,9 @@ object TestingUtils { /** [[ExecutionContext]] which queues [[Runnable]] up in an [[ActionQueue]] instead of * execution them. If the automatic execution mode is activated, then the [[Runnable]] are * executed. - * - * @param actionQueue */ class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue) - extends ExecutionContext with Executor { + extends AbstractExecutorService with ExecutionContext with ScheduledExecutorService { var automaticExecution = false @@ -131,18 +146,53 @@ object TestingUtils { override def reportFailure(t: Throwable): Unit = { t.printStackTrace() } + + override def scheduleAtFixedRate( + command: Runnable, + initialDelay: Long, + period: Long, + unit: TimeUnit): ScheduledFuture[_] = { + throw new UnsupportedOperationException() + } + + override def schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture[_] = { + throw new UnsupportedOperationException() + } + + override def schedule[V](callable: Callable[V], delay: Long, unit: TimeUnit) + : ScheduledFuture[V] = { + throw new UnsupportedOperationException() + } + + override def scheduleWithFixedDelay( + command: Runnable, + initialDelay: Long, + delay: Long, + unit: TimeUnit): ScheduledFuture[_] = { + throw new UnsupportedOperationException() + } + + override def shutdown(): Unit = () + + override def isTerminated: Boolean = false + + override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false + + override def shutdownNow(): util.List[Runnable] = Collections.emptyList() + + override def isShutdown: Boolean = false } /** Queue which stores [[Runnable]] */ class ActionQueue { private val runnables = scala.collection.mutable.Queue[Runnable]() - def triggerNextAction { + def triggerNextAction() { val r = runnables.dequeue r.run() } - def popNextAction: Runnable = { + def popNextAction(): Runnable = { runnables.dequeue() } @@ -309,7 +359,7 @@ object TestingUtils { */ def createJobManager( actorSystem: ActorSystem, - futureExecutor: Executor, + futureExecutor: ScheduledExecutorService, ioExecutor: Executor, configuration: Configuration) : ActorGateway = { @@ -335,7 +385,7 @@ object TestingUtils { */ def createJobManager( actorSystem: ActorSystem, - futureExecutor: Executor, + futureExecutor: ScheduledExecutorService, ioExecutor: Executor, configuration: Configuration, prefix: String) @@ -362,7 +412,7 @@ object TestingUtils { */ def createJobManager( actorSystem: ActorSystem, - futureExecutor: Executor, + futureExecutor: ScheduledExecutorService, ioExecutor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager]) @@ -385,7 +435,7 @@ object TestingUtils { */ def createJobManager( actorSystem: ActorSystem, - futureExecutor: Executor, + futureExecutor: ScheduledExecutorService, ioExecutor: Executor, configuration: Configuration, jobManagerClass: Class[_ <: JobManager], http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index e77cbb3..3c81112 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -138,8 +138,8 @@ public class RescalePartitionerTest extends TestLogger { assertEquals(2, sinkVertex.getParallelism()); ExecutionGraph eg = new ExecutionGraph( - TestingUtils.defaultExecutionContext(), - TestingUtils.defaultExecutionContext(), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), jobId, jobName, cfg, http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 942b212..515570d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; @@ -131,8 +132,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, - jmActorSystem.dispatcher(), - jmActorSystem.dispatcher(), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index 8243e97..dde0b9e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.NetUtils; @@ -104,8 +105,8 @@ public class ProcessFailureCancelingITCase { ActorRef jmActor = JobManager.startJobManagerActors( jmConfig, jmActorSystem, - jmActorSystem.dispatcher(), - jmActorSystem.dispatcher(), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), JobManager.class, MemoryArchivist.class)._1(); http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 5244124..b539961 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.yarn -import java.util.concurrent.Executor +import java.util.concurrent.{Executor, ScheduledExecutorService} import akka.actor.ActorRef import org.apache.flink.configuration.Configuration @@ -54,7 +54,7 @@ import scala.concurrent.duration.FiniteDuration */ class TestingYarnJobManager( flinkConfiguration: Configuration, - futureExecutor: Executor, + futureExecutor: ScheduledExecutorService, ioExecutor: Executor, instanceManager: InstanceManager, scheduler: Scheduler, http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 2193174..29f1827 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -75,6 +75,7 @@ import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; @@ -227,7 +228,7 @@ public class YarnApplicationMasterRunner { int numberProcessors = Hardware.getNumberCPUCores(); - final ExecutorService futureExecutor = Executors.newFixedThreadPool( + final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool( numberProcessors, new NamedThreadFactory("yarn-jobmanager-future-", "-thread-")); http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index db4eea8..efb4801 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -18,7 +18,7 @@ package org.apache.flink.yarn -import java.util.concurrent.{Executor, TimeUnit} +import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit} import akka.actor.ActorRef import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration} @@ -54,7 +54,7 @@ import scala.language.postfixOps */ class YarnJobManager( flinkConfiguration: FlinkConfiguration, - futureExecutor: Executor, + futureExecutor: ScheduledExecutorService, ioExecutor: Executor, instanceManager: InstanceManager, scheduler: FlinkScheduler,