[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' to have more intuitive names
getResourceID() --> getTaskManagerID() getInstanceConnectionInfo() --> getTaskManagerLocation() Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eac6088a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eac6088a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eac6088a Branch: refs/heads/master Commit: eac6088a75e813a015b778f4cfc4cce0cf2a53ce Parents: aaa474a Author: Stephan Ewen <se...@apache.org> Authored: Wed Aug 31 13:59:01 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Sep 2 17:32:57 2016 +0200 ---------------------------------------------------------------------- .../handlers/TaskManagersHandler.java | 2 +- ...PartialInputChannelDeploymentDescriptor.java | 21 +- .../apache/flink/runtime/instance/Instance.java | 28 +- .../flink/runtime/instance/InstanceManager.java | 20 +- .../runtime/jobmanager/scheduler/Scheduler.java | 18 +- .../flink/runtime/jobmanager/JobManager.scala | 3 +- .../ExecutionGraphMetricsTest.java | 404 ++++++++++--------- .../executiongraph/ExecutionGraphTestUtils.java | 2 +- .../TerminalStateDeadlockTest.java | 3 +- .../runtime/instance/InstanceManagerTest.java | 31 +- .../flink/runtime/instance/InstanceTest.java | 6 +- .../flink/runtime/instance/SharedSlotsTest.java | 24 +- .../flink/runtime/instance/SimpleSlotTest.java | 2 +- .../partition/SpilledSubpartitionViewTest.java | 4 +- .../scheduler/CoLocationConstraintTest.java | 6 +- .../ScheduleWithCoLocationHintTest.java | 22 +- .../scheduler/SchedulerSlotSharingTest.java | 30 +- .../scheduler/SchedulerTestUtils.java | 5 +- .../scheduler/SlotAllocationFutureTest.java | 12 +- 19 files changed, 319 insertions(+), 324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index b60cd10..b5e9088 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -85,7 +85,7 @@ public class TaskManagersHandler implements RequestHandler { gen.writeStartObject(); gen.writeStringField("id", instance.getId().toString()); gen.writeStringField("path", instance.getActorGateway().path()); - gen.writeNumberField("dataPort", instance.getInstanceConnectionInfo().dataPort()); + gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort()); gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat()); gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots()); gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots()); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java index e1391a4..0eac39d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java @@ -44,7 +44,7 @@ public class PartialInputChannelDeploymentDescriptor { private final ResultPartitionID partitionID; /** The partition connection info. */ - private final TaskManagerLocation partitionConnectionInfo; + private final TaskManagerLocation partitionTaskManagerLocation; /** The partition connection index. */ private final int partitionConnectionIndex; @@ -52,12 +52,12 @@ public class PartialInputChannelDeploymentDescriptor { public PartialInputChannelDeploymentDescriptor( IntermediateDataSetID resultId, ResultPartitionID partitionID, - TaskManagerLocation partitionConnectionInfo, + TaskManagerLocation partitionTaskManagerLocation, int partitionConnectionIndex) { this.resultId = checkNotNull(resultId); this.partitionID = checkNotNull(partitionID); - this.partitionConnectionInfo = checkNotNull(partitionConnectionInfo); + this.partitionTaskManagerLocation = checkNotNull(partitionTaskManagerLocation); this.partitionConnectionIndex = partitionConnectionIndex; } @@ -66,23 +66,20 @@ public class PartialInputChannelDeploymentDescriptor { * * @see InputChannelDeploymentDescriptor */ - public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor( - Execution consumerExecution) { + public InputChannelDeploymentDescriptor createInputChannelDeploymentDescriptor(Execution consumerExecution) { + checkNotNull(consumerExecution, "consumerExecution"); - checkNotNull(consumerExecution, "Consumer execution null"); - - TaskManagerLocation consumerConnectionInfo = consumerExecution.getAssignedResourceLocation(); - - checkNotNull(consumerConnectionInfo, "Consumer connection info null"); + TaskManagerLocation consumerLocation = consumerExecution.getAssignedResourceLocation(); + checkNotNull(consumerLocation, "Consumer connection info null"); final ResultPartitionLocation partitionLocation; - if (consumerConnectionInfo.equals(partitionConnectionInfo)) { + if (consumerLocation.equals(partitionTaskManagerLocation)) { partitionLocation = ResultPartitionLocation.createLocal(); } else { partitionLocation = ResultPartitionLocation.createRemote( - new ConnectionID(partitionConnectionInfo, partitionConnectionIndex)); + new ConnectionID(partitionTaskManagerLocation, partitionConnectionIndex)); } return new InputChannelDeploymentDescriptor(partitionID, partitionLocation); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java index fe46895..4a8139b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java @@ -52,14 +52,11 @@ public class Instance implements SlotOwner { private final ActorGateway actorGateway; /** The instance connection information for the data transfer. */ - private final TaskManagerLocation connectionInfo; + private final TaskManagerLocation location; /** A description of the resources of the task manager */ private final HardwareDescription resources; - /** The ID identifies the resource the task manager runs on */ - private final ResourceID resourceId; - /** The ID identifying the taskManager. */ private final InstanceID instanceId; @@ -90,22 +87,19 @@ public class Instance implements SlotOwner { * Constructs an instance reflecting a registered TaskManager. * * @param actorGateway The actor gateway to communicate with the remote instance - * @param connectionInfo The remote connection where the task manager receives requests. - * @param resourceId The resource id which denotes the resource the task manager uses. + * @param location The remote connection where the task manager receives requests. * @param id The id under which the taskManager is registered. * @param resources The resources available on the machine. * @param numberOfSlots The number of task slots offered by this taskManager. */ public Instance( ActorGateway actorGateway, - TaskManagerLocation connectionInfo, - ResourceID resourceId, + TaskManagerLocation location, InstanceID id, HardwareDescription resources, int numberOfSlots) { this.actorGateway = actorGateway; - this.connectionInfo = connectionInfo; - this.resourceId = resourceId; + this.location = location; this.instanceId = id; this.resources = resources; this.numberOfSlots = numberOfSlots; @@ -120,8 +114,8 @@ public class Instance implements SlotOwner { // Properties // -------------------------------------------------------------------------------------------- - public ResourceID getResourceId() { - return resourceId; + public ResourceID getTaskManagerID() { + return location.getResourceID(); } public InstanceID getId() { @@ -246,7 +240,7 @@ public class Instance implements SlotOwner { return null; } else { - SimpleSlot slot = new SimpleSlot(jobID, this, connectionInfo, nextSlot, actorGateway); + SimpleSlot slot = new SimpleSlot(jobID, this, location, nextSlot, actorGateway); allocatedSlots.add(slot); return slot; } @@ -284,7 +278,7 @@ public class Instance implements SlotOwner { } else { SharedSlot slot = new SharedSlot( - jobID, this, connectionInfo, nextSlot, actorGateway, sharingGroupAssignment); + jobID, this, location, nextSlot, actorGateway, sharingGroupAssignment); allocatedSlots.add(slot); return slot; } @@ -355,8 +349,8 @@ public class Instance implements SlotOwner { return actorGateway; } - public TaskManagerLocation getInstanceConnectionInfo() { - return connectionInfo; + public TaskManagerLocation getTaskManagerLocation() { + return location; } public int getNumberOfAvailableSlots() { @@ -405,7 +399,7 @@ public class Instance implements SlotOwner { @Override public String toString() { - return String.format("%s @ %s - %d slots - URL: %s", instanceId, connectionInfo.getHostname(), + return String.format("%s @ %s - %d slots - URL: %s", instanceId, location.getHostname(), numberOfSlots, (actorGateway != null ? actorGateway.path() : "No instance gateway")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java index e7a4537..0c7e187 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java @@ -138,8 +138,7 @@ public class InstanceManager { * for the job execution. * * @param taskManager ActorRef to the TaskManager which wants to be registered - * @param resourceID The resource id of the TaskManager - * @param connectionInfo ConnectionInfo of the TaskManager + * @param taskManagerLocation Location info of the TaskManager * @param resources Hardware description of the TaskManager * @param numberOfSlots Number of available slots on the TaskManager * @param leaderSessionID The current leader session ID of the JobManager @@ -147,12 +146,12 @@ public class InstanceManager { */ public InstanceID registerTaskManager( ActorRef taskManager, - ResourceID resourceID, - TaskManagerLocation connectionInfo, + TaskManagerLocation taskManagerLocation, HardwareDescription resources, int numberOfSlots, - UUID leaderSessionID){ - synchronized(this.lock){ + UUID leaderSessionID) { + + synchronized (this.lock) { if (this.isShutdown) { throw new IllegalStateException("InstanceManager is shut down."); } @@ -174,12 +173,11 @@ public class InstanceManager { InstanceID instanceID = new InstanceID(); - Instance host = new Instance(actorGateway, connectionInfo, resourceID, instanceID, - resources, numberOfSlots); + Instance host = new Instance(actorGateway, taskManagerLocation, instanceID, resources, numberOfSlots); registeredHostsById.put(instanceID, host); registeredHostsByConnection.put(taskManager, host); - registeredHostsByResource.put(resourceID, host); + registeredHostsByResource.put(taskManagerLocation.getResourceID(), host); totalNumberOfAliveTaskSlots += numberOfSlots; @@ -187,7 +185,7 @@ public class InstanceManager { LOG.info(String.format("Registered TaskManager at %s (%s) as %s. " + "Current number of registered hosts is %d. " + "Current number of alive task slots is %d.", - connectionInfo.getHostname(), + taskManagerLocation.getHostname(), taskManager.path(), instanceID, registeredHostsById.size(), @@ -217,7 +215,7 @@ public class InstanceManager { registeredHostsByConnection.remove(host); registeredHostsById.remove(instance.getId()); - registeredHostsByResource.remove(instance.getResourceId()); + registeredHostsByResource.remove(instance.getTaskManagerID()); if (terminated) { deadHosts.add(instance.getActorGateway().actor()); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index b481b55..734972d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -363,7 +363,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // if the instance has further available slots, re-add it to the set of available resources. if (instanceToUse.hasResourcesAvailable()) { - this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse); + this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse); } if (slot != null) { @@ -425,7 +425,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { // if the instance has further available slots, re-add it to the set of available resources. if (instanceToUse.hasResourcesAvailable()) { - this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse); + this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse); } if (sharedSlot != null) { @@ -469,7 +469,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { while (this.newlyAvailableInstances.size() > 0) { Instance queuedInstance = this.newlyAvailableInstances.poll(); if (queuedInstance != null) { - this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), queuedInstance); + this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance); } } @@ -583,7 +583,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { } } else { - this.instancesWithAvailableResources.put(instance.getResourceId(), instance); + this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); } } } @@ -649,7 +649,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { instance.setSlotAvailabilityListener(this); // store the instance in the by-host-lookup - String instanceHostName = instance.getInstanceConnectionInfo().getHostname(); + String instanceHostName = instance.getTaskManagerLocation().getHostname(); Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName); if (instanceSet == null) { instanceSet = new HashSet<Instance>(); @@ -658,7 +658,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { instanceSet.add(instance); // add it to the available resources and let potential waiters know - this.instancesWithAvailableResources.put(instance.getResourceId(), instance); + this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // add all slots as available for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) { @@ -693,9 +693,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { } allInstances.remove(instance); - instancesWithAvailableResources.remove(instance.getResourceId()); + instancesWithAvailableResources.remove(instance.getTaskManagerID()); - String instanceHostName = instance.getInstanceConnectionInfo().getHostname(); + String instanceHostName = instance.getTaskManagerLocation().getHostname(); Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName); if (instanceSet != null) { instanceSet.remove(instance); @@ -795,7 +795,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener { while ((instance = newlyAvailableInstances.poll()) != null) { if (instance.hasResourcesAvailable()) { - instancesWithAvailableResources.put(instance.getResourceId(), instance); + instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 2a0ecc2..88af604 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -349,7 +349,7 @@ class JobManager( currentResourceManager = Option(msg.resourceManager()) val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( - instance => instance.getResourceId).toList.asJava + instance => instance.getTaskManagerID).toList.asJava // confirm registration and send known task managers with their resource ids sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) @@ -425,7 +425,6 @@ class JobManager( try { val instanceID = instanceManager.registerTaskManager( taskManager, - resourceId, connectionInfo, hardwareInformation, numberOfSlots, http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index d8bd6cb..d5520fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.ActorGateway; @@ -47,8 +48,10 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; + import org.junit.Test; import org.mockito.Matchers; + import scala.concurrent.ExecutionContext$; import scala.concurrent.Future$; import scala.concurrent.duration.FiniteDuration; @@ -60,7 +63,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -76,200 +80,210 @@ public class ExecutionGraphMetricsTest extends TestLogger { */ @Test public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException { - // setup execution graph with mocked scheduling logic - int parallelism = 1; - - JobVertex jobVertex = new JobVertex("TestVertex"); - jobVertex.setParallelism(parallelism); - jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); - JobGraph jobGraph = new JobGraph("Test Job", jobVertex); - - Configuration config = new Configuration(); - config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); - config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName()); - - Configuration jobConfig = new Configuration(); - - FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - - MetricRegistry metricRegistry = new MetricRegistry(config); - - assertTrue(metricRegistry.getReporters().size() == 1); - - MetricReporter reporter = metricRegistry.getReporters().get(0); - - assertTrue(reporter instanceof TestingReporter); - - TestingReporter testingReporter = (TestingReporter) reporter; - - MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost"); - - Scheduler scheduler = mock(Scheduler.class); - - SimpleSlot simpleSlot = mock(SimpleSlot.class); - - Instance instance = mock(Instance.class); - - TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); - - Slot rootSlot = mock(Slot.class); - - ActorGateway actorGateway = mock(ActorGateway.class); - - when(simpleSlot.isAlive()).thenReturn(true); - when(simpleSlot.getTaskManagerID()).thenReturn(instance.getResourceId()); - when(simpleSlot.getTaskManagerLocation()).thenReturn(instance.getInstanceConnectionInfo()); - when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true); - when(simpleSlot.getRoot()).thenReturn(rootSlot); - - when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot); - - when(instance.getInstanceConnectionInfo()).thenReturn(taskManagerLocation); - when(instance.getActorGateway()).thenReturn(actorGateway); - when(taskManagerLocation.getHostname()).thenReturn("localhost"); - - when(rootSlot.getSlotNumber()).thenReturn(0); - - when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge())); - - TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy(); - - ExecutionGraph executionGraph = new ExecutionGraph( - ExecutionContext$.MODULE$.fromExecutor(new ForkJoinPool()), - jobGraph.getJobID(), - jobGraph.getName(), - jobConfig, - new SerializedValue<ExecutionConfig>(null), - timeout, - testingRestartStrategy, - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - getClass().getClassLoader(), - metricGroup); - - // get restarting time metric - Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME); - - assertNotNull(metric); - assertTrue(metric instanceof Gauge); - - @SuppressWarnings("unchecked") - Gauge<Long> restartingTime = (Gauge<Long>) metric; - - // check that the restarting time is 0 since it's the initial start - assertTrue(0L == restartingTime.getValue()); - - executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); - - // start execution - executionGraph.scheduleForExecution(scheduler); - - assertTrue(0L == restartingTime.getValue()); - - List<ExecutionAttemptID> executionIDs = new ArrayList<>(); - - for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) { - executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId()); - } - - // tell execution graph that the tasks are in state running --> job status switches to state running - for (ExecutionAttemptID executionID : executionIDs) { - executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING)); - } - - assertEquals(JobStatus.RUNNING, executionGraph.getState()); - - assertTrue(0L == restartingTime.getValue()); - - // fail the job so that it goes into state restarting - for (ExecutionAttemptID executionID : executionIDs) { - executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception())); - } - - assertEquals(JobStatus.RESTARTING, executionGraph.getState()); - - long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING); - - // wait some time so that the restarting time gauge shows a value different from 0 - Thread.sleep(50); - - long previousRestartingTime = restartingTime.getValue(); - - // check that the restarting time is monotonically increasing - for (int i = 0; i < 10; i++) { - long currentRestartingTime = restartingTime.getValue(); - - assertTrue(currentRestartingTime >= previousRestartingTime); - previousRestartingTime = currentRestartingTime; - } - - // check that we have measured some restarting time - assertTrue(previousRestartingTime > 0); - - // restart job - testingRestartStrategy.restartExecutionGraph(); - - executionIDs.clear(); - - for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) { - executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId()); - } - - for (ExecutionAttemptID executionID : executionIDs) { - executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING)); - } - - assertEquals(JobStatus.RUNNING, executionGraph.getState()); - - assertTrue(firstRestartingTimestamp != 0); - - previousRestartingTime = restartingTime.getValue(); - - // check that the restarting time does not increase after we've reached the running state - for (int i = 0; i < 10; i++) { - long currentRestartingTime = restartingTime.getValue(); - - assertTrue(currentRestartingTime == previousRestartingTime); - previousRestartingTime = currentRestartingTime; - } - - // fail job again - for (ExecutionAttemptID executionID : executionIDs) { - executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception())); - } - - assertEquals(JobStatus.RESTARTING, executionGraph.getState()); - - long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING); - - assertTrue(firstRestartingTimestamp != secondRestartingTimestamp); - - Thread.sleep(50); - - previousRestartingTime = restartingTime.getValue(); - - // check that the restarting time is increasing again - for (int i = 0; i < 10; i++) { - long currentRestartingTime = restartingTime.getValue(); - - assertTrue(currentRestartingTime >= previousRestartingTime); - previousRestartingTime = currentRestartingTime; - } - - assertTrue(previousRestartingTime > 0); - - // now lets fail the job while it is in restarting and see whether the restarting time then stops to increase - executionGraph.fail(new Exception()); - - assertEquals(JobStatus.FAILED, executionGraph.getState()); - - previousRestartingTime = restartingTime.getValue(); - - for (int i = 0; i < 10; i++) { - long currentRestartingTime = restartingTime.getValue(); - - assertTrue(currentRestartingTime == previousRestartingTime); - previousRestartingTime = currentRestartingTime; + final ExecutorService executor = Executors.newCachedThreadPool(); + try { + // setup execution graph with mocked scheduling logic + int parallelism = 1; + + JobVertex jobVertex = new JobVertex("TestVertex"); + jobVertex.setParallelism(parallelism); + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + JobGraph jobGraph = new JobGraph("Test Job", jobVertex); + + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName()); + + Configuration jobConfig = new Configuration(); + + FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); + + MetricRegistry metricRegistry = new MetricRegistry(config); + + assertTrue(metricRegistry.getReporters().size() == 1); + + MetricReporter reporter = metricRegistry.getReporters().get(0); + + assertTrue(reporter instanceof TestingReporter); + + TestingReporter testingReporter = (TestingReporter) reporter; + + MetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost"); + + Scheduler scheduler = mock(Scheduler.class); + + ResourceID taskManagerId = ResourceID.generate(); + + TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); + when(taskManagerLocation.getResourceID()).thenReturn(taskManagerId); + when(taskManagerLocation.getHostname()).thenReturn("localhost"); + + ActorGateway actorGateway = mock(ActorGateway.class); + + Instance instance = mock(Instance.class); + when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation); + when(instance.getTaskManagerID()).thenReturn(taskManagerId); + when(instance.getActorGateway()).thenReturn(actorGateway); + + Slot rootSlot = mock(Slot.class); + + SimpleSlot simpleSlot = mock(SimpleSlot.class); + when(simpleSlot.isAlive()).thenReturn(true); + when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation); + when(simpleSlot.getTaskManagerID()).thenReturn(taskManagerId); + when(simpleSlot.getTaskManagerActorGateway()).thenReturn(actorGateway); + when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true); + when(simpleSlot.getRoot()).thenReturn(rootSlot); + + when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot); + + + + when(rootSlot.getSlotNumber()).thenReturn(0); + + when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge())); + + TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy(); + + ExecutionGraph executionGraph = new ExecutionGraph( + ExecutionContext$.MODULE$.fromExecutor(executor), + jobGraph.getJobID(), + jobGraph.getName(), + jobConfig, + new SerializedValue<ExecutionConfig>(null), + timeout, + testingRestartStrategy, + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + getClass().getClassLoader(), + metricGroup); + + // get restarting time metric + Metric metric = testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME); + + assertNotNull(metric); + assertTrue(metric instanceof Gauge); + + @SuppressWarnings("unchecked") + Gauge<Long> restartingTime = (Gauge<Long>) metric; + + // check that the restarting time is 0 since it's the initial start + assertTrue(0L == restartingTime.getValue()); + + executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + // start execution + executionGraph.scheduleForExecution(scheduler); + + assertTrue(0L == restartingTime.getValue()); + + List<ExecutionAttemptID> executionIDs = new ArrayList<>(); + + for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) { + executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId()); + } + + // tell execution graph that the tasks are in state running --> job status switches to state running + for (ExecutionAttemptID executionID : executionIDs) { + executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING)); + } + + assertEquals(JobStatus.RUNNING, executionGraph.getState()); + + assertTrue(0L == restartingTime.getValue()); + + // fail the job so that it goes into state restarting + for (ExecutionAttemptID executionID : executionIDs) { + executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception())); + } + + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + + long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING); + + // wait some time so that the restarting time gauge shows a value different from 0 + Thread.sleep(50); + + long previousRestartingTime = restartingTime.getValue(); + + // check that the restarting time is monotonically increasing + for (int i = 0; i < 10; i++) { + long currentRestartingTime = restartingTime.getValue(); + + assertTrue(currentRestartingTime >= previousRestartingTime); + previousRestartingTime = currentRestartingTime; + } + + // check that we have measured some restarting time + assertTrue(previousRestartingTime > 0); + + // restart job + testingRestartStrategy.restartExecutionGraph(); + + executionIDs.clear(); + + for (ExecutionVertex executionVertex: executionGraph.getAllExecutionVertices()) { + executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId()); + } + + for (ExecutionAttemptID executionID : executionIDs) { + executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING)); + } + + assertEquals(JobStatus.RUNNING, executionGraph.getState()); + + assertTrue(firstRestartingTimestamp != 0); + + previousRestartingTime = restartingTime.getValue(); + + // check that the restarting time does not increase after we've reached the running state + for (int i = 0; i < 10; i++) { + long currentRestartingTime = restartingTime.getValue(); + + assertTrue(currentRestartingTime == previousRestartingTime); + previousRestartingTime = currentRestartingTime; + } + + // fail job again + for (ExecutionAttemptID executionID : executionIDs) { + executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new Exception())); + } + + assertEquals(JobStatus.RESTARTING, executionGraph.getState()); + + long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING); + + assertTrue(firstRestartingTimestamp != secondRestartingTimestamp); + + Thread.sleep(50); + + previousRestartingTime = restartingTime.getValue(); + + // check that the restarting time is increasing again + for (int i = 0; i < 10; i++) { + long currentRestartingTime = restartingTime.getValue(); + + assertTrue(currentRestartingTime >= previousRestartingTime); + previousRestartingTime = currentRestartingTime; + } + + assertTrue(previousRestartingTime > 0); + + // now lets fail the job while it is in restarting and see whether the restarting time then stops to increase + executionGraph.fail(new Exception()); + + assertEquals(JobStatus.FAILED, executionGraph.getState()); + + previousRestartingTime = restartingTime.getValue(); + + for (int i = 0; i < 10; i++) { + long currentRestartingTime = restartingTime.getValue(); + + assertTrue(currentRestartingTime == previousRestartingTime); + previousRestartingTime = currentRestartingTime; + } + } finally { + executor.shutdownNow(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index cddb6cb..df47c3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -113,7 +113,7 @@ public class ExecutionGraphTestUtils { InetAddress address = InetAddress.getByName("127.0.0.1"); TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); - return new Instance(gateway, connection, resourceID, new InstanceID(), hardwareDescription, numberOfSlots); + return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots); } @SuppressWarnings("serial") http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index a71faf6..870ae05 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -84,8 +84,7 @@ public class TerminalStateDeadlockTest { TaskManagerLocation ci = new TaskManagerLocation(resourceId, address, 12345); HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000); - Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, - resourceId, new InstanceID(), resources, 4); + Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4); this.resource = instance.allocateSimpleSlot(new JobID()); } http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java index f1ed960..f3747c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java @@ -87,12 +87,9 @@ public class InstanceManagerTest{ final JavaTestKit probe2 = new JavaTestKit(system); final JavaTestKit probe3 = new JavaTestKit(system); - cm.registerTaskManager(probe1.getRef(), resID1, - ici1, hardwareDescription, 1, leaderSessionID); - cm.registerTaskManager(probe2.getRef(), resID2, - ici2, hardwareDescription, 2, leaderSessionID); - cm.registerTaskManager(probe3.getRef(), resID3, - ici3, hardwareDescription, 5, leaderSessionID); + cm.registerTaskManager(probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID); + cm.registerTaskManager(probe2.getRef(), ici2, hardwareDescription, 2, leaderSessionID); + cm.registerTaskManager(probe3.getRef(), ici3, hardwareDescription, 5, leaderSessionID); assertEquals(3, cm.getNumberOfRegisteredTaskManagers()); assertEquals(8, cm.getTotalNumberOfSlots()); @@ -102,7 +99,7 @@ public class InstanceManagerTest{ HashSet<TaskManagerLocation>(); for(Instance instance: instances){ - taskManagerLocations.add(instance.getInstanceConnectionInfo()); + taskManagerLocations.add(instance.getTaskManagerLocation()); } assertTrue(taskManagerLocations.contains(ici1)); @@ -133,14 +130,13 @@ public class InstanceManagerTest{ TaskManagerLocation ici = new TaskManagerLocation(resID1, address, dataPort); JavaTestKit probe = new JavaTestKit(system); - cm.registerTaskManager(probe.getRef(), resID1, - ici, resources, 1, leaderSessionID); + cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID); assertEquals(1, cm.getNumberOfRegisteredTaskManagers()); assertEquals(1, cm.getTotalNumberOfSlots()); try { - cm.registerTaskManager(probe.getRef(), resID2, ici, resources, 1, leaderSessionID); + cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID); } catch (Exception e) { // good } @@ -182,12 +178,12 @@ public class InstanceManagerTest{ JavaTestKit probe2 = new JavaTestKit(system); JavaTestKit probe3 = new JavaTestKit(system); - InstanceID instanceID1 = cm.registerTaskManager(probe1.getRef(), resID1, - ici1, hardwareDescription, 1, leaderSessionID); - InstanceID instanceID2 = cm.registerTaskManager(probe2.getRef(), resID2, - ici2, hardwareDescription, 1, leaderSessionID); - InstanceID instanceID3 = cm.registerTaskManager(probe3.getRef(), resID3, - ici3, hardwareDescription, 1, leaderSessionID); + InstanceID instanceID1 = cm.registerTaskManager( + probe1.getRef(), ici1, hardwareDescription, 1, leaderSessionID); + InstanceID instanceID2 = cm.registerTaskManager( + probe2.getRef(), ici2, hardwareDescription, 1, leaderSessionID); + InstanceID instanceID3 = cm.registerTaskManager( + probe3.getRef(), ici3, hardwareDescription, 1, leaderSessionID); // report some immediate heart beats assertTrue(cm.reportHeartBeat(instanceID1, new byte[] {})); @@ -241,8 +237,7 @@ public class InstanceManagerTest{ TaskManagerLocation ici = new TaskManagerLocation(resID, address, 20000); JavaTestKit probe = new JavaTestKit(system); - cm.registerTaskManager(probe.getRef(), resID, - ici, resources, 1, leaderSessionID); + cm.registerTaskManager(probe.getRef(), ici, resources, 1, leaderSessionID); fail("Should raise exception in shutdown state"); } catch (IllegalStateException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java index 82d3723..aee62fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java @@ -42,7 +42,7 @@ public class InstanceTest { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, - resourceID, new InstanceID(), hardwareDescription, 4); + new InstanceID(), hardwareDescription, 4); assertEquals(4, instance.getTotalNumberOfSlots()); assertEquals(4, instance.getNumberOfAvailableSlots()); @@ -105,7 +105,7 @@ public class InstanceTest { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, - resourceID, new InstanceID(), hardwareDescription, 3); + new InstanceID(), hardwareDescription, 3); assertEquals(3, instance.getNumberOfAvailableSlots()); @@ -137,7 +137,7 @@ public class InstanceTest { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, - resourceID, new InstanceID(), hardwareDescription, 3); + new InstanceID(), hardwareDescription, 3); assertEquals(3, instance.getNumberOfAvailableSlots()); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java index 2c40e89..0edef5e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java @@ -132,7 +132,7 @@ public class SharedSlotsTest { assertEquals(Locality.LOCAL, sub1.getLocality()); assertEquals(1, sub1.getNumberLeaves()); assertEquals(vid1, sub1.getGroupID()); - assertEquals(instance.getResourceId(), sub1.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), sub1.getTaskManagerID()); assertEquals(jobId, sub1.getJobID()); assertEquals(sharedSlot, sub1.getParent()); assertEquals(sharedSlot, sub1.getRoot()); @@ -151,7 +151,7 @@ public class SharedSlotsTest { assertEquals(Locality.UNCONSTRAINED, sub2.getLocality()); assertEquals(1, sub2.getNumberLeaves()); assertEquals(vid2, sub2.getGroupID()); - assertEquals(instance.getResourceId(), sub2.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), sub2.getTaskManagerID()); assertEquals(jobId, sub2.getJobID()); assertEquals(sharedSlot, sub2.getParent()); assertEquals(sharedSlot, sub2.getRoot()); @@ -163,14 +163,14 @@ public class SharedSlotsTest { assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid3)); assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4)); - SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getInstanceConnectionInfo())); + SimpleSlot sub3 = assignment.getSlotForTask(vid3, Collections.singleton(instance.getTaskManagerLocation())); assertNotNull(sub3); assertNull(sub3.getExecutedVertex()); assertEquals(Locality.LOCAL, sub3.getLocality()); assertEquals(1, sub3.getNumberLeaves()); assertEquals(vid3, sub3.getGroupID()); - assertEquals(instance.getResourceId(), sub3.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), sub3.getTaskManagerID()); assertEquals(jobId, sub3.getJobID()); assertEquals(sharedSlot, sub3.getParent()); assertEquals(sharedSlot, sub3.getRoot()); @@ -183,14 +183,14 @@ public class SharedSlotsTest { assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(vid4)); SimpleSlot sub4 = assignment.getSlotForTask(vid4, - Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getInstanceConnectionInfo())); + Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getTaskManagerLocation())); assertNotNull(sub4); assertNull(sub4.getExecutedVertex()); assertEquals(Locality.NON_LOCAL, sub4.getLocality()); assertEquals(1, sub4.getNumberLeaves()); assertEquals(vid4, sub4.getGroupID()); - assertEquals(instance.getResourceId(), sub4.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), sub4.getTaskManagerID()); assertEquals(jobId, sub4.getJobID()); assertEquals(sharedSlot, sub4.getParent()); assertEquals(sharedSlot, sub4.getRoot()); @@ -456,7 +456,7 @@ public class SharedSlotsTest { assertNotNull(constraint.getSharedSlot()); assertTrue(constraint.isAssigned()); assertTrue(constraint.isAssignedAndAlive()); - assertEquals(instance.getInstanceConnectionInfo(), constraint.getLocation()); + assertEquals(instance.getTaskManagerLocation(), constraint.getLocation()); SimpleSlot tailSlot = assignment.getSlotForTask(constraint, Collections.<TaskManagerLocation>emptySet()); @@ -475,7 +475,7 @@ public class SharedSlotsTest { assertTrue(tailSlot.isReleased()); assertTrue(constraint.isAssigned()); assertFalse(constraint.isAssignedAndAlive()); - assertEquals(instance.getInstanceConnectionInfo(), constraint.getLocation()); + assertEquals(instance.getTaskManagerLocation(), constraint.getLocation()); // we should have resources again for the co-location constraint assertEquals(1, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId())); @@ -488,10 +488,10 @@ public class SharedSlotsTest { assertEquals(0, assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId())); // verify some basic properties of the slots - assertEquals(instance.getResourceId(), sourceSlot.getTaskManagerID()); - assertEquals(instance.getResourceId(), headSlot.getTaskManagerID()); - assertEquals(instance.getResourceId(), tailSlot.getTaskManagerID()); - assertEquals(instance.getResourceId(), sinkSlot.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), sourceSlot.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), headSlot.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), tailSlot.getTaskManagerID()); + assertEquals(instance.getTaskManagerID(), sinkSlot.getTaskManagerID()); assertEquals(sourceId, sourceSlot.getGroupID()); assertEquals(sinkId, sinkSlot.getGroupID()); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java index 82c2a74..c690d36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java @@ -150,7 +150,7 @@ public class SimpleSlotTest { TaskManagerLocation connection = new TaskManagerLocation(resourceID, address, 10001); Instance instance = new Instance(DummyActorGateway.INSTANCE, connection, - resourceID, new InstanceID(), hardwareDescription, 1); + new InstanceID(), hardwareDescription, 1); return instance.allocateSimpleSlot(new JobID()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java index fff7bc6..5722cac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.io.network.partition; -import com.google.common.collect.Lists; import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; @@ -36,6 +35,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -121,7 +121,7 @@ public class SpilledSubpartitionViewTest { } } - final List<Future<Boolean>> results = Lists.newArrayList(); + final List<Future<Boolean>> results = new ArrayList<>(); // Submit the consuming tasks for (ResultSubpartitionView view : readers) { http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java index 3bd4368..1344aef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java @@ -140,7 +140,7 @@ public class CoLocationConstraintTest { // now, the location is assigned and we have a location assertTrue(constraint.isAssigned()); assertTrue(constraint.isAssignedAndAlive()); - assertEquals(instance2, constraint.getLocation()); + assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation()); // release the slot slot2_1.releaseSlot(); @@ -148,7 +148,7 @@ public class CoLocationConstraintTest { // we should still have a location assertTrue(constraint.isAssigned()); assertFalse(constraint.isAssignedAndAlive()); - assertEquals(instance2, constraint.getLocation()); + assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation()); // we can not assign a different location try { @@ -167,7 +167,7 @@ public class CoLocationConstraintTest { assertTrue(constraint.isAssigned()); assertTrue(constraint.isAssignedAndAlive()); - assertEquals(instance2, constraint.getLocation()); + assertEquals(instance2.getTaskManagerLocation(), constraint.getLocation()); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index 5b7d18a..eab4fea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -326,8 +326,8 @@ public class ScheduleWithCoLocationHintTest { Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); - TaskManagerLocation loc2 = i2.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); + TaskManagerLocation loc2 = i2.getTaskManagerLocation(); scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); @@ -398,8 +398,8 @@ public class ScheduleWithCoLocationHintTest { Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); - TaskManagerLocation loc2 = i2.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); + TaskManagerLocation loc2 = i2.getTaskManagerLocation(); scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); @@ -425,8 +425,8 @@ public class ScheduleWithCoLocationHintTest { SimpleSlot s4 = scheduler.scheduleImmediately(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2)); // still preserves the previous instance mapping) - assertEquals(i1.getResourceId(), s3.getTaskManagerID()); - assertEquals(i2.getResourceId(), s4.getTaskManagerID()); + assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID()); + assertEquals(i2.getTaskManagerID(), s4.getTaskManagerID()); s3.releaseSlot(); s4.releaseSlot(); @@ -455,8 +455,8 @@ public class ScheduleWithCoLocationHintTest { Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); - TaskManagerLocation loc2 = i2.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); + TaskManagerLocation loc2 = i2.getTaskManagerLocation(); scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); @@ -516,7 +516,7 @@ public class ScheduleWithCoLocationHintTest { Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); @@ -580,8 +580,8 @@ public class ScheduleWithCoLocationHintTest { Instance i1 = getRandomInstance(1); Instance i2 = getRandomInstance(1); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); - TaskManagerLocation loc2 = i2.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); + TaskManagerLocation loc2 = i2.getTaskManagerLocation(); scheduler.newInstanceAvailable(i2); scheduler.newInstanceAvailable(i1); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index a683834..fd0523b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -108,10 +108,10 @@ public class SchedulerSlotSharingTest { // make sure we have two slots on the first instance, and two on the second int c = 0; - c += (s5.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1; - c += (s6.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1; - c += (s7.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1; - c += (s8.getTaskManagerID().equals(i1.getResourceId())) ? 1 : -1; + c += (s5.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1; + c += (s6.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1; + c += (s7.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1; + c += (s8.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1; assertEquals(0, c); // release all @@ -637,8 +637,8 @@ public class SchedulerSlotSharingTest { Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); - TaskManagerLocation loc2 = i2.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); + TaskManagerLocation loc2 = i2.getTaskManagerLocation(); Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(i1); @@ -690,8 +690,8 @@ public class SchedulerSlotSharingTest { Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); - TaskManagerLocation loc2 = i2.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); + TaskManagerLocation loc2 = i2.getTaskManagerLocation(); Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(i1); @@ -743,7 +743,7 @@ public class SchedulerSlotSharingTest { Instance i1 = getRandomInstance(2); Instance i2 = getRandomInstance(2); - TaskManagerLocation loc1 = i1.getInstanceConnectionInfo(); + TaskManagerLocation loc1 = i1.getTaskManagerLocation(); Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); scheduler.newInstanceAvailable(i1); @@ -771,12 +771,12 @@ public class SchedulerSlotSharingTest { assertEquals(0, i1.getNumberOfAvailableSlots()); assertEquals(0, i2.getNumberOfAvailableSlots()); - assertEquals(i1.getResourceId(), s1.getTaskManagerID()); - assertEquals(i1.getResourceId(), s2.getTaskManagerID()); - assertEquals(i1.getResourceId(), s3.getTaskManagerID()); - assertEquals(i1.getResourceId(), s4.getTaskManagerID()); - assertEquals(i2.getResourceId(), s5.getTaskManagerID()); - assertEquals(i2.getResourceId(), s6.getTaskManagerID()); + assertEquals(i1.getTaskManagerID(), s1.getTaskManagerID()); + assertEquals(i1.getTaskManagerID(), s2.getTaskManagerID()); + assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID()); + assertEquals(i1.getTaskManagerID(), s4.getTaskManagerID()); + assertEquals(i2.getTaskManagerID(), s5.getTaskManagerID()); + assertEquals(i2.getTaskManagerID(), s6.getTaskManagerID()); // check the scheduler's bookkeeping assertEquals(4, scheduler.getNumberOfLocalizedAssignments()); http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index eef27a8..d040ec4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -69,8 +69,7 @@ public class SchedulerTestUtils { final long GB = 1024L*1024*1024; HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB); - return new Instance(DummyActorGateway.INSTANCE, ci, resourceID, - new InstanceID(), resources, numSlots); + return new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, numSlots); } @@ -88,7 +87,7 @@ public class SchedulerTestUtils { public static Execution getTestVertex(Instance... preferredInstances) { List<TaskManagerLocation> locations = new ArrayList<>(preferredInstances.length); for (Instance i : preferredInstances) { - locations.add(i.getInstanceConnectionInfo()); + locations.add(i.getTaskManagerLocation()); } return getTestVertex(locations); } http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java index d9c100c..ea0d2cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java @@ -56,9 +56,9 @@ public class SlotAllocationFutureTest { final Instance instance2 = SchedulerTestUtils.getRandomInstance(1); final SimpleSlot slot1 = new SimpleSlot(new JobID(), instance1, - instance1.getInstanceConnectionInfo(), 0, instance1.getActorGateway(), null, null); + instance1.getTaskManagerLocation(), 0, instance1.getActorGateway(), null, null); final SimpleSlot slot2 = new SimpleSlot(new JobID(), instance2, - instance2.getInstanceConnectionInfo(), 0, instance2.getActorGateway(), null, null); + instance2.getTaskManagerLocation(), 0, instance2.getActorGateway(), null, null); future.setSlot(slot1); try { @@ -85,7 +85,7 @@ public class SlotAllocationFutureTest { final Instance instance = SchedulerTestUtils.getRandomInstance(1); final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null); + instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); SlotAllocationFuture future = new SlotAllocationFuture(); @@ -108,7 +108,7 @@ public class SlotAllocationFutureTest { final Instance instance = SchedulerTestUtils.getRandomInstance(1); final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null); + instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); SlotAllocationFuture future = new SlotAllocationFuture(); future.setSlot(thisSlot); @@ -141,7 +141,7 @@ public class SlotAllocationFutureTest { final Instance instance = SchedulerTestUtils.getRandomInstance(1); final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null); + instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); final SlotAllocationFuture future = new SlotAllocationFuture(); @@ -181,7 +181,7 @@ public class SlotAllocationFutureTest { final Instance instance = SchedulerTestUtils.getRandomInstance(1); final SimpleSlot thisSlot = new SimpleSlot(new JobID(), instance, - instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, null); + instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null); final SlotAllocationFuture future = new SlotAllocationFuture(); future.setSlot(thisSlot);