Repository: flink
Updated Branches:
  refs/heads/master c251efca2 -> fb8f2c935


[FLINK-4525] [core] (followup) Remove remaining redundant code for pre-defined 
strictly local assignments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb8f2c93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb8f2c93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb8f2c93

Branch: refs/heads/master
Commit: fb8f2c935e96c9300a7584e310eb96e8a1f32f7f
Parents: eac6088
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 31 13:52:45 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java |  13 -
 .../VertexLocationConstraintTest.java           | 456 -------------------
 2 files changed, 469 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb8f2c93/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index f02647e..88e1b88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -97,8 +97,6 @@ public class ExecutionVertex {
 
        private volatile Execution currentExecution;    // this field must 
never be null
 
-       private volatile List<TaskManagerLocation> locationConstraintInstances;
-
        private volatile boolean scheduleLocalOnly;
 
        // 
--------------------------------------------------------------------------------------------
@@ -351,10 +349,6 @@ public class ExecutionVertex {
                }
        }
 
-       public void setLocationConstraintHosts(List<TaskManagerLocation> 
instances) {
-               this.locationConstraintInstances = instances;
-       }
-
        public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
                if (scheduleLocalOnly && inputEdges != null && 
inputEdges.length > 0) {
                        throw new IllegalArgumentException("Strictly local 
scheduling is only supported for sources.");
@@ -376,12 +370,6 @@ public class ExecutionVertex {
         * @return The preferred locations for this vertex execution, or null, 
if there is no preference.
         */
        public Iterable<TaskManagerLocation> getPreferredLocations() {
-               // if we have hard location constraints, use those
-               List<TaskManagerLocation> constraintInstances = 
this.locationConstraintInstances;
-               if (constraintInstances != null && 
!constraintInstances.isEmpty()) {
-                       return constraintInstances;
-               }
-
                // otherwise, base the preferred locations on the input 
connections
                if (inputEdges == null) {
                        return Collections.emptySet();
@@ -570,7 +558,6 @@ public class ExecutionVertex {
                this.resultPartitions = null;
                this.inputEdges = null;
                this.locationConstraint = null;
-               this.locationConstraintInstances = null;
        }
 
        public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor 
partitionInfo){

http://git-wip-us.apache.org/repos/asf/flink/blob/fb8f2c93/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
deleted file mode 100644
index a1f3345..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import scala.concurrent.duration.FiniteDuration;
-
-public class VertexLocationConstraintTest {
-
-       private static final FiniteDuration timeout = new FiniteDuration(100, 
TimeUnit.SECONDS);
-       
-       @Test
-       public void testScheduleWithConstraint1() {
-               try {
-                       final byte[] address1 = { 10, 0, 1, 4 };
-                       final byte[] address2 = { 10, 0, 1, 5 };
-                       final byte[] address3 = { 10, 0, 1, 6 };
-                       
-                       final String hostname1 = "host1";
-                       final String hostname2 = "host2";
-                       final String hostname3 = "host3";
-                       
-                       // prepare the scheduler
-                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
-                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
-                       Instance instance3 = getInstance(address3, 6789, 
hostname3);
-                       
-                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-                       scheduler.newInstanceAvailable(instance1);
-                       scheduler.newInstanceAvailable(instance2);
-                       scheduler.newInstanceAvailable(instance3);
-                       
-                       // prepare the execution graph
-                       JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
-                       jobVertex.setInvokableClass(DummyInvokable.class);
-                       jobVertex.setParallelism(2);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
-                       eg.attachJobGraph(Collections.singletonList(jobVertex));
-                       
-                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
-                       ExecutionVertex[] vertices = ejv.getTaskVertices();
-                       
-                       
vertices[0].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(),
 instance2.getInstanceConnectionInfo()));
-                       
vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
-                       
-                       vertices[0].setScheduleLocalOnly(true);
-                       vertices[1].setScheduleLocalOnly(true);
-                       
-                       ejv.scheduleAll(scheduler, false);
-                       
-                       SimpleSlot slot1 = 
vertices[0].getCurrentAssignedResource();
-                       SimpleSlot slot2 = 
vertices[1].getCurrentAssignedResource();
-                       
-                       assertNotNull(slot1);
-                       assertNotNull(slot2);
-                       
-                       ResourceID target1 = slot1.getTaskManagerID();
-                       ResourceID target2 = slot2.getTaskManagerID();
-                       
-                       assertNotNull(target1);
-                       assertNotNull(target2);
-                       
-                       assertTrue(target1 == instance1.getResourceId() || 
target1 == instance2.getResourceId());
-                       assertEquals(target2, instance3.getResourceId());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testScheduleWithConstraint2() {
-               
-               // same test as above, which swapped host names to guard 
against "accidentally worked" because of
-               // the order in which requests are handles by internal data 
structures
-               
-               try {
-                       final byte[] address1 = { 10, 0, 1, 4 };
-                       final byte[] address2 = { 10, 0, 1, 5 };
-                       final byte[] address3 = { 10, 0, 1, 6 };
-                       
-                       final String hostname1 = "host1";
-                       final String hostname2 = "host2";
-                       final String hostname3 = "host3";
-                       
-                       // prepare the scheduler
-                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
-                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
-                       Instance instance3 = getInstance(address3, 6789, 
hostname3);
-                       
-                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-                       scheduler.newInstanceAvailable(instance1);
-                       scheduler.newInstanceAvailable(instance2);
-                       scheduler.newInstanceAvailable(instance3);
-                       
-                       // prepare the execution graph
-                       JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
-                       jobVertex.setInvokableClass(DummyInvokable.class);
-                       jobVertex.setParallelism(2);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
-                       eg.attachJobGraph(Collections.singletonList(jobVertex));
-                       
-                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
-                       ExecutionVertex[] vertices = ejv.getTaskVertices();
-                       
-                       
vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
-                       
vertices[1].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(),
 instance2.getInstanceConnectionInfo()));
-                       
-                       vertices[0].setScheduleLocalOnly(true);
-                       vertices[1].setScheduleLocalOnly(true);
-                       
-                       ejv.scheduleAll(scheduler, false);
-                       
-                       SimpleSlot slot1 = 
vertices[0].getCurrentAssignedResource();
-                       SimpleSlot slot2 = 
vertices[1].getCurrentAssignedResource();
-                       
-                       assertNotNull(slot1);
-                       assertNotNull(slot2);
-                       
-                       ResourceID target1 = slot1.getTaskManagerID();
-                       ResourceID target2 = slot2.getTaskManagerID();
-                       
-                       assertTrue(target1 == instance3.getResourceId());
-                       assertTrue(target2 == instance1.getResourceId() || 
target2 == instance2.getResourceId());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testScheduleWithConstraintAndSlotSharing() {
-               try {
-                       final byte[] address1 = { 10, 0, 1, 4 };
-                       final byte[] address2 = { 10, 0, 1, 5 };
-                       final byte[] address3 = { 10, 0, 1, 6 };
-                       
-                       final String hostname1 = "host1";
-                       final String hostname2 = "host2";
-                       final String hostname3 = "host3";
-                       
-                       // prepare the scheduler
-                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
-                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
-                       Instance instance3 = getInstance(address3, 6789, 
hostname3);
-                       
-                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-                       scheduler.newInstanceAvailable(instance1);
-                       scheduler.newInstanceAvailable(instance2);
-                       scheduler.newInstanceAvailable(instance3);
-                       
-                       // prepare the execution graph
-                       JobVertex jobVertex1 = new JobVertex("v1", new 
JobVertexID());
-                       JobVertex jobVertex2 = new JobVertex("v2", new 
JobVertexID());
-                       jobVertex1.setInvokableClass(DummyInvokable.class);
-                       jobVertex2.setInvokableClass(DummyInvokable.class);
-                       jobVertex1.setParallelism(2);
-                       jobVertex2.setParallelism(3);
-                       
-                       SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       jobVertex1.setSlotSharingGroup(sharingGroup);
-                       jobVertex2.setSlotSharingGroup(sharingGroup);
-                       
-                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
-                       eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
-                       
-                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
-                       ExecutionVertex[] vertices = ejv.getTaskVertices();
-                       
-                       
vertices[0].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(),
 instance2.getInstanceConnectionInfo()));
-                       
vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
-                       
-                       vertices[0].setScheduleLocalOnly(true);
-                       vertices[1].setScheduleLocalOnly(true);
-                       
-                       ejv.scheduleAll(scheduler, false);
-                       
-                       SimpleSlot slot1 = 
vertices[0].getCurrentAssignedResource();
-                       SimpleSlot slot2 = 
vertices[1].getCurrentAssignedResource();
-                       
-                       assertNotNull(slot1);
-                       assertNotNull(slot2);
-
-                       ResourceID target1 = slot1.getTaskManagerID();
-                       ResourceID target2 = slot2.getTaskManagerID();
-
-                       assertTrue(target1 == instance1.getResourceId() || 
target1 == instance2.getResourceId());
-                       assertTrue(target2 == instance3.getResourceId());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testScheduleWithUnfulfillableConstraint() {
-               
-               // same test as above, which swapped host names to guard 
against "accidentally worked" because of
-               // the order in which requests are handles by internal data 
structures
-               
-               try {
-                       final byte[] address1 = { 10, 0, 1, 4 };
-                       final byte[] address2 = { 10, 0, 1, 5 };
-                       
-                       final String hostname1 = "host1";
-                       final String hostname2 = "host2";
-                       
-                       // prepare the scheduler
-                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
-                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
-                       
-                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-                       scheduler.newInstanceAvailable(instance1);
-                       
-                       // prepare the execution graph
-                       JobVertex jobVertex = new JobVertex("test vertex", new 
JobVertexID());
-                       jobVertex.setInvokableClass(DummyInvokable.class);
-                       jobVertex.setParallelism(1);
-                       JobGraph jg = new JobGraph("test job", jobVertex);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
-                       eg.attachJobGraph(Collections.singletonList(jobVertex));
-                       
-                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex.getID());
-                       ExecutionVertex[] vertices = ejv.getTaskVertices();
-                       
-                       
vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2.getInstanceConnectionInfo()));
-                       vertices[0].setScheduleLocalOnly(true);
-                       
-                       try {
-                               ejv.scheduleAll(scheduler, false);
-                               fail("This should fail with a 
NoResourceAvailableException");
-                       }
-                       catch (NoResourceAvailableException e) {
-                               // bam! we are good...
-                               assertTrue(e.getMessage().contains(hostname2));
-                       }
-                       catch (Exception e) {
-                               fail("Wrong exception type");
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testScheduleWithUnfulfillableConstraintInSharingGroup() {
-               
-               // same test as above, which swapped host names to guard 
against "accidentally worked" because of
-               // the order in which requests are handles by internal data 
structures
-               
-               try {
-                       final byte[] address1 = { 10, 0, 1, 4 };
-                       final byte[] address2 = { 10, 0, 1, 5 };
-                       
-                       final String hostname1 = "host1";
-                       final String hostname2 = "host2";
-                       
-                       // prepare the scheduler
-                       Instance instance1 = getInstance(address1, 6789, 
hostname1);
-                       Instance instance2 = getInstance(address2, 6789, 
hostname2);
-                       
-                       Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
-                       scheduler.newInstanceAvailable(instance1);
-                       
-                       // prepare the execution graph
-                       JobVertex jobVertex1 = new JobVertex("v1", new 
JobVertexID());
-                       JobVertex jobVertex2 = new JobVertex("v2", new 
JobVertexID());
-                       
-                       jobVertex1.setInvokableClass(DummyInvokable.class);
-                       jobVertex2.setInvokableClass(DummyInvokable.class);
-                       
-                       jobVertex1.setParallelism(1);
-                       jobVertex2.setParallelism(1);
-                       
-                       JobGraph jg = new JobGraph("test job", jobVertex1, 
jobVertex2);
-                       
-                       SlotSharingGroup sharingGroup = new SlotSharingGroup();
-                       jobVertex1.setSlotSharingGroup(sharingGroup);
-                       jobVertex2.setSlotSharingGroup(sharingGroup);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
-                       eg.attachJobGraph(Arrays.asList(jobVertex1, 
jobVertex2));
-                       
-                       ExecutionJobVertex ejv = 
eg.getAllVertices().get(jobVertex1.getID());
-                       ExecutionVertex[] vertices = ejv.getTaskVertices();
-                       
-                       
vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2.getInstanceConnectionInfo()));
-                       vertices[0].setScheduleLocalOnly(true);
-                       
-                       try {
-                               ejv.scheduleAll(scheduler, false);
-                               fail("This should fail with a 
NoResourceAvailableException");
-                       }
-                       catch (NoResourceAvailableException e) {
-                               // bam! we are good...
-                               assertTrue(e.getMessage().contains(hostname2));
-                       }
-                       catch (Exception e) {
-                               fail("Wrong exception type");
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testArchivingClearsFields() {
-               try {
-                       JobVertex vertex = new JobVertex("test vertex", new 
JobVertexID());
-                       JobGraph jg = new JobGraph("test job", vertex);
-                       
-                       ExecutionGraph eg = new ExecutionGraph(
-                                       TestingUtils.defaultExecutionContext(),
-                                       jg.getJobID(),
-                                       jg.getName(),
-                                       jg.getJobConfiguration(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       timeout,
-                                       new NoRestartStrategy());
-                       eg.attachJobGraph(Collections.singletonList(vertex));
-                       
-                       ExecutionVertex ev = 
eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
-                       
-                       Instance instance = 
ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
-                       
ev.setLocationConstraintHosts(Collections.singletonList(instance.getInstanceConnectionInfo()));
-                       
-                       assertNotNull(ev.getPreferredLocations());
-                       assertEquals(instance, 
ev.getPreferredLocations().iterator().next());
-                       
-                       // transition to a final state
-                       eg.fail(new Exception());
-                       
-                       eg.prepareForArchiving();
-                       
-                       assertTrue(ev.getPreferredLocations() == null || 
!ev.getPreferredLocations().iterator().hasNext());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static Instance getInstance(byte[] ipAddress, int dataPort, 
String hostname) throws Exception {
-               HardwareDescription hardwareDescription = new 
HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
-               
-               TaskManagerLocation connection = 
mock(TaskManagerLocation.class);
-               
when(connection.address()).thenReturn(InetAddress.getByAddress(ipAddress));
-               when(connection.dataPort()).thenReturn(dataPort);
-               
when(connection.addressString()).thenReturn(InetAddress.getByAddress(ipAddress).toString());
-               when(connection.getHostname()).thenReturn(hostname);
-               when(connection.getFQDNHostname()).thenReturn(hostname);
-               
-               return new Instance(
-                               new ExecutionGraphTestUtils.SimpleActorGateway(
-                                               
TestingUtils.defaultExecutionContext()),
-                               connection,
-                               ResourceID.generate(),
-                               new InstanceID(),
-                               hardwareDescription,
-                               1);
-       }
-}

Reply via email to