http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
new file mode 100644
index 0000000..ec513e3
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import java.util.Map
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
+import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph}
+import org.apache.flink.runtime.instance.ActorGateway
+import org.apache.flink.runtime.jobgraph.JobStatus
+
+object TestingJobManagerMessages {
+
+  case class RequestExecutionGraph(jobID: JobID)
+
+  sealed trait ResponseExecutionGraph {
+    def jobID: JobID
+  }
+
+  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) 
extends
+  ResponseExecutionGraph
+
+  case class ExecutionGraphNotFound(jobID: JobID) extends 
ResponseExecutionGraph
+
+  case class WaitForAllVerticesToBeRunning(jobID: JobID)
+  case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID)
+  case class AllVerticesRunning(jobID: JobID)
+
+  case class NotifyWhenJobRemoved(jobID: JobID)
+
+  case class RequestWorkingTaskManager(jobID: JobID)
+  case class WorkingTaskManager(gatewayOption: Option[ActorGateway])
+
+  case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
+  case class JobStatusIs(jobID: JobID, state: JobStatus)
+
+  case object NotifyListeners
+
+  case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
+  case class TaskManagerTerminated(taskManager: ActorRef)
+
+  /**
+   * Registers a listener to receive a message when accumulators changed.
+   * The change must be explicitly triggered by the TestingTaskManager which 
can receive an
+   * 
[[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
+   * message by a task that changed the accumulators. This message is then
+   * forwarded to the JobManager which will send the accumulators in the 
[[UpdatedAccumulators]]
+   * message when the next Heartbeat occurs.
+   */
+  case class NotifyWhenAccumulatorChange(jobID: JobID)
+
+  /**
+   * Reports updated accumulators back to the listener.
+   */
+  case class UpdatedAccumulators(jobID: JobID,
+    flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, 
Accumulator[_,_]]],
+    userAccumulators: Map[String, Accumulator[_,_]])
+
+  /** Notifies the sender when the [[TestingJobManager]] has been elected as 
the leader
+   *
+   */
+  case object NotifyWhenLeader
+
+  /**
+   * Registers to be notified by an 
[[org.apache.flink.runtime.messages.Messages.Acknowledge]]
+   * message when at least numRegisteredTaskManager have registered at the 
JobManager.
+   *
+   * @param numRegisteredTaskManager minimum number of registered TMs before 
the sender is notified
+   */
+  case class 
NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
+
+  /** Disables the post stop method of the [[TestingJobManager]].
+    *
+    * Only the leaderElectionService is stopped in the postStop method call to 
revoke the leadership
+    */
+  case object DisablePostStop
+
+  /**
+    * Requests a savepoint from the job manager.
+    *
+    * @param savepointPath The path of the savepoint to request.
+    */
+  case class RequestSavepoint(savepointPath: String)
+
+  /**
+    * Response to a savepoint request.
+    *
+    * @param savepoint The requested savepoint or null if none available.
+    */
+  case class ResponseSavepoint(savepoint: CompletedCheckpoint)
+
+  def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+  def getDisablePostStop(): AnyRef = DisablePostStop
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
new file mode 100644
index 0000000..48a1ddd
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound,
 ExecutionGraphNotFound, RequestExecutionGraph}
+
+/** Memory archivist extended by testing messages
+  *
+  * @param maxEntries number of maximum number of archived jobs
+  */
+class TestingMemoryArchivist(maxEntries: Int) extends 
MemoryArchivist(maxEntries) {
+
+  override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case RequestExecutionGraph(jobID) =>
+      val executionGraph = graphs.get(jobID)
+      
+      executionGraph match {
+        case Some(graph) => sender ! 
decorateMessage(ExecutionGraphFound(jobID, graph))
+        case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID))
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
new file mode 100644
index 0000000..91d169a
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+
+object TestingMessages {
+
+  case class CheckIfJobRemoved(jobID: JobID)
+
+  case object DisableDisconnect
+
+  case object Alive
+
+  def getAlive: AnyRef = Alive
+
+  def getDisableDisconnect: AnyRef = DisableDisconnect
+
+  case object NotifyOfComponentShutdown
+  case class ComponentShutdown(ref: ActorRef)
+
+  def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
new file mode 100644
index 0000000..2597753
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration}
+
+import scala.language.postfixOps
+
+/** Subclass of the [[TaskManager]] to support testing messages
+ */
+class TestingTaskManager(
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: InstanceConnectionInfo,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    leaderRetrievalService: LeaderRetrievalService)
+  extends TaskManager(
+    config,
+    resourceID,
+    connectionInfo,
+    memoryManager,
+    ioManager,
+    network,
+    numberOfSlots,
+    leaderRetrievalService)
+  with TestingTaskManagerLike {
+
+  def this(
+      config: TaskManagerConfiguration,
+      connectionInfo: InstanceConnectionInfo,
+      memoryManager: MemoryManager,
+      ioManager: IOManager,
+      network: NetworkEnvironment,
+      numberOfSlots: Int,
+      leaderRetrievalService: LeaderRetrievalService) {
+    this(
+      config,
+      ResourceID.generate(),
+      connectionInfo,
+      memoryManager,
+      ioManager,
+      network,
+      numberOfSlots,
+      leaderRetrievalService)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
new file mode 100644
index 0000000..b41db31
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.{ActorRef, Terminated}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import 
org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, 
ResponseLeaderSessionID}
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import 
org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration,
 AlreadyRegistered}
