http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala new file mode 100644 index 0000000..ec513e3 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import java.util.Map + +import akka.actor.ActorRef +import org.apache.flink.api.common.JobID +import org.apache.flink.api.common.accumulators.Accumulator +import org.apache.flink.runtime.accumulators.AccumulatorRegistry +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint +import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} +import org.apache.flink.runtime.instance.ActorGateway +import org.apache.flink.runtime.jobgraph.JobStatus + +object TestingJobManagerMessages { + + case class RequestExecutionGraph(jobID: JobID) + + sealed trait ResponseExecutionGraph { + def jobID: JobID + } + + case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends + ResponseExecutionGraph + + case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph + + case class WaitForAllVerticesToBeRunning(jobID: JobID) + case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID) + case class AllVerticesRunning(jobID: JobID) + + case class NotifyWhenJobRemoved(jobID: JobID) + + case class RequestWorkingTaskManager(jobID: JobID) + case class WorkingTaskManager(gatewayOption: Option[ActorGateway]) + + case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus) + case class JobStatusIs(jobID: JobID, state: JobStatus) + + case object NotifyListeners + + case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef) + case class TaskManagerTerminated(taskManager: ActorRef) + + /** + * Registers a listener to receive a message when accumulators changed. + * The change must be explicitly triggered by the TestingTaskManager which can receive an + * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]] + * message by a task that changed the accumulators. This message is then + * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]] + * message when the next Heartbeat occurs. + */ + case class NotifyWhenAccumulatorChange(jobID: JobID) + + /** + * Reports updated accumulators back to the listener. + */ + case class UpdatedAccumulators(jobID: JobID, + flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]], + userAccumulators: Map[String, Accumulator[_,_]]) + + /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader + * + */ + case object NotifyWhenLeader + + /** + * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]] + * message when at least numRegisteredTaskManager have registered at the JobManager. + * + * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified + */ + case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int) + + /** Disables the post stop method of the [[TestingJobManager]]. + * + * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership + */ + case object DisablePostStop + + /** + * Requests a savepoint from the job manager. + * + * @param savepointPath The path of the savepoint to request. + */ + case class RequestSavepoint(savepointPath: String) + + /** + * Response to a savepoint request. + * + * @param savepoint The requested savepoint or null if none available. + */ + case class ResponseSavepoint(savepoint: CompletedCheckpoint) + + def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader + def getDisablePostStop(): AnyRef = DisablePostStop + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala new file mode 100644 index 0000000..48a1ddd --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import org.apache.flink.runtime.jobmanager.MemoryArchivist +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph} + +/** Memory archivist extended by testing messages + * + * @param maxEntries number of maximum number of archived jobs + */ +class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) { + + override def handleMessage: Receive = { + handleTestingMessage orElse super.handleMessage + } + + def handleTestingMessage: Receive = { + case RequestExecutionGraph(jobID) => + val executionGraph = graphs.get(jobID) + + executionGraph match { + case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph)) + case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID)) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala new file mode 100644 index 0000000..91d169a --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.ActorRef +import org.apache.flink.api.common.JobID + +object TestingMessages { + + case class CheckIfJobRemoved(jobID: JobID) + + case object DisableDisconnect + + case object Alive + + def getAlive: AnyRef = Alive + + def getDisableDisconnect: AnyRef = DisableDisconnect + + case object NotifyOfComponentShutdown + case class ComponentShutdown(ref: ActorRef) + + def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala new file mode 100644 index 0000000..2597753 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.instance.InstanceConnectionInfo +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration} + +import scala.language.postfixOps + +/** Subclass of the [[TaskManager]] to support testing messages + */ +class TestingTaskManager( + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: InstanceConnectionInfo, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + leaderRetrievalService: LeaderRetrievalService) + extends TaskManager( + config, + resourceID, + connectionInfo, + memoryManager, + ioManager, + network, + numberOfSlots, + leaderRetrievalService) + with TestingTaskManagerLike { + + def this( + config: TaskManagerConfiguration, + connectionInfo: InstanceConnectionInfo, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + leaderRetrievalService: LeaderRetrievalService) { + this( + config, + ResourceID.generate(), + connectionInfo, + memoryManager, + ioManager, + network, + numberOfSlots, + leaderRetrievalService) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala new file mode 100644 index 0000000..b41db31 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.{ActorRef, Terminated} +import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.FlinkActor +import org.apache.flink.runtime.execution.ExecutionState +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID} +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} +import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered} +import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState} +import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved +import org.apache.flink.runtime.testingUtils.TestingMessages._ +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ + +import scala.concurrent.duration._ +import scala.language.postfixOps + +/** This mixin can be used to decorate a TaskManager with messages for testing purposes. */ +trait TestingTaskManagerLike extends FlinkActor { + that: TaskManager => + + import scala.collection.JavaConverters._ + + val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() + val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]() + val waitForRegisteredAtResourceManager = + scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]() + val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() + val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]() + + /** Map of registered task submit listeners */ + val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]() + + val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() + + var disconnectDisabled = false + + /** + * Handler for testing related messages + */ + abstract override def handleMessage: Receive = { + handleTestingMessage orElse super.handleMessage + } + + def handleTestingMessage: Receive = { + case Alive => sender() ! Acknowledge + + case NotifyWhenTaskIsRunning(executionID) => + Option(runningTasks.get(executionID)) match { + case Some(task) if task.getExecutionState == ExecutionState.RUNNING => + sender ! decorateMessage(true) + + case _ => + val listeners = waitForRunning.getOrElse(executionID, Set()) + waitForRunning += (executionID -> (listeners + sender)) + } + + case RequestRunningTasks => + sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap)) + + case NotifyWhenTaskRemoved(executionID) => + Option(runningTasks.get(executionID)) match { + case Some(_) => + val set = waitForRemoval.getOrElse(executionID, Set()) + waitForRemoval += (executionID -> (set + sender)) + case None => + if(unregisteredTasks.contains(executionID)) { + sender ! decorateMessage(true) + } else { + val set = waitForRemoval.getOrElse(executionID, Set()) + waitForRemoval += (executionID -> (set + sender)) + } + } + + case TaskInFinalState(executionID) => + super.handleMessage(TaskInFinalState(executionID)) + waitForRemoval.remove(executionID) match { + case Some(actors) => for(actor <- actors) actor ! decorateMessage(true) + case None => + } + + unregisteredTasks += executionID + + case RequestBroadcastVariablesWithReferences => + sender ! decorateMessage( + ResponseBroadcastVariablesWithReferences( + bcVarManager.getNumberOfVariablesWithReferences) + ) + + case RequestNumActiveConnections => + val numActive = if (network.isAssociated) { + network.getConnectionManager.getNumberOfActiveConnections + } else { + 0 + } + sender ! decorateMessage(ResponseNumActiveConnections(numActive)) + + case NotifyWhenJobRemoved(jobID) => + if(runningTasks.values.asScala.exists(_.getJobID == jobID)){ + context.system.scheduler.scheduleOnce( + 200 milliseconds, + self, + decorateMessage(CheckIfJobRemoved(jobID)))( + context.dispatcher, + sender() + ) + }else{ + sender ! decorateMessage(true) + } + + case CheckIfJobRemoved(jobID) => + if(runningTasks.values.asScala.forall(_.getJobID != jobID)){ + sender ! decorateMessage(true) + } else { + context.system.scheduler.scheduleOnce( + 200 milliseconds, + self, + decorateMessage(CheckIfJobRemoved(jobID)))( + context.dispatcher, + sender() + ) + } + + case NotifyWhenJobManagerTerminated(jobManager) => + val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set()) + waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender) + + case RegisterSubmitTaskListener(jobId) => + registeredSubmitTaskListeners.put(jobId, sender()) + + case msg@SubmitTask(tdd) => + registeredSubmitTaskListeners.get(tdd.getJobID) match { + case Some(listenerRef) => + listenerRef ! ResponseSubmitTaskListener(tdd) + case None => + // Nothing to do + } + + super.handleMessage(msg) + + /** + * Message from task manager that accumulator values changed and need to be reported immediately + * instead of lazily through the + * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this + * message to the job manager that it knows it should report to the listeners. + */ + case msg: AccumulatorsChanged => + currentJobManager match { + case Some(jobManager) => + jobManager.forward(msg) + sendHeartbeatToJobManager() + sender ! true + case None => + } + + case msg@Terminated(jobManager) => + super.handleMessage(msg) + + waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach { + _ foreach { + _ ! decorateMessage(JobManagerTerminated(jobManager)) + } + } + + case msg:Disconnect => + if (!disconnectDisabled) { + super.handleMessage(msg) + + val jobManager = sender() + + waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach { + _ foreach { + _ ! decorateMessage(JobManagerTerminated(jobManager)) + } + } + } + + case DisableDisconnect => + disconnectDisabled = true + + case NotifyOfComponentShutdown => + waitForShutdown += sender() + + case msg @ UpdateTaskExecutionState(taskExecutionState) => + super.handleMessage(msg) + + if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) { + waitForRunning.get(taskExecutionState.getID) foreach { + _ foreach (_ ! decorateMessage(true)) + } + } + + case RequestLeaderSessionID => + sender() ! ResponseLeaderSessionID(leaderSessionID.orNull) + + case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) => + if(isConnected && jobManager == currentJobManager.get) { + sender() ! true + } else { + val list = waitForRegisteredAtResourceManager.getOrElse( + jobManager, + Set[ActorRef]()) + + waitForRegisteredAtResourceManager += jobManager -> (list + sender()) + } + + case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) => + super.handleMessage(msg) + + val jm = sender() + + waitForRegisteredAtResourceManager.remove(jm).foreach { + listeners => listeners.foreach{ + listener => + listener ! true + } + } + } + + /** + * No killing of the VM for testing. + */ + override protected def shutdown(): Unit = { + log.info("Shutting down TestingJobManager.") + waitForShutdown.foreach(_ ! ComponentShutdown(self)) + waitForShutdown.clear() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala new file mode 100644 index 0000000..974e4e8 --- /dev/null +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.ActorRef +import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID +import org.apache.flink.runtime.taskmanager.Task + +/** + * Additional messages that the [[TestingTaskManager]] understands. + */ +object TestingTaskManagerMessages { + + case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID) + + case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID) + + case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){ + import collection.JavaConverters._ + def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava + } + + case class ResponseBroadcastVariablesWithReferences(number: Int) + + case object RequestNumActiveConnections + case class ResponseNumActiveConnections(number: Int) + + case object RequestRunningTasks + + case object RequestBroadcastVariablesWithReferences + + case class NotifyWhenJobManagerTerminated(jobManager: ActorRef) + + case class JobManagerTerminated(jobManager: ActorRef) + + case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef) + + /** + * Message to give a hint to the task manager that accumulator values were updated in the task. + * This message is forwarded to the job manager which knows that it needs to notify listeners + * of accumulator updates. + */ + case class AccumulatorsChanged(jobID: JobID) + + /** + * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]] + * messages of the given job. + * + * If a task is submitted with the given job ID the task deployment + * descriptor is forwarded to the listener. + * + * @param jobId The job ID to listen for. + */ + case class RegisterSubmitTaskListener(jobId: JobID) + + /** + * A response to a listened job ID containing the submitted task deployment descriptor. + * + * @param tdd The submitted task deployment descriptor. + */ + case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor) + + // -------------------------------------------------------------------------- + // Utility methods to allow simpler case object access from Java + // -------------------------------------------------------------------------- + + def getRequestRunningTasksMessage: AnyRef = { + RequestRunningTasks + } + + def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = { + RequestBroadcastVariablesWithReferences + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index a801348..dcc98c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -19,7 +19,8 @@ package org.apache.flink.runtime.checkpoint; import akka.actor.ActorSystem; -import org.apache.flink.api.common.ExecutionConfigTest; + +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -30,7 +31,10 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; + import org.junit.Test; + import scala.concurrent.duration.FiniteDuration; import java.lang.reflect.Field; @@ -42,7 +46,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; public class ExecutionGraphCheckpointCoordinatorTest { - + @Test public void testCheckpointAndSavepointCoordinatorShareCheckpointIDCounter() throws Exception { ExecutionGraph executionGraph = new ExecutionGraph( @@ -50,7 +54,7 @@ public class ExecutionGraphCheckpointCoordinatorTest { new JobID(), "test", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), new FiniteDuration(1, TimeUnit.DAYS), new NoRestartStrategy(), Collections.<BlobKey>emptyList(), http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index f0bf694..56da9c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -37,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.util.SerializedValue; + import org.junit.Test; public class TaskDeploymentDescriptorTest { @@ -58,7 +58,7 @@ public class TaskDeploymentDescriptorTest { final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0); final List<BlobKey> requiredJars = new ArrayList<BlobKey>(0); final List<URL> requiredClasspaths = new ArrayList<URL>(0); - final SerializedValue<ExecutionConfig> executionConfig = ExecutionConfigTest.getSerializedConfig(); + final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, jobName, vertexID, execId, executionConfig, taskName, indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java index d659b45..314471e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/event/task/TaskEventTest.java @@ -26,7 +26,8 @@ import java.io.IOException; import java.util.Iterator; import org.apache.flink.runtime.event.AbstractEvent; -import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.InstantiationUtil; + import org.junit.Test; /** @@ -43,7 +44,7 @@ public class TaskEventTest { try { final IntegerTaskEvent orig = new IntegerTaskEvent(11); - final IntegerTaskEvent copy = (IntegerTaskEvent) CommonTestUtils.createCopyWritable(orig); + final IntegerTaskEvent copy = InstantiationUtil.createCopyWritable(orig); assertEquals(orig.getInteger(), copy.getInteger()); assertEquals(orig.hashCode(), copy.hashCode()); @@ -63,7 +64,7 @@ public class TaskEventTest { try { final StringTaskEvent orig = new StringTaskEvent("Test"); - final StringTaskEvent copy = (StringTaskEvent) CommonTestUtils.createCopyWritable(orig); + final StringTaskEvent copy = InstantiationUtil.createCopyWritable(orig); assertEquals(orig.getString(), copy.getString()); assertEquals(orig.hashCode(), copy.hashCode()); @@ -85,7 +86,7 @@ public class TaskEventTest { final EventList orig = new EventList(); orig.add(new StringTaskEvent("Test 2")); orig.add(new IntegerTaskEvent(70)); - final EventList copy = (EventList) CommonTestUtils.createCopyWritable(orig); + final EventList copy = InstantiationUtil.createCopyWritable(orig); assertEquals(orig.size(), copy.size()); final Iterator<AbstractEvent> origIt = orig.iterator(); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index 8eebe66..6740293 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -26,11 +26,13 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; + import org.junit.Test; import org.mockito.Matchers; @@ -61,7 +63,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; * This class contains test concerning the correct conversion from {@link JobGraph} to {@link ExecutionGraph} objects. */ public class ExecutionGraphConstructionTest { - + /** * Creates a JobGraph of the following form: * @@ -76,7 +78,7 @@ public class ExecutionGraphConstructionTest { * </pre> */ @Test - public void testCreateSimpleGraphBipartite() { + public void testCreateSimpleGraphBipartite() throws Exception { final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; @@ -107,7 +109,7 @@ public class ExecutionGraphConstructionTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -122,7 +124,7 @@ public class ExecutionGraphConstructionTest { } @Test - public void testAttachViaDataSets() { + public void testAttachViaDataSets() throws Exception { final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; final Configuration cfg = new Configuration(); @@ -151,8 +153,8 @@ public class ExecutionGraphConstructionTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - ExecutionConfigTest.getSerializedConfig(), + cfg, + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -190,7 +192,7 @@ public class ExecutionGraphConstructionTest { } @Test - public void testAttachViaIds() { + public void testAttachViaIds() throws Exception { final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; final Configuration cfg = new Configuration(); @@ -219,8 +221,8 @@ public class ExecutionGraphConstructionTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - ExecutionConfigTest.getSerializedConfig(), + cfg, + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -459,7 +461,7 @@ public class ExecutionGraphConstructionTest { } @Test - public void testCannotConnectMissingId() { + public void testCannotConnectMissingId() throws Exception { final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; final Configuration cfg = new Configuration(); @@ -474,8 +476,8 @@ public class ExecutionGraphConstructionTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - ExecutionConfigTest.getSerializedConfig(), + cfg, + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -502,7 +504,7 @@ public class ExecutionGraphConstructionTest { } @Test - public void testCannotConnectWrongOrder() { + public void testCannotConnectWrongOrder() throws Exception { final JobID jobId = new JobID(); final String jobName = "Test Job Sample Name"; final Configuration cfg = new Configuration(); @@ -531,8 +533,8 @@ public class ExecutionGraphConstructionTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - ExecutionConfigTest.getSerializedConfig(), + cfg, + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -593,8 +595,8 @@ public class ExecutionGraphConstructionTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - ExecutionConfigTest.getSerializedConfig(), + cfg, + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -639,8 +641,8 @@ public class ExecutionGraphConstructionTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - ExecutionConfigTest.getSerializedConfig(), + cfg, + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -711,8 +713,8 @@ public class ExecutionGraphConstructionTest { TestingUtils.defaultExecutionContext(), jobId, jobName, - cfg, - ExecutionConfigTest.getSerializedConfig(), + cfg, + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index d126acb..a599f42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -30,7 +30,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -47,6 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; + import org.junit.Test; public class ExecutionGraphDeploymentTest { @@ -85,7 +87,7 @@ public class ExecutionGraphDeploymentTest { jobId, "some job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -288,8 +290,8 @@ public class ExecutionGraphDeploymentTest { TestingUtils.directExecutionContext(), jobId, "some job", - new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 26ba04f..c12f346 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.executiongraph; import akka.dispatch.Futures; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; @@ -39,7 +38,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; + import org.junit.Test; import scala.concurrent.Await; import scala.concurrent.Future; @@ -86,7 +87,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -141,7 +142,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 0L)); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -197,7 +198,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "Test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000)); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -226,7 +227,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "TestJob", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); @@ -344,7 +345,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "TestJob", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); @@ -427,7 +428,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "Test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000))); @@ -498,7 +499,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000)); @@ -594,7 +595,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000000)); @@ -648,7 +649,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new JobID(), "test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new FixedDelayRestartStrategy(1, 1000000)); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index bcee5a1..59f2a9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.StoppingException; @@ -32,6 +32,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -128,7 +130,7 @@ public class ExecutionGraphSignalsTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(ordered); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 92a7402..903d5f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -22,13 +22,11 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import java.io.IOException; import java.lang.reflect.Field; import java.net.InetAddress; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -52,6 +50,7 @@ import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPart import org.apache.flink.runtime.messages.TaskMessages.CancelTask; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -168,7 +167,7 @@ public class ExecutionGraphTestUtils { public static final String ERROR_MESSAGE = "test_failure_error_message"; - public static ExecutionJobVertex getExecutionVertex(JobVertexID id, ExecutionContext executionContext) throws JobException { + public static ExecutionJobVertex getExecutionVertex(JobVertexID id, ExecutionContext executionContext) throws Exception { JobVertex ajv = new JobVertex("TestVertex", id); ajv.setInvokableClass(mock(AbstractInvokable.class).getClass()); @@ -176,8 +175,8 @@ public class ExecutionGraphTestUtils { executionContext, new JobID(), "test job", - new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new Configuration(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); @@ -198,7 +197,7 @@ public class ExecutionGraphTestUtils { return ejv; } - public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws JobException, IOException { + public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws Exception { return getExecutionVertex(id, TestingUtils.defaultExecutionContext()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 9e4aa6d..f3743c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -22,9 +22,9 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*; import static org.junit.Assert.*; import static org.mockito.Mockito.mock; -import java.util.Arrays; +import java.util.Collections; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; import org.junit.Test; public class ExecutionStateProgressTest { @@ -54,10 +55,10 @@ public class ExecutionStateProgressTest { jid, "test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); - graph.attachJobGraph(Arrays.asList(ajv)); + graph.attachJobGraph(Collections.singletonList(ajv)); setGraphStatus(graph, JobStatus.RUNNING); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index d7ce0ba..f03370c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.when; import java.net.InetAddress; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.StrictlyLocalAssignment; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; @@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; @@ -273,7 +274,7 @@ public class LocalInputSplitsTest { jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), TIMEOUT, new NoRestartStrategy()); @@ -338,7 +339,7 @@ public class LocalInputSplitsTest { jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), TIMEOUT, new NoRestartStrategy()); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 1b369db..0eb0607 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -22,10 +22,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; + import org.junit.Test; import java.util.ArrayList; @@ -46,7 +48,7 @@ public class PointwisePatternTest { private final Configuration cfg = new Configuration(); @Test - public void testNToN() { + public void testNToN() throws Exception { final int N = 23; JobVertex v1 = new JobVertex("vertex1"); @@ -64,7 +66,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -88,7 +90,7 @@ public class PointwisePatternTest { } @Test - public void test2NToN() { + public void test2NToN() throws Exception { final int N = 17; JobVertex v1 = new JobVertex("vertex1"); @@ -106,7 +108,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -131,7 +133,7 @@ public class PointwisePatternTest { } @Test - public void test3NToN() { + public void test3NToN() throws Exception { final int N = 17; JobVertex v1 = new JobVertex("vertex1"); @@ -149,7 +151,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -175,7 +177,7 @@ public class PointwisePatternTest { } @Test - public void testNTo2N() { + public void testNTo2N() throws Exception { final int N = 41; JobVertex v1 = new JobVertex("vertex1"); @@ -193,7 +195,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -217,7 +219,7 @@ public class PointwisePatternTest { } @Test - public void testNTo7N() { + public void testNTo7N() throws Exception { final int N = 11; JobVertex v1 = new JobVertex("vertex1"); @@ -235,7 +237,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -259,7 +261,7 @@ public class PointwisePatternTest { } @Test - public void testLowHighIrregular() { + public void testLowHighIrregular() throws Exception { testLowToHigh(3, 16); testLowToHigh(19, 21); testLowToHigh(15, 20); @@ -267,14 +269,14 @@ public class PointwisePatternTest { } @Test - public void testHighLowIrregular() { + public void testHighLowIrregular() throws Exception { testHighToLow(16, 3); testHighToLow(21, 19); testHighToLow(20, 15); testHighToLow(31, 11); } - private void testLowToHigh(int lowDop, int highDop) { + private void testLowToHigh(int lowDop, int highDop) throws Exception { if (highDop < lowDop) { throw new IllegalArgumentException(); } @@ -297,7 +299,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { @@ -327,7 +329,7 @@ public class PointwisePatternTest { } } - private void testHighToLow(int highDop, int lowDop) { + private void testHighToLow(int highDop, int lowDop) throws Exception { if (highDop < lowDop) { throw new IllegalArgumentException(); } @@ -350,7 +352,7 @@ public class PointwisePatternTest { jobId, jobName, cfg, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); try { http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 8bc474b..2a690d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -35,12 +35,14 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.testutils.DummyInvokable; - import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; + import org.junit.Test; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.lang.reflect.Field; import java.net.InetAddress; import java.util.Arrays; @@ -105,9 +107,6 @@ public class TerminalStateDeadlockTest { final JobID jobId = resource.getJobID(); final JobVertexID vid1 = new JobVertexID(); final JobVertexID vid2 = new JobVertexID(); - - - final Configuration jobConfig = new Configuration(); final List<JobVertex> vertices; { @@ -182,13 +181,13 @@ public class TerminalStateDeadlockTest { private volatile boolean done; - TestExecGraph(JobID jobId) { + TestExecGraph(JobID jobId) throws IOException { super( TestingUtils.defaultExecutionContext(), jobId, "test graph", EMPTY_CONFIG, - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), TIMEOUT, new FixedDelayRestartStrategy(1, 0)); } http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java index c483f41..4ee06b36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java @@ -26,7 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.DummyActorGateway; @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; @@ -83,7 +84,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -156,7 +157,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -233,7 +234,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), timeout, new NoRestartStrategy()); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); @@ -301,7 +302,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(jobVertex)); @@ -371,7 +372,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), timeout, new NoRestartStrategy()); eg.attachJobGraph(Arrays.asList(jobVertex1, jobVertex2)); @@ -411,7 +412,7 @@ public class VertexLocationConstraintTest { jg.getJobID(), jg.getName(), jg.getJobConfiguration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), timeout, new NoRestartStrategy()); eg.attachJobGraph(Collections.singletonList(vertex)); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 7a23e26..6314969 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -24,7 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; @@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.SerializedValue; import org.junit.Test; public class VertexSlotSharingTest { @@ -76,7 +77,7 @@ public class VertexSlotSharingTest { new JobID(), "test job", new Configuration(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy()); eg.attachJobGraph(vertices); http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java index bc8cd63..3a9488d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java @@ -25,7 +25,8 @@ import static org.junit.Assert.fail; import java.net.InetAddress; -import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.util.InstantiationUtil; + import org.junit.Assert; import org.junit.Test; @@ -97,10 +98,10 @@ public class InstanceConnectionInfoTest { { InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 8888); - InstanceConnectionInfo copy = CommonTestUtils.createCopyWritable(original); + InstanceConnectionInfo copy = InstantiationUtil.createCopyWritable(original); assertEquals(original, copy); - InstanceConnectionInfo serCopy = CommonTestUtils.createCopySerializable(original); + InstanceConnectionInfo serCopy = InstantiationUtil.clone(original); assertEquals(original, serCopy); } @@ -109,10 +110,10 @@ public class InstanceConnectionInfoTest { InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 19871); original.getFQDNHostname(); - InstanceConnectionInfo copy = CommonTestUtils.createCopyWritable(original); + InstanceConnectionInfo copy = InstantiationUtil.createCopyWritable(original); assertEquals(original, copy); - InstanceConnectionInfo serCopy = CommonTestUtils.createCopySerializable(original); + InstanceConnectionInfo serCopy = InstantiationUtil.clone(original); assertEquals(original, serCopy); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java deleted file mode 100644 index 7c55419..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/MultiShotLatch.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -/** - * Latch for synchronizing parts of code in tests. In contrast to - * {@link org.apache.flink.runtime.taskmanager.OneShotLatch} this will reset the state once - * {@link #await()} returns. - * - * <p> - * A part of the code that should only run after other code calls {@link #await()}. The call - * will only return once the other part is finished and calls {@link #trigger()}. - */ -public final class MultiShotLatch { - - private final Object lock = new Object(); - - private volatile boolean triggered; - - /** - * Fires the latch. Code that is blocked on {@link #await()} will now return. - */ - public void trigger() { - synchronized (lock) { - triggered = true; - lock.notifyAll(); - } - } - - /** - * Waits until {@link #trigger())} is called. - */ - public void await() throws InterruptedException { - synchronized (lock) { - while (!triggered) { - lock.wait(); - } - triggered = false; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java deleted file mode 100644 index 504ccca..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/OneShotLatch.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -/** - * Latch for synchronizing parts of code in tests. Once the latch has fired once calls to - * {@link #await()} will return immediately in the future. - * - * <p> - * A part of the code that should only run after other code calls {@link #await()}. The call - * will only return once the other part is finished and calls {@link #trigger()}. - */ -public final class OneShotLatch { - - private final Object lock = new Object(); - - private boolean triggered; - - /** - * Fires the latch. Code that is blocked on {@link #await()} will now return. - */ - public void trigger() { - synchronized (lock) { - triggered = true; - lock.notifyAll(); - } - } - - /** - * Waits until {@link #trigger())} is called. Once {@code trigger()} has been called this - * call will always return immediately. - */ - public void await() throws InterruptedException { - synchronized (lock) { - while (!triggered) { - lock.wait(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index e1f551c..7b55987 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.api.common.ExecutionConfigTest; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; @@ -43,6 +44,7 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.util.SerializedValue; import org.junit.Before; import org.junit.Test; @@ -137,7 +139,7 @@ public class TaskAsyncCallTest { } } - private static Task createTask() { + private static Task createTask() throws Exception { LibraryCacheManager libCache = mock(LibraryCacheManager.class); when(libCache.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader()); @@ -150,7 +152,7 @@ public class TaskAsyncCallTest { TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), "Test Task", 0, 1, 0, new Configuration(), new Configuration(), CheckpointsInOrderInvokable.class.getName(), http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java index f367799..335f788 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.lang.reflect.Field; import java.net.*; +import java.util.UUID; import static org.junit.Assert.*; @@ -109,7 +110,7 @@ public class TaskManagerConfigurationTest { @Test public void testDefaultFsParameterLoading() { final File tmpDir = getTmpDir(); - final File confFile = new File(tmpDir.getAbsolutePath() + File.separator + CommonTestUtils.getRandomDirectoryName() + ".yaml"); + final File confFile = new File(tmpDir, UUID.randomUUID().toString() + ".yaml"); try { final URI defaultFS = new URI("otherFS", null, "localhost", 1234, null, null, null); @@ -146,13 +147,7 @@ public class TaskManagerConfigurationTest { try { InetAddress localhostAddress = InetAddress.getByName(hostname); server = new ServerSocket(0, 50, localhostAddress); - } - catch (UnknownHostException e) { - // may happen if disconnected. skip test. - System.err.println("Skipping 'testNetworkInterfaceSelection' test."); - return; - } - catch (IOException e) { + } catch (IOException e) { // may happen in certain test setups, skip test. System.err.println("Skipping 'testNetworkInterfaceSelection' test."); return; @@ -181,10 +176,8 @@ public class TaskManagerConfigurationTest { } private File getTmpDir() { - File tmpDir = new File(CommonTestUtils.getTempDir() + File.separator - + CommonTestUtils.getRandomDirectoryName() + File.separator); - tmpDir.mkdirs(); - + File tmpDir = new File(CommonTestUtils.getTempDir(), UUID.randomUUID().toString()); + assertTrue("could not create temp directory", tmpDir.mkdirs()); return tmpDir; } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b71e0e7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index b3ad589..ce88c09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -24,7 +24,6 @@ import akka.actor.Props; import akka.japi.Creator; import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.ExecutionConfigTest; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -160,7 +159,7 @@ public class TaskManagerTest extends TestLogger { final JobID jid = new JobID(); final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final SerializedValue<ExecutionConfig> executionConfig = ExecutionConfigTest.getSerializedConfig(); + final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig, "TestTask", 2, 7, 0, new Configuration(), new Configuration(), @@ -263,15 +262,19 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, "TestJob1", vid1, eid1, - ExecutionConfigTest.getSerializedConfig(), "TestTask1", 1, 5, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( + jid1, "TestJob1", vid1, eid1, + new SerializedValue<>(new ExecutionConfig()), + "TestTask1", 1, 5, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, "TestJob2", vid2, eid2, - ExecutionConfigTest.getSerializedConfig(), "TestTask2", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( + jid2, "TestJob2", vid2, eid2, + new SerializedValue<>(new ExecutionConfig()), + "TestTask2", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -367,7 +370,7 @@ public class TaskManagerTest extends TestLogger { } @Test - public void testJobSubmissionAndStop() { + public void testJobSubmissionAndStop() throws Exception { new JavaTestKit(system){{ ActorGateway jobManager = null; @@ -397,7 +400,7 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final SerializedValue<ExecutionConfig> executionConfig = ExecutionConfigTest.getSerializedConfig(); + final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig()); final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig, "TestTask1", 1, 5, 0, new Configuration(), new Configuration(), StoppableInvokable.class.getName(), @@ -525,15 +528,19 @@ public class TaskManagerTest extends TestLogger { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, "TestJob", vid1, eid1, - ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( + jid, "TestJob", vid1, eid1, + new SerializedValue<>(new ExecutionConfig()), + "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, "TestJob", vid2, eid2, - ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( + jid, "TestJob", vid2, eid2, + new SerializedValue<>(new ExecutionConfig()), + "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.<InputGateDeploymentDescriptor>emptyList(), @@ -626,14 +633,18 @@ public class TaskManagerTest extends TestLogger { } ); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, "TestJob", vid1, eid1, - ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( + jid, "TestJob", vid1, eid1, + new SerializedValue<>(new ExecutionConfig()), + "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, "TestJob", vid2, eid2, - ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( + jid, "TestJob", vid2, eid2, + new SerializedValue<>(new ExecutionConfig()), + "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.singletonList(ircdd), @@ -767,14 +778,18 @@ public class TaskManagerTest extends TestLogger { } ); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, "TestJob", vid1, eid1, - ExecutionConfigTest.getSerializedConfig(), "Sender", 0, 1, 0, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor( + jid, "TestJob", vid1, eid1, + new SerializedValue<>(new ExecutionConfig()), + "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.<InputGateDeploymentDescriptor>emptyList(), new ArrayList<BlobKey>(), Collections.<URL>emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, "TestJob", vid2, eid2, - ExecutionConfigTest.getSerializedConfig(), "Receiver", 2, 7, 0, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor( + jid, "TestJob", vid2, eid2, + new SerializedValue<>(new ExecutionConfig()), + "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), Collections.singletonList(ircdd), @@ -913,7 +928,8 @@ public class TaskManagerTest extends TestLogger { final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( jid, "TestJob", vid, eid, - ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0, + new SerializedValue<>(new ExecutionConfig()), + "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), @@ -1007,7 +1023,9 @@ public class TaskManagerTest extends TestLogger { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, "TestJob", vid, eid, ExecutionConfigTest.getSerializedConfig(), "Receiver", 0, 1, 0, + jid, "TestJob", vid, eid, + new SerializedValue<>(new ExecutionConfig()), + "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.<ResultPartitionDeploymentDescriptor>emptyList(), @@ -1084,7 +1102,7 @@ public class TaskManagerTest extends TestLogger { "Job", new JobVertexID(), new ExecutionAttemptID(), - ExecutionConfigTest.getSerializedConfig(), + new SerializedValue<>(new ExecutionConfig()), "Task", 0, 1,