Repository: flink
Updated Branches:
  refs/heads/master 95765b6d8 -> 665c7e399


http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 12dab93..20260c7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import java.util.UUID
-import java.util.concurrent.Executor
+import java.util
+import java.util.{Collections, UUID}
+import java.util.concurrent._
 
 import akka.actor.{ActorRef, ActorSystem, Kill, Props}
 import akka.pattern.ask
@@ -42,8 +43,9 @@ import org.apache.flink.runtime.testutils.TestingResourceManager
 import org.apache.flink.runtime.util.LeaderRetrievalUtils
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 
+import scala.concurrent.duration.TimeUnit
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.{ExecutionContextExecutor, Await, ExecutionContext}
 import scala.language.postfixOps
 
 /**
@@ -51,8 +53,10 @@ import scala.language.postfixOps
  */
 object TestingUtils {
 
-  val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString)
+  private var sharedExecutorInstance: ScheduledExecutorService = _
 
+  val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString)
+  
   val TESTING_DURATION = 2 minute
 
   val DEFAULT_AKKA_ASK_TIMEOUT = "200 s"
@@ -87,12 +91,25 @@ object TestingUtils {
     cluster
   }
 
-  /** Returns the global [[ExecutionContext]] which is a [[scala.concurrent.forkjoin.ForkJoinPool]]
-    * with a default parallelism equal to the number of available cores.
-    *
-    * @return ExecutionContext.global
+  /** 
+    * Gets the shared global testing execution context 
     */
-  def defaultExecutionContext = ExecutionContext.global
+  def defaultExecutionContext: ExecutionContextExecutor = {
+    ExecutionContext.fromExecutor(defaultExecutor)
+  }
+
+  /**
+   * Gets the shared global testing scheduled executor
+   */
+  def defaultExecutor: ScheduledExecutorService = {
+    synchronized {
+      if (sharedExecutorInstance == null || sharedExecutorInstance.isShutdown) {
+        sharedExecutorInstance = Executors.newSingleThreadScheduledExecutor()
+      }
+
+      sharedExecutorInstance
+    }
+  }
 
   /** Returns an [[ExecutionContext]] which uses the current thread to execute the runnable.
     *
@@ -108,11 +125,9 @@ object TestingUtils {
   /** [[ExecutionContext]] which queues [[Runnable]] up in an [[ActionQueue]] instead of
     * execution them. If the automatic execution mode is activated, then the [[Runnable]] are
     * executed.
-    *
-    * @param actionQueue
     */
   class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue)
-    extends ExecutionContext with Executor {
+    extends AbstractExecutorService with ExecutionContext with ScheduledExecutorService {
 
     var automaticExecution = false
 
@@ -131,18 +146,53 @@ object TestingUtils {
     override def reportFailure(t: Throwable): Unit = {
       t.printStackTrace()
     }
+
+    override def scheduleAtFixedRate(
+        command: Runnable,
+        initialDelay: Long,
+        period: Long,
+        unit: TimeUnit): ScheduledFuture[_] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture[_] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def schedule[V](callable: Callable[V], delay: Long, unit: TimeUnit)
+        : ScheduledFuture[V] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def scheduleWithFixedDelay(
+        command: Runnable,
+        initialDelay: Long,
+        delay: Long,
+        unit: TimeUnit): ScheduledFuture[_] = {
+      throw new UnsupportedOperationException()
+    }
+
+    override def shutdown(): Unit = ()
+
+    override def isTerminated: Boolean = false
+
+    override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
+
+    override def shutdownNow(): util.List[Runnable] = Collections.emptyList()
+
+    override def isShutdown: Boolean = false
   }
 
   /** Queue which stores [[Runnable]] */
   class ActionQueue {
     private val runnables = scala.collection.mutable.Queue[Runnable]()
 
-    def triggerNextAction {
+    def triggerNextAction() {
       val r = runnables.dequeue
       r.run()
     }
 
-    def popNextAction: Runnable = {
+    def popNextAction(): Runnable = {
       runnables.dequeue()
     }
 
@@ -309,7 +359,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration)
     : ActorGateway = {
@@ -335,7 +385,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
       prefix: String)
@@ -362,7 +412,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager])
@@ -385,7 +435,7 @@ object TestingUtils {
     */
   def createJobManager(
       actorSystem: ActorSystem,
-      futureExecutor: Executor,
+      futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       configuration: Configuration,
       jobManagerClass: Class[_ <: JobManager],

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index e77cbb3..3c81112 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -138,8 +138,8 @@ public class RescalePartitionerTest extends TestLogger {
                assertEquals(2, sinkVertex.getParallelism());
 
                ExecutionGraph eg = new ExecutionGraph(
-                       TestingUtils.defaultExecutionContext(),
-                       TestingUtils.defaultExecutionContext(),
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
                        jobId,
                        jobName,
                        cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 942b212..515570d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -131,8 +132,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        ActorRef jmActor = JobManager.startJobManagerActors(
                                jmConfig,
                                jmActorSystem,
-                               jmActorSystem.dispatcher(),
-                               jmActorSystem.dispatcher(),
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index 8243e97..dde0b9e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -104,8 +105,8 @@ public class ProcessFailureCancelingITCase {
                        ActorRef jmActor = JobManager.startJobManagerActors(
                                jmConfig,
                                jmActorSystem,
-                               jmActorSystem.dispatcher(),
-                               jmActorSystem.dispatcher(),
+                               TestingUtils.defaultExecutor(),
+                               TestingUtils.defaultExecutor(),
                                JobManager.class,
                                MemoryArchivist.class)._1();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 5244124..b539961 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.Executor
+import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
@@ -54,7 +54,7 @@ import scala.concurrent.duration.FiniteDuration
   */
 class TestingYarnJobManager(
     flinkConfiguration: Configuration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 2193174..29f1827 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -75,6 +75,7 @@ import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
@@ -227,7 +228,7 @@ public class YarnApplicationMasterRunner {
 
                int numberProcessors = Hardware.getNumberCPUCores();
 
-               final ExecutorService futureExecutor = Executors.newFixedThreadPool(
+               final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
                        numberProcessors,
                        new NamedThreadFactory("yarn-jobmanager-future-", "-thread-"));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/665c7e39/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index db4eea8..efb4801 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.yarn
 
-import java.util.concurrent.{Executor, TimeUnit}
+import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
@@ -54,7 +54,7 @@ import scala.language.postfixOps
   */
 class YarnJobManager(
     flinkConfiguration: FlinkConfiguration,
-    futureExecutor: Executor,
+    futureExecutor: ScheduledExecutorService,
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,

Reply via email to