Repository: incubator-myriad
Updated Branches:
  refs/heads/master 101bcad35 -> a3ef8c791


MYRIAD-160 Randomizing Mesos ports assigned to NMs to make sure that in case of 
flexdown and subsequent flexup the NM does not use previously used ports. Port 
clashes can still happen, but this minimizes those events.

This closes: #20
Review: #20


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/a3ef8c79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/a3ef8c79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/a3ef8c79

Branch: refs/heads/master
Commit: a3ef8c791c77afad609deacda7bcade7e25fae80
Parents: 101bcad
Author: Yuliya Feldman <yfeld...@maprtech.com>
Authored: Mon Oct 26 11:22:13 2015 -0700
Committer: Santosh Marella <mare...@gmail.com>
Committed: Wed Oct 28 11:19:53 2015 -0700

----------------------------------------------------------------------
 .../apache/myriad/scheduler/TaskFactory.java    |  67 +++---
 .../myriad/scheduler/TestRandomPorts.java       | 204 +++++++++++++++++++
 2 files changed, 248 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/a3ef8c79/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
index 33bc832..ad0ec0d 100644
--- 
a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
+++ 
b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java
@@ -18,14 +18,17 @@
  */
 package org.apache.myriad.scheduler;
 
+import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
+import java.util.Random;
 
 import javax.inject.Inject;
 
 import org.apache.myriad.configuration.MyriadConfiguration;
 import org.apache.myriad.configuration.MyriadExecutorConfiguration;
+import org.apache.myriad.state.NodeTask;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
@@ -39,6 +42,7 @@ import org.apache.mesos.Protos.Resource;
 import org.apache.mesos.Protos.TaskInfo;
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.Value;
+import org.apache.mesos.Protos.Value.Range;
 import org.apache.mesos.Protos.Value.Scalar;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +58,7 @@ public interface TaskFactory {
   static final String YARN_HTTP_POLICY = "yarn.http.policy";
   static final String YARN_HTTP_POLICY_HTTPS_ONLY = "HTTPS_ONLY";
 
-  TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, 
org.apache.myriad.state.NodeTask nodeTask);
+  TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID taskId, 
NodeTask nodeTask);
 
   // TODO(Santosh): This is needed because the ExecutorInfo constructed
   // to launch NM needs to be specified to launch placeholder tasks for