+import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, 
TaskInFinalState, UpdateTaskExecutionState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages._
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+/** This mixin can be used to decorate a TaskManager with messages for testing 
purposes. */
+trait TestingTaskManagerLike extends FlinkActor {
+  that: TaskManager =>
+
+  import scala.collection.JavaConverters._
+
+  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, 
Set[ActorRef]]()
+  val waitForJobManagerToBeTerminated = 
scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+  val waitForRegisteredAtResourceManager =
+    scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
+  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, 
Set[ActorRef]]()
+  val unregisteredTasks = 
scala.collection.mutable.HashSet[ExecutionAttemptID]()
+
+  /** Map of registered task submit listeners */
+  val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, 
ActorRef]()
+
+  val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
+
+  var disconnectDisabled = false
+
+  /**
+   * Handler for testing related messages
+   */
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+
+    case NotifyWhenTaskIsRunning(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
+          sender ! decorateMessage(true)
+
+        case _ =>
+          val listeners = waitForRunning.getOrElse(executionID, Set())
+          waitForRunning += (executionID -> (listeners + sender))
+      }
+
+    case RequestRunningTasks =>
+      sender ! 
decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
+
+    case NotifyWhenTaskRemoved(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(_) =>
+          val set = waitForRemoval.getOrElse(executionID, Set())
+          waitForRemoval += (executionID -> (set + sender))
+        case None =>
+          if(unregisteredTasks.contains(executionID)) {
+            sender ! decorateMessage(true)
+          } else {
+            val set = waitForRemoval.getOrElse(executionID, Set())
+            waitForRemoval += (executionID -> (set + sender))
+          }
+      }
+
+    case TaskInFinalState(executionID) =>
+      super.handleMessage(TaskInFinalState(executionID))
+      waitForRemoval.remove(executionID) match {
+        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
+        case None =>
+      }
+
+      unregisteredTasks += executionID
+
+    case RequestBroadcastVariablesWithReferences =>
+      sender ! decorateMessage(
+        ResponseBroadcastVariablesWithReferences(
+          bcVarManager.getNumberOfVariablesWithReferences)
+      )
+
+    case RequestNumActiveConnections =>
+      val numActive = if (network.isAssociated) {
+        network.getConnectionManager.getNumberOfActiveConnections
+      } else {
+        0
+      }
+      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
+
+    case NotifyWhenJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }else{
+        sender ! decorateMessage(true)
+      }
+
+    case CheckIfJobRemoved(jobID) =>
+      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
+        sender ! decorateMessage(true)
+      } else {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }
+
+    case NotifyWhenJobManagerTerminated(jobManager) =>
+      val waiting = 
waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set())
+      waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + 
sender)
+
+    case RegisterSubmitTaskListener(jobId) =>
+      registeredSubmitTaskListeners.put(jobId, sender())
+
+    case msg@SubmitTask(tdd) =>
+      registeredSubmitTaskListeners.get(tdd.getJobID) match {
+        case Some(listenerRef) =>
+          listenerRef ! ResponseSubmitTaskListener(tdd)
+        case None =>
+        // Nothing to do
+      }
+
+      super.handleMessage(msg)
+
+    /**
+     * Message from task manager that accumulator values changed and need to 
be reported immediately
+     * instead of lazily through the
+     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] 
message. We forward this
+     * message to the job manager that it knows it should report to the 
listeners.
+     */
+    case msg: AccumulatorsChanged =>
+      currentJobManager match {
+        case Some(jobManager) =>
+          jobManager.forward(msg)
+          sendHeartbeatToJobManager()
+          sender ! true
+        case None =>
+      }
+
+    case msg@Terminated(jobManager) =>
+      super.handleMessage(msg)
+
+      waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+        _ foreach {
+          _ ! decorateMessage(JobManagerTerminated(jobManager))
+        }
+      }
+
+    case msg:Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+
+        val jobManager = sender()
+
+        waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach {
+          _ foreach {
+            _ ! decorateMessage(JobManagerTerminated(jobManager))
+          }
+        }
+      }
+
+    case DisableDisconnect =>
+      disconnectDisabled = true
+
+    case NotifyOfComponentShutdown =>
+      waitForShutdown += sender()
+
+    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+      super.handleMessage(msg)
+
+      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+        waitForRunning.get(taskExecutionState.getID) foreach {
+          _ foreach (_ ! decorateMessage(true))
+        }
+      }
+
+    case RequestLeaderSessionID =>
+      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+
+    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+      if(isConnected && jobManager == currentJobManager.get) {
+        sender() ! true
+      } else {
+        val list = waitForRegisteredAtResourceManager.getOrElse(
+          jobManager,
+          Set[ActorRef]())
+
+        waitForRegisteredAtResourceManager += jobManager -> (list + sender())
+      }
+
+    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+      super.handleMessage(msg)
+
+      val jm = sender()
+
+      waitForRegisteredAtResourceManager.remove(jm).foreach {
+        listeners => listeners.foreach{
+          listener =>
+            listener ! true
+        }
+      }
+  }
+
+  /**
+    * No killing of the VM for testing.
+    */
+  override protected def shutdown(): Unit = {
+    log.info("Shutting down TestingJobManager.")
+    waitForShutdown.foreach(_ ! ComponentShutdown(self))
+    waitForShutdown.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
new file mode 100644
index 0000000..974e4e8
--- /dev/null
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testingUtils
+
+import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.taskmanager.Task
+
+/**
+ * Additional messages that the [[TestingTaskManager]] understands.
+ */
+object TestingTaskManagerMessages {
+  
+  case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID)
+
+  case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID)
+  
+  case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){
+    import collection.JavaConverters._
+    def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava
+  }
+  
+  case class ResponseBroadcastVariablesWithReferences(number: Int)
+
+  case object RequestNumActiveConnections
+  case class ResponseNumActiveConnections(number: Int)
+  
+  case object RequestRunningTasks
+  
+  case object RequestBroadcastVariablesWithReferences
+
+  case class NotifyWhenJobManagerTerminated(jobManager: ActorRef)
+
+  case class JobManagerTerminated(jobManager: ActorRef)
+
+  case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef)
+
+  /**
+   * Message to give a hint to the task manager that accumulator values were 
updated in the task.
+   * This message is forwarded to the job manager which knows that it needs to 
notify listeners
+   * of accumulator updates.
+   */
+  case class AccumulatorsChanged(jobID: JobID)
+
+  /**
+    * Registers a listener for all 
[[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]]
+    * messages of the given job.
+    *
+    * If a task is submitted with the given job ID the task deployment
+    * descriptor is forwarded to the listener.
+    *
+    * @param jobId The job ID to listen for.
+    */
+  case class RegisterSubmitTaskListener(jobId: JobID)
+
+  /**
+    * A response to a listened job ID containing the submitted task deployment 
descriptor.
+    *
+    * @param tdd The submitted task deployment descriptor.
+    */
+  case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor)
+
+  // --------------------------------------------------------------------------
+  // Utility methods to allow simpler case object access from Java
+  // --------------------------------------------------------------------------
+  
+  def getRequestRunningTasksMessage: AnyRef = {
+    RequestRunningTasks
+  }
+  
+  def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = {
+    RequestBroadcastVariablesWithReferences
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index a801348..dcc98c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -19,7 +19,8 @@
 package org.apache.flink.runtime.checkpoint;
 
 import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfigTest;
+
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -30,7 +31,10 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.reflect.Field;
@@ -42,7 +46,7 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 
 public class ExecutionGraphCheckpointCoordinatorTest {
-
+       
        @Test
        public void 
testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws 
Exception {
                ExecutionGraph executionGraph = new ExecutionGraph(
@@ -50,7 +54,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
                        new JobID(),
                        "test",
                        new Configuration(),
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        new FiniteDuration(1, TimeUnit.DAYS),
                        new NoRestartStrategy(),
                        Collections.<BlobKey>emptyList(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index f0bf694..56da9c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -27,7 +27,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -37,6 +36,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 public class TaskDeploymentDescriptorTest {
@@ -58,7 +58,7 @@ public class TaskDeploymentDescriptorTest {
                        final List<InputGateDeploymentDescriptor> inputGates = 
new ArrayList<InputGateDeploymentDescriptor>(0);
                        final List<BlobKey> requiredJars = new 
ArrayList<BlobKey>(0);
                        final List<URL> requiredClasspaths = new 
ArrayList<URL>(0);
-                       final SerializedValue<ExecutionConfig> executionConfig 
= ExecutionConfigTest.getSerializedConfig();
+                       final SerializedValue<ExecutionConfig> executionConfig 
= new SerializedValue<>(new ExecutionConfig());
 
                        final TaskDeploymentDescriptor orig = new 
TaskDeploymentDescriptor(jobID, jobName, vertexID, execId,
                                executionConfig, taskName, indexInSubtaskGroup, 
currentNumberOfSubtasks, attemptNumber,

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
index d659b45..314471e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java
@@ -26,7 +26,8 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.InstantiationUtil;
+
 import org.junit.Test;
 
 /**
@@ -43,7 +44,7 @@ public class TaskEventTest {
 
                try {
                        final IntegerTaskEvent orig = new IntegerTaskEvent(11);
-                       final IntegerTaskEvent copy = (IntegerTaskEvent) 
CommonTestUtils.createCopyWritable(orig);
+                       final IntegerTaskEvent copy = 
InstantiationUtil.createCopyWritable(orig);
 
                        assertEquals(orig.getInteger(), copy.getInteger());
                        assertEquals(orig.hashCode(), copy.hashCode());
@@ -63,7 +64,7 @@ public class TaskEventTest {
                try {
 
                        final StringTaskEvent orig = new 
StringTaskEvent("Test");
-                       final StringTaskEvent copy = (StringTaskEvent) 
CommonTestUtils.createCopyWritable(orig);
+                       final StringTaskEvent copy = 
InstantiationUtil.createCopyWritable(orig);
 
                        assertEquals(orig.getString(), copy.getString());
                        assertEquals(orig.hashCode(), copy.hashCode());
@@ -85,7 +86,7 @@ public class TaskEventTest {
                        final EventList orig = new EventList();
                        orig.add(new StringTaskEvent("Test 2"));
                        orig.add(new IntegerTaskEvent(70));
-                       final EventList copy = (EventList) 
CommonTestUtils.createCopyWritable(orig);
+                       final EventList copy = 
InstantiationUtil.createCopyWritable(orig);
 
                        assertEquals(orig.size(), copy.size());
                        final Iterator<AbstractEvent> origIt = orig.iterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 8eebe66..6740293 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -26,11 +26,13 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -61,7 +63,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
  * This class contains test concerning the correct conversion from {@link 
JobGraph} to {@link ExecutionGraph} objects.
  */
 public class ExecutionGraphConstructionTest {
-       
+
        /**
         * Creates a JobGraph of the following form:
         * 
@@ -76,7 +78,7 @@ public class ExecutionGraphConstructionTest {
         * </pre>
         */
        @Test
-       public void testCreateSimpleGraphBipartite() {
+       public void testCreateSimpleGraphBipartite() throws Exception {
                
                final JobID jobId = new JobID();
                final String jobName = "Test Job Sample Name";
@@ -107,7 +109,7 @@ public class ExecutionGraphConstructionTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -122,7 +124,7 @@ public class ExecutionGraphConstructionTest {
        }
        
        @Test
-       public void testAttachViaDataSets() {
+       public void testAttachViaDataSets() throws Exception {
                final JobID jobId = new JobID();
                final String jobName = "Test Job Sample Name";
                final Configuration cfg = new Configuration();
@@ -151,8 +153,8 @@ public class ExecutionGraphConstructionTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       ExecutionConfigTest.getSerializedConfig(),
+                       cfg,
+                               new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -190,7 +192,7 @@ public class ExecutionGraphConstructionTest {
        }
        
        @Test
-       public void testAttachViaIds() {
+       public void testAttachViaIds() throws Exception {
                final JobID jobId = new JobID();
                final String jobName = "Test Job Sample Name";
                final Configuration cfg = new Configuration();
@@ -219,8 +221,8 @@ public class ExecutionGraphConstructionTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       ExecutionConfigTest.getSerializedConfig(),
+                       cfg,
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -459,7 +461,7 @@ public class ExecutionGraphConstructionTest {
        }
        
        @Test
-       public void testCannotConnectMissingId() {
+       public void testCannotConnectMissingId() throws Exception {
                final JobID jobId = new JobID();
                final String jobName = "Test Job Sample Name";
                final Configuration cfg = new Configuration();
@@ -474,8 +476,8 @@ public class ExecutionGraphConstructionTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       ExecutionConfigTest.getSerializedConfig(),
+                       cfg,
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -502,7 +504,7 @@ public class ExecutionGraphConstructionTest {
        }
 
        @Test
-       public void testCannotConnectWrongOrder() {
+       public void testCannotConnectWrongOrder() throws Exception {
                final JobID jobId = new JobID();
                final String jobName = "Test Job Sample Name";
                final Configuration cfg = new Configuration();
@@ -531,8 +533,8 @@ public class ExecutionGraphConstructionTest {
                        TestingUtils.defaultExecutionContext(), 
                        jobId, 
                        jobName, 
-                       cfg, 
-                       ExecutionConfigTest.getSerializedConfig(),
+                       cfg,
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -593,8 +595,8 @@ public class ExecutionGraphConstructionTest {
                                TestingUtils.defaultExecutionContext(), 
                                jobId, 
                                jobName, 
-                               cfg, 
-                               ExecutionConfigTest.getSerializedConfig(),
+                               cfg,
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                        try {
@@ -639,8 +641,8 @@ public class ExecutionGraphConstructionTest {
                                TestingUtils.defaultExecutionContext(), 
                                jobId, 
                                jobName,
-                               cfg, 
-                               ExecutionConfigTest.getSerializedConfig(),
+                               cfg,
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
 
@@ -711,8 +713,8 @@ public class ExecutionGraphConstructionTest {
                                TestingUtils.defaultExecutionContext(), 
                                jobId, 
                                jobName, 
-                               cfg, 
-                               ExecutionConfigTest.getSerializedConfig(),
+                               cfg,
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d126acb..a599f42 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -30,7 +30,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -47,6 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 public class ExecutionGraphDeploymentTest {
@@ -85,7 +87,7 @@ public class ExecutionGraphDeploymentTest {
                                jobId, 
                                "some job", 
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
 
@@ -288,8 +290,8 @@ public class ExecutionGraphDeploymentTest {
                        TestingUtils.directExecutionContext(), 
                        jobId, 
                        "some job", 
-                       new Configuration(),
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new Configuration(), 
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 26ba04f..c12f346 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
@@ -39,7 +38,9 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -86,7 +87,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "test job",
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -141,7 +142,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new JobID(),
                        "test job",
                        new Configuration(),
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 0L));
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -197,7 +198,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "Test job",
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000));
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -226,7 +227,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "TestJob",
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                // We want to manually control the restart and 
delay
                                new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -344,7 +345,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "TestJob",
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                // We want to manually control the restart and 
delay
                                new 
FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
@@ -427,7 +428,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new JobID(),
                        "Test job",
                        new Configuration(),
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 1000)));
 
@@ -498,7 +499,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new JobID(),
                        "test job",
                        new Configuration(),
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new FixedDelayRestartStrategy(1, 1000));
 
@@ -594,7 +595,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "test job",
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000000));
 
@@ -648,7 +649,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                                new JobID(),
                                "test job",
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new FixedDelayRestartStrategy(1, 1000000));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index bcee5a1..59f2a9b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StoppingException;
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -128,7 +130,7 @@ public class ExecutionGraphSignalsTest {
                        jobId,
                        jobName,
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                eg.attachJobGraph(ordered);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 92a7402..903d5f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -22,13 +22,11 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -52,6 +50,7 @@ import 
org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -168,7 +167,7 @@ public class ExecutionGraphTestUtils {
 
        public static final String ERROR_MESSAGE = "test_failure_error_message";
 
-       public static ExecutionJobVertex getExecutionVertex(JobVertexID id, 
ExecutionContext executionContext) throws JobException {
+       public static ExecutionJobVertex getExecutionVertex(JobVertexID id, 
ExecutionContext executionContext) throws Exception {
                JobVertex ajv = new JobVertex("TestVertex", id);
                ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
@@ -176,8 +175,8 @@ public class ExecutionGraphTestUtils {
                        executionContext, 
                        new JobID(), 
                        "test job", 
-                       new Configuration(), 
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new Configuration(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
 
@@ -198,7 +197,7 @@ public class ExecutionGraphTestUtils {
                return ejv;
        }
        
-       public static ExecutionJobVertex getExecutionVertex(JobVertexID id) 
throws JobException, IOException {
+       public static ExecutionJobVertex getExecutionVertex(JobVertexID id) 
throws Exception {
                return getExecutionVertex(id, 
TestingUtils.defaultExecutionContext());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 9e4aa6d..f3743c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -22,9 +22,9 @@ import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
-import java.util.Arrays;
+import java.util.Collections;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 public class ExecutionStateProgressTest {
@@ -54,10 +55,10 @@ public class ExecutionStateProgressTest {
                                jid, 
                                "test job", 
                                new Configuration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy());
-                       graph.attachJobGraph(Arrays.asList(ajv));
+                       graph.attachJobGraph(Collections.singletonList(ajv));
 
                        setGraphStatus(graph, JobStatus.RUNNING);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index d7ce0ba..f03370c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.when;
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -273,7 +274,7 @@ public class LocalInputSplitsTest {
                                jobGraph.getJobID(),
                                jobGraph.getName(),  
                                jobGraph.getJobConfiguration(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                TIMEOUT,
                                new NoRestartStrategy());
                        
@@ -338,7 +339,7 @@ public class LocalInputSplitsTest {
                        jobGraph.getJobID(),
                        jobGraph.getName(),  
                        jobGraph.getJobConfiguration(),
-                       ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                        TIMEOUT,
                        new NoRestartStrategy());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 1b369db..0eb0607 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -46,7 +48,7 @@ public class PointwisePatternTest {
        private final Configuration cfg = new Configuration();
        
        @Test
-       public void testNToN() {
+       public void testNToN() throws Exception {
                final int N = 23;
                
                JobVertex v1 = new JobVertex("vertex1");
@@ -64,7 +66,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -88,7 +90,7 @@ public class PointwisePatternTest {
        }
        
        @Test
-       public void test2NToN() {
+       public void test2NToN() throws Exception {
                final int N = 17;
                
                JobVertex v1 = new JobVertex("vertex1");
@@ -106,7 +108,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -131,7 +133,7 @@ public class PointwisePatternTest {
        }
        
        @Test
-       public void test3NToN() {
+       public void test3NToN() throws Exception {
                final int N = 17;
                
                JobVertex v1 = new JobVertex("vertex1");
@@ -149,7 +151,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -175,7 +177,7 @@ public class PointwisePatternTest {
        }
        
        @Test
-       public void testNTo2N() {
+       public void testNTo2N() throws Exception {
                final int N = 41;
                
                JobVertex v1 = new JobVertex("vertex1");
@@ -193,7 +195,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName,
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -217,7 +219,7 @@ public class PointwisePatternTest {
        }
        
        @Test
-       public void testNTo7N() {
+       public void testNTo7N() throws Exception {
                final int N = 11;
                
                JobVertex v1 = new JobVertex("vertex1");
@@ -235,7 +237,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -259,7 +261,7 @@ public class PointwisePatternTest {
        }
        
        @Test
-       public void testLowHighIrregular() {
+       public void testLowHighIrregular() throws Exception {
                testLowToHigh(3, 16);
                testLowToHigh(19, 21);
                testLowToHigh(15, 20);
@@ -267,14 +269,14 @@ public class PointwisePatternTest {
        }
        
        @Test
-       public void testHighLowIrregular() {
+       public void testHighLowIrregular() throws Exception {
                testHighToLow(16, 3);
                testHighToLow(21, 19);
                testHighToLow(20, 15);
                testHighToLow(31, 11);
        }
        
-       private void testLowToHigh(int lowDop, int highDop) {
+       private void testLowToHigh(int lowDop, int highDop) throws Exception {
                if (highDop < lowDop) {
                        throw new IllegalArgumentException();
                }
@@ -297,7 +299,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {
@@ -327,7 +329,7 @@ public class PointwisePatternTest {
                }
        }
        
-       private void testHighToLow(int highDop, int lowDop) {
+       private void testHighToLow(int highDop, int lowDop) throws Exception {
                if (highDop < lowDop) {
                        throw new IllegalArgumentException();
                }
@@ -350,7 +352,7 @@ public class PointwisePatternTest {
                        jobId, 
                        jobName, 
                        cfg,
-                       ExecutionConfigTest.getSerializedConfig(),
+                       new SerializedValue<>(new ExecutionConfig()),
                        AkkaUtils.getDefaultTimeout(),
                        new NoRestartStrategy());
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 8bc474b..2a690d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -35,12 +35,14 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.Arrays;
@@ -105,9 +107,6 @@ public class TerminalStateDeadlockTest {
                        final JobID jobId = resource.getJobID();
                        final JobVertexID vid1 = new JobVertexID();
                        final JobVertexID vid2 = new JobVertexID();
-
-                       
-                       final Configuration jobConfig = new Configuration();
                        
                        final List<JobVertex> vertices;
                        {
@@ -182,13 +181,13 @@ public class TerminalStateDeadlockTest {
                
                private volatile boolean done;
 
-               TestExecGraph(JobID jobId) {
+               TestExecGraph(JobID jobId) throws IOException {
                        super(
                                TestingUtils.defaultExecutionContext(),
                                jobId,
                                "test graph",
                                EMPTY_CONFIG,
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                TIMEOUT,
                                new FixedDelayRestartStrategy(1, 0));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index c483f41..4ee06b36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.DummyActorGateway;
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
@@ -83,7 +84,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       
ExecutionConfigTest.getSerializedConfig(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -156,7 +157,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       
ExecutionConfigTest.getSerializedConfig(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -233,7 +234,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       
ExecutionConfigTest.getSerializedConfig(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
@@ -301,7 +302,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       
ExecutionConfigTest.getSerializedConfig(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(jobVertex));
@@ -371,7 +372,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       
ExecutionConfigTest.getSerializedConfig(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
@@ -411,7 +412,7 @@ public class VertexLocationConstraintTest {
                                        jg.getJobID(),
                                        jg.getName(),
                                        jg.getJobConfiguration(),
-                                       
ExecutionConfigTest.getSerializedConfig(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
                                        timeout,
                                        new NoRestartStrategy());
                        eg.attachJobGraph(Collections.singletonList(vertex));

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 7a23e26..6314969 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 public class VertexSlotSharingTest {
@@ -76,7 +77,7 @@ public class VertexSlotSharingTest {
                                        new JobID(),
                                        "test job",
                                        new Configuration(),
-                                       
ExecutionConfigTest.getSerializedConfig(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
                                        AkkaUtils.getDefaultTimeout(),
                                        new NoRestartStrategy());
                        eg.attachJobGraph(vertices);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
index bc8cd63..3a9488d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
@@ -25,7 +25,8 @@ import static org.junit.Assert.fail;
 
 import java.net.InetAddress;
 
-import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.InstantiationUtil;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -97,10 +98,10 @@ public class InstanceConnectionInfoTest {
                        {
                                InstanceConnectionInfo original = new 
InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888);
                                
-                               InstanceConnectionInfo copy = 
CommonTestUtils.createCopyWritable(original);
+                               InstanceConnectionInfo copy = 
InstantiationUtil.createCopyWritable(original);
                                assertEquals(original, copy);
                                
-                               InstanceConnectionInfo serCopy = 
CommonTestUtils.createCopySerializable(original);
+                               InstanceConnectionInfo serCopy = 
InstantiationUtil.clone(original);
                                assertEquals(original, serCopy);
                        }
                                                
@@ -109,10 +110,10 @@ public class InstanceConnectionInfoTest {
                                InstanceConnectionInfo original = new 
InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871);
                                original.getFQDNHostname();
                                
-                               InstanceConnectionInfo copy = 
CommonTestUtils.createCopyWritable(original);
+                               InstanceConnectionInfo copy = 
InstantiationUtil.createCopyWritable(original);
                                assertEquals(original, copy);
                                
-                               InstanceConnectionInfo serCopy = 
CommonTestUtils.createCopySerializable(original);
+                               InstanceConnectionInfo serCopy = 
InstantiationUtil.clone(original);
                                assertEquals(original, serCopy);
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java
deleted file mode 100644
index 7c55419..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-/**
- * Latch for synchronizing parts of code in tests. In contrast to
- * {@link org.apache.flink.runtime.taskmanager.OneShotLatch} this will reset 
the state once
- * {@link #await()} returns.
- *
- * <p>
- * A part of the code that should only run after other code calls {@link 
#await()}. The call
- * will only return once the other part is finished and calls {@link 
#trigger()}.
- */
-public final class MultiShotLatch {
-       
-       private final Object lock = new Object();
-       
-       private volatile boolean triggered;
-
-       /**
-        * Fires the latch. Code that is blocked on {@link #await()} will now 
return.
-        */
-       public void trigger() {
-               synchronized (lock) {
-                       triggered = true;
-                       lock.notifyAll();
-               }
-       }
-
-       /**
-        * Waits until {@link #trigger())} is called.
-        */
-       public void await() throws InterruptedException {
-               synchronized (lock) {
-                       while (!triggered) {
-                               lock.wait();
-                       }
-                       triggered = false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
deleted file mode 100644
index 504ccca..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-/**
- * Latch for synchronizing parts of code in tests. Once the latch has fired 
once calls to
- * {@link #await()} will return immediately in the future.
- *
- * <p>
- * A part of the code that should only run after other code calls {@link 
#await()}. The call
- * will only return once the other part is finished and calls {@link 
#trigger()}.
- */
-public final class OneShotLatch {
-       
-       private final Object lock = new Object();
-       
-       private boolean triggered;
-
-       /**
-        * Fires the latch. Code that is blocked on {@link #await()} will now 
return.
-        */
-       public void trigger() {
-               synchronized (lock) {
-                       triggered = true;
-                       lock.notifyAll();
-               }
-       }
-
-       /**
-        * Waits until {@link #trigger())} is called. Once {@code trigger()} 
has been called this
-        * call will always return immediately.
-        */
-       public void await() throws InterruptedException {
-               synchronized (lock) {
-                       while (!triggered) {
-                               lock.wait();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e1f551c..7b55987 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.api.common.ExecutionConfigTest;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -43,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -137,7 +139,7 @@ public class TaskAsyncCallTest {
                }
        }
        
-       private static Task createTask() {
+       private static Task createTask() throws Exception {
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
                
when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
                
@@ -150,7 +152,7 @@ public class TaskAsyncCallTest {
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), "Job Name", new JobVertexID(), new 
ExecutionAttemptID(),
-                               ExecutionConfigTest.getSerializedConfig(),
+                               new SerializedValue<>(new ExecutionConfig()),
                                "Test Task", 0, 1, 0,
                                new Configuration(), new Configuration(),
                                CheckpointsInOrderInvokable.class.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
index f367799..335f788 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.lang.reflect.Field;
 import java.net.*;
+import java.util.UUID;
 
 import static org.junit.Assert.*;
 
@@ -109,7 +110,7 @@ public class TaskManagerConfigurationTest {
        @Test
        public void testDefaultFsParameterLoading() {
                final File tmpDir = getTmpDir();
-               final File confFile =  new File(tmpDir.getAbsolutePath() + 
File.separator + CommonTestUtils.getRandomDirectoryName() + ".yaml");
+               final File confFile =  new File(tmpDir, 
UUID.randomUUID().toString() + ".yaml");
 
                try {
                        final URI defaultFS = new URI("otherFS", null, 
"localhost", 1234, null, null, null);
@@ -146,13 +147,7 @@ public class TaskManagerConfigurationTest {
                try {
                        InetAddress localhostAddress = 
InetAddress.getByName(hostname);
                        server = new ServerSocket(0, 50, localhostAddress);
-               }
-               catch (UnknownHostException e) {
-                       // may happen if disconnected. skip test.
-                       System.err.println("Skipping 
'testNetworkInterfaceSelection' test.");
-                       return;
-               }
-               catch (IOException e) {
+               } catch (IOException e) {
                        // may happen in certain test setups, skip test.
                        System.err.println("Skipping 
'testNetworkInterfaceSelection' test.");
                        return;
@@ -181,10 +176,8 @@ public class TaskManagerConfigurationTest {
        }
 
        private File getTmpDir() {
-               File tmpDir = new File(CommonTestUtils.getTempDir() + 
File.separator
-                       + CommonTestUtils.getRandomDirectoryName() + 
File.separator);
-               tmpDir.mkdirs();
-
+               File tmpDir = new File(CommonTestUtils.getTempDir(), 
UUID.randomUUID().toString());
+               assertTrue("could not create temp directory", tmpDir.mkdirs());
                return tmpDir;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index b3ad589..ce88c09 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -24,7 +24,6 @@ import akka.actor.Props;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.ExecutionConfigTest;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -160,7 +159,7 @@ public class TaskManagerTest extends TestLogger {
                                final JobID jid = new JobID();
                                final JobVertexID vid = new JobVertexID();
                                final ExecutionAttemptID eid = new 
ExecutionAttemptID();
-                               final SerializedValue<ExecutionConfig> 
executionConfig = ExecutionConfigTest.getSerializedConfig();
+                               final SerializedValue<ExecutionConfig> 
executionConfig = new SerializedValue<>(new ExecutionConfig());
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
                                                "TestTask", 2, 7, 0, new 
Configuration(), new Configuration(),
@@ -263,15 +262,19 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, "TestJob1", vid1, eid1,
-                                               
ExecutionConfigTest.getSerializedConfig(), "TestTask1", 1, 5, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
+                                               jid1, "TestJob1", vid1, eid1,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "TestTask1", 1, 5, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, "TestJob2", vid2, eid2,
-                                                
ExecutionConfigTest.getSerializedConfig(), "TestTask2", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
+                                               jid2, "TestJob2", vid2, eid2,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "TestTask2", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -367,7 +370,7 @@ public class TaskManagerTest extends TestLogger {
        }
 
        @Test
-       public void testJobSubmissionAndStop() {
+       public void testJobSubmissionAndStop() throws Exception {
                new JavaTestKit(system){{
 
                        ActorGateway jobManager = null;
@@ -397,7 +400,7 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               final SerializedValue<ExecutionConfig> 
executionConfig = ExecutionConfigTest.getSerializedConfig();
+                               final SerializedValue<ExecutionConfig> 
executionConfig = new SerializedValue<>(new ExecutionConfig());
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig,
                                                "TestTask1", 1, 5, 0, new 
Configuration(), new Configuration(), StoppableInvokable.class.getName(),
@@ -525,15 +528,19 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid1 = new 
ExecutionAttemptID();
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, "TestJob", vid1, eid1,
-                                               
ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
+                                               jid, "TestJob", vid1, eid1,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, "TestJob", vid2, eid2,
-                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
+                                               jid, "TestJob", vid2, eid2,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -626,14 +633,18 @@ public class TaskManagerTest extends TestLogger {
                                                                }
                                                );
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, "TestJob", vid1, eid1,
-                                               
ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
+                                               jid, "TestJob", vid1, eid1,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(), new 
ArrayList<BlobKey>(),
                                                Collections.<URL>emptyList(), 
0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, "TestJob", vid2, eid2,
-                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
+                                               jid, "TestJob", vid2, eid2,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.singletonList(ircdd),
@@ -767,14 +778,18 @@ public class TaskManagerTest extends TestLogger {
                                                                }
                                                );
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid, "TestJob", vid1, eid1,
-                                               
ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
+                                               jid, "TestJob", vid1, eid1,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Sender", 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
                                                irpdd, 
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid, "TestJob", vid2, eid2,
-                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
+                                               jid, "TestJob", vid2, eid2,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Receiver", 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.BlockingReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.singletonList(ircdd),
@@ -913,7 +928,8 @@ public class TaskManagerTest extends TestLogger {
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
                                                jid, "TestJob", vid, eid,
-                                               
ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Receiver", 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
                                                
Tasks.AgnosticReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -1007,7 +1023,9 @@ public class TaskManagerTest extends TestLogger {
                                                new 
InputGateDeploymentDescriptor(resultId, 0, icdd);
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid, eid, 
ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0,
+                                               jid, "TestJob", vid, eid,
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Receiver", 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
                                                
Tasks.AgnosticReceiver.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
@@ -1084,7 +1102,7 @@ public class TaskManagerTest extends TestLogger {
                                                "Job",
                                                new JobVertexID(),
                                                new ExecutionAttemptID(),
-                                               
ExecutionConfigTest.getSerializedConfig(),
+                                               new SerializedValue<>(new 
ExecutionConfig()),
                                                "Task",
                                                0,
                                                1,

Reply via email to