@@ -72,6 +76,7 @@ public interface TaskFactory {
     public static final String YARN_NODEMANAGER_OPTS_KEY = 
"YARN_NODEMANAGER_OPTS";
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(NMTaskFactoryImpl.class);
+    private static final Random rand = new Random();
     private MyriadConfiguration cfg;
     private TaskUtils taskUtils;
     private ExecutorCommandLineGenerator clGenerator;
@@ -85,32 +90,47 @@ public interface TaskFactory {
       this.constraints = new NMTaskConstraints();
     }
 
+    @VisibleForTesting
+    protected static HashSet<Long> getNMPorts(Resource resource) {
+      HashSet<Long> ports = new HashSet<>();
+      if (resource.getName().equals("ports")){
+        /*
+        ranges.getRangeList() returns a list of ranges, each range specifies a 
begin and end only.
+        so must loop though each range until we get all ports needed.  We exit 
each loop as soon as all
+        ports are found so bounded by NMPorts.expectedNumPorts.
+        */
+        final List<Range> ranges = resource.getRanges().getRangeList();
+        final List<Long> allAvailablePorts = new ArrayList<>();
+        for (Range range : ranges) {
+          if (range.hasBegin() && range.hasEnd()) {
+            for (long i = range.getBegin(); i <= range.getEnd(); i++) {
+              allAvailablePorts.add(i);
+            }
+          }
+        }
+        final int allAvailablePortsSize = allAvailablePorts.size();
+        Preconditions.checkState(allAvailablePorts.size() >= 
NMPorts.expectedNumPorts(), "Not enough ports in offer");
+        
+        while (ports.size() < NMPorts.expectedNumPorts()) {
+          int portIndex = rand.nextInt(allAvailablePortsSize);
+          ports.add(allAvailablePorts.get(portIndex));
+        }        
+      }
+      return ports;
+    }
+    
     //Utility function to get the first NMPorts.expectedNumPorts number of 
ports of an offer
-    private static NMPorts getPorts(Offer offer) {
+    @VisibleForTesting
+    protected static NMPorts getPorts(Offer offer) {
       HashSet<Long> ports = new HashSet<>();
       for (Resource resource : offer.getResourcesList()) {
         if (resource.getName().equals("ports")) {
-          /*
-          ranges.getRangeList() returns a list of ranges, each range specifies 
a begin and end only.
-          so must loop though each range until we get all ports needed.  We 
exit each loop as soon as all
-          ports are found so bounded by NMPorts.expectedNumPorts.
-          */
-          Iterator<Value.Range> itr = 
resource.getRanges().getRangeList().iterator();
-          while (itr.hasNext() && ports.size() < NMPorts.expectedNumPorts()) {
-            Value.Range range = itr.next();
-            if (range.getBegin() <= range.getEnd()) {
-              long i = range.getBegin();
-              while (i <= range.getEnd() && ports.size() < 
NMPorts.expectedNumPorts()) {
-                ports.add(i);
-                i++;
-              }
-            }
-          }
+          ports = getNMPorts(resource);
+          break;
         }
       }
 
-      Preconditions.checkState(ports.size() == NMPorts.expectedNumPorts(), 
"Not enough ports in offer");
-      Long[] portArray = ports.toArray(new Long[ports.size()]);
+      Long [] portArray = ports.toArray(new Long [ports.size()]);
       return new NMPorts(portArray);
     }
 
@@ -123,7 +143,8 @@ public interface TaskFactory {
       if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) {
         //Both FrameworkUser and FrameworkSuperuser to get all of the 
directory permissions correct.
         if (!(cfg.getFrameworkUser().isPresent() && 
cfg.getFrameworkSuperUser().isPresent())) {
-          throw new RuntimeException("Trying to use remote distribution, but 
frameworkUser" + "and/or frameworkSuperUser not set!");
+          throw new RuntimeException("Trying to use remote distribution, but 
frameworkUser" +
+            "and/or frameworkSuperUser not set!");
         }
         String nodeManagerUri = 
myriadExecutorConfiguration.getNodeManagerUri().get();
         cmd = clGenerator.generateCommandLine(profile, ports);
@@ -153,7 +174,7 @@ public interface TaskFactory {
     }
 
     @Override
-    public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID 
taskId, org.apache.myriad.state.NodeTask nodeTask) {
+    public TaskInfo createTask(Offer offer, FrameworkID frameworkId, TaskID 
taskId, NodeTask nodeTask) {
       Objects.requireNonNull(offer, "Offer should be non-null");
       Objects.requireNonNull(nodeTask, "NodeTask should be non-null");
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/a3ef8c79/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java
 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java
new file mode 100644
index 0000000..e8c0e58
--- /dev/null
+++ 
b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/TestRandomPorts.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.myriad.scheduler;
+
+
+import static org.junit.Assert.*;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.Resource;
+import org.apache.mesos.Protos.Value.Range;
+import org.apache.mesos.Protos.Value.Ranges;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
+import com.google.common.collect.Lists;
+
+/**
+ * Test Class to test NM ports randomization
+ *
+ */
+public class TestRandomPorts {
+
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  @Test
+  public void testRandomPorts() {
+    Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build();
+    Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build();
+    Range range3 = Range.newBuilder().setBegin(310).setEnd(500).build();
+    Range range4 = Range.newBuilder().setBegin(520).setEnd(720).build();
+    Range range5 = Range.newBuilder().setBegin(750).setEnd(1000).build();
+    
+    Ranges ranges = Ranges.newBuilder().addRange(range1)
+        .addRange(range2)
+        .addRange(range3)
+        .addRange(range4)
+        .addRange(range5).build();
+    
+    
+    Resource resource = 
Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
+    
+    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
+    
+    assertEquals(NMPorts.expectedNumPorts(), ports.size());
+    List<Long> sortedList = Lists.newArrayList(ports);
+    
+    Collections.sort(sortedList);
+    
+    for (Long port : sortedList) {
+      assertTrue((port >= 100 && port <= 200) ||
+          (port >= 250 && port <= 300) ||
+          (port >= 310 && port <= 500) ||
+          (port >= 520 && port <= 720) ||
+          (port >= 750 && port <= 1000));
+    }
+  }
+
+  @Test
+  public void testRandomPortsNotEnough() {
+    Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build();
+    Range range2 = Range.newBuilder().setBegin(250).setEnd(300).build();
+    
+    Ranges ranges = Ranges.newBuilder().addRange(range1)
+        .addRange(range2)
+        .build();
+    
+    
+    Resource resource = 
Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
+    
+    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
+    
+    assertEquals(NMPorts.expectedNumPorts(), ports.size());
+    List<Long> sortedList = Lists.newArrayList(ports);
+    
+    Collections.sort(sortedList);
+
+    for (Long port : sortedList) {
+      assertTrue((port >= 100 && port <= 200) ||
+          (port >= 250 && port <= 300));
+    }    
+  }
+
+  @Test
+  public void testRandomPortsNotEnoughPercentKickIn() {
+    Range range1 = Range.newBuilder().setBegin(100).setEnd(200).build();
+    Range range2 = Range.newBuilder().setBegin(250).setEnd(335).build();
+    
+    Ranges ranges = Ranges.newBuilder().addRange(range1)
+        .addRange(range2)
+        .build();
+    
+    
+    Resource resource = 
Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
+    
+    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
+    
+    assertEquals(NMPorts.expectedNumPorts(), ports.size());
+    List<Long> sortedList = Lists.newArrayList(ports);
+    
+    Collections.sort(sortedList);
+
+    for (int i = 0; i < sortedList.size(); i++) {
+      assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 200) ||
+          (sortedList.get(i) >= 250 && sortedList.get(i) <= 335));
+    }
+  }
+  
+  @Test
+  public void testRandomPortsLargeRange() {
+    Range range1 = Range.newBuilder().setBegin(100).setEnd(500).build();
+    Range range2 = Range.newBuilder().setBegin(550).setEnd(835).build();
+    
+    Ranges ranges = Ranges.newBuilder().addRange(range1)
+        .addRange(range2)
+        .build();
+    
+    
+    Resource resource = 
Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
+    
+    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
+    
+    assertEquals(NMPorts.expectedNumPorts(), ports.size());
+    List<Long> sortedList = Lists.newArrayList(ports);
+    
+    Collections.sort(sortedList);
+
+    for (int i = 0; i < sortedList.size(); i++) {
+      assertTrue((sortedList.get(i) >= 100 && sortedList.get(i) <= 500) || 
+          (sortedList.get(i) >= 550 && sortedList.get(i) <= 835));
+    }
+  }
+
+  @Test
+  public void testRandomPortsSmallRange() {
+    Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build();
+    Range range2 = Range.newBuilder().setBegin(110).setEnd(115).build();
+    
+    Ranges ranges = Ranges.newBuilder().addRange(range1)
+        .addRange(range2)
+        .build();
+    
+    Resource resource = 
Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
+    
+    Set<Long> ports = NMTaskFactoryImpl.getNMPorts(resource);
+    
+    assertEquals(NMPorts.expectedNumPorts(), ports.size());
+    List<Long> sortedList = Lists.newArrayList(ports);
+    
+    Collections.sort(sortedList);
+
+    for (int i = 0; i < sortedList.size(); i++) {
+      assertTrue(sortedList.get(i) == 100 || (sortedList.get(i) <= 115 && 
sortedList.get(i) >= 110));
+    }
+  }
+  
+  @Test
+  public void notEnoughPorts() throws Exception {
+    Range range1 = Range.newBuilder().setBegin(100).setEnd(100).build();
+    Range range2 = Range.newBuilder().setBegin(110).setEnd(111).build();
+    
+    Ranges ranges = Ranges.newBuilder().addRange(range1)
+        .addRange(range2)
+        .build();
+    
+    Resource resource = 
Resource.newBuilder().setType(Protos.Value.Type.RANGES).setRanges(ranges).setName("ports").build();
+    
+    try {
+      NMTaskFactoryImpl.getNMPorts(resource);
+      fail("Should fail, as number of ports is not enough");
+    } catch (IllegalStateException ise) {
+      // should get here
+    }
+
+  }
+}

Reply via email to