Repository: incubator-slider
Updated Branches:
  refs/heads/develop fc13c15c1 -> 971918661


SLIDER-558 enable port ranges for all containers


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/97191866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/97191866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/97191866

Branch: refs/heads/develop
Commit: 971918661fb0ce973b4efb488323ca85c24f57a1
Parents: fc13c15
Author: Jon Maron <jma...@hortonworks.com>
Authored: Sat Oct 25 17:21:01 2014 -0400
Committer: Jon Maron <jma...@hortonworks.com>
Committed: Sat Oct 25 17:21:01 2014 -0400

----------------------------------------------------------------------
 .../python/agent/CustomServiceOrchestrator.py   | 68 ++++++++++++++++----
 .../agent/TestCustomServiceOrchestrator.py      | 28 ++++++++
 .../org/apache/slider/common/SliderKeys.java    |  2 +-
 .../server/appmaster/SliderAppMaster.java       |  4 +-
 .../standalone/TestStandaloneAgentAM.groovy     | 47 ++++++++++++++
 5 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py 
b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
index 3932287..119c926 100644
--- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
+++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
@@ -22,6 +22,7 @@ import logging
 import os
 import json
 import pprint
+import random
 import sys
 import socket
 import posixpath
@@ -33,6 +34,7 @@ from PythonExecutor import PythonExecutor
 import hostname
 import Constants
 
+MAX_ATTEMPTS = 5
 
 logger = logging.getLogger()
 
@@ -252,13 +254,13 @@ class CustomServiceOrchestrator():
   Its of the form 
{component_name.ALLOCATED_PORT}[{DEFAULT_default_port}][{PER_CONTAINER}]
   Either a port gets allocated or if not then just set the value to "0"
   """
-
   def finalize_command(self, command, store_command, allocated_ports):
     component = command['componentName']
     allocated_for_this_component_format = "${{{0}.ALLOCATED_PORT}}"
     allocated_for_any = ".ALLOCATED_PORT}"
 
     port_allocation_req = allocated_for_this_component_format.format(component)
+    allowed_ports = self.get_allowed_ports(command)
     if 'configurations' in command:
       for key in command['configurations']:
         if len(command['configurations'][key]) > 0:
@@ -269,7 +271,7 @@ class CustomServiceOrchestrator():
               value = value.replace("${AGENT_LOG_ROOT}",
                                     self.config.getLogPath())
               if port_allocation_req in value:
-                value = self.allocate_ports(value, port_allocation_req)
+                value = self.allocate_ports(value, port_allocation_req, 
allowed_ports)
                 allocated_ports[key + "." + k] = value
               elif allocated_for_any in value:
                 ## All unallocated ports should be set to 0
@@ -323,7 +325,7 @@ class CustomServiceOrchestrator():
     append {DEFAULT_ and find the default value
     append {PER_CONTAINER} if it exists
   """
-  def allocate_ports(self, value, port_req_pattern):
+  def allocate_ports(self, value, port_req_pattern, allowed_ports=None):
     default_port_pattern = "{DEFAULT_"
     do_not_propagate_pattern = "{PER_CONTAINER}"
     index = value.find(port_req_pattern)
@@ -345,7 +347,7 @@ class CustomServiceOrchestrator():
       if index == value.find(replaced_pattern + do_not_propagate_pattern):
         replaced_pattern = replaced_pattern + do_not_propagate_pattern
         pass
-      port = self.allocate_port(def_port)
+      port = self.allocate_port(def_port, allowed_ports)
       value = value.replace(replaced_pattern, str(port), 1)
       logger.info("Allocated port " + str(port) + " for " + replaced_pattern)
       index = value.find(port_req_pattern)
@@ -354,24 +356,28 @@ class CustomServiceOrchestrator():
     pass
 
 
-  def allocate_port(self, default_port=None):
+  def allocate_port(self, default_port=None, allowed_ports=None):
     if default_port != None:
       if self.is_port_available(default_port):
         return default_port
 
-    MAX_ATTEMPT = 5
-    iter = 0
+    port_list = [0] * MAX_ATTEMPTS
+    if allowed_ports != None:
+      port_list = allowed_ports
+
+    i = 0
     port = -1
-    while iter < MAX_ATTEMPT:
-      iter = iter + 1
+    itor = iter(port_list)
+    while i < min(len(port_list), MAX_ATTEMPTS):
       try:
         sock = socket.socket()
-        sock.bind(('', 0))
+        sock.bind(('', itor.next()))
         port = sock.getsockname()[1]
       except Exception, err:
-        logger.info("Encountered error while trying to opening socket - " + 
str(err))
+        logger.info("Encountered error while trying to open socket - " + 
str(err))
       finally:
         sock.close()
+      i = i + 1
       pass
     logger.info("Allocated dynamic port: " + str(port))
     return port
@@ -387,3 +393,43 @@ class CustomServiceOrchestrator():
     return False
 
 
+  def get_allowed_ports(self, command):
+      allowed_ports = None
+      global_config = command['configurations'].get('global')
+      if global_config != None:
+          allowed_ports_value = global_config.get("slider.allowed.ports")
+          if allowed_ports_value:
+              allowed_ports = self.get_allowed_port_list(allowed_ports_value)
+
+      return allowed_ports
+
+
+  def get_allowed_port_list(self, allowedPortsOptionValue,
+                            num_values=MAX_ATTEMPTS):
+    selection = set()
+    invalid = set()
+    # tokens are comma seperated values
+    tokens = [x.strip() for x in allowedPortsOptionValue.split(',')]
+    for i in tokens:
+      try:
+        selection.add(int(i))
+      except:
+        # should be a range
+        try:
+          token = [int(k.strip()) for k in i.split('-')]
+          if len(token) > 1:
+            token.sort()
+            first = token[0]
+            last = token[len(token)-1]
+            for x in range(first, last+1):
+              selection.add(x)
+        except:
+          # not an int and not a range...
+          invalid.add(i)
+    selection = random.sample(selection, min (len(selection), num_values))
+    # Report invalid tokens before returning valid selection
+    logger.info("Allowed port values: " + str(selection))
+    logger.warning("Invalid port range values: " + str(invalid))
+    return selection
+
+

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py 
b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
index 6ada7fa..a4cef94 100644
--- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
+++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py
@@ -544,6 +544,34 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertFalse(port == -1)
     self.assertTrue(port > 0)
 
+
+  def test_parse_allowed_port_values(self):
+    dummy_controller = MagicMock()
+    tempdir = tempfile.gettempdir()
+    tempWorkDir = tempdir + "W"
+    config = MagicMock()
+    config.get.return_value = "something"
+    config.getResolvedPath.return_value = tempdir
+    config.getWorkRootPath.return_value = tempWorkDir
+    config.getLogPath.return_value = tempdir
+
+    orchestrator = CustomServiceOrchestrator(config, dummy_controller)
+    port_range = "48000-48005"
+    port_range_full_list = [48000, 48001, 48002, 48003, 48004, 48005]
+    allowed_ports = orchestrator.get_allowed_port_list(port_range, 3)
+    self.assertTrue(set(allowed_ports).issubset(port_range_full_list))
+
+    port_range = "48000 , 48005"
+    port_range_full_list = [48000, 48005]
+    allowed_ports = orchestrator.get_allowed_port_list(port_range, 1)
+    self.assertTrue(set(allowed_ports).issubset(port_range_full_list))
+
+    port_range = "48000 , 48004-48005"
+    port_range_full_list = [48000, 48004, 48005]
+    allowed_ports = orchestrator.get_allowed_port_list(port_range, 2)
+    self.assertTrue(set(allowed_ports).issubset(port_range_full_list))
+
+
   def tearDown(self):
     # enable stdout
     sys.stdout = sys.__stdout__

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java 
b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
index 048dfa7..5f16e56 100644
--- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
@@ -189,5 +189,5 @@ public interface SliderKeys extends SliderXmlConfKeys {
   String AM_FILTER_NAME =
       "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer";
 
-  String KEY_AM_ALLOWED_PORT_RANGE = "slider.am.allowed.port.range";
+  String KEY_ALLOWED_PORT_RANGE = "site.global.slider.allowed.ports";
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 7a1711f..bb198a1 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -917,8 +917,8 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
       throws SliderException {
     int portToRequest = 0;
     String portRange = instanceDefinition.
-        getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM)
-        .getOption(SliderKeys.KEY_AM_ALLOWED_PORT_RANGE , "0");
+        getAppConfOperations().getGlobalOptions().
+          getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0");
     if (!"0".equals(portRange)) {
       if (portScanner == null) {
         portScanner = new PortScanner();

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy
----------------------------------------------------------------------
diff --git 
a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy
 
b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy
index 60b4dd8..f04583e 100644
--- 
a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy
+++ 
b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy
@@ -20,17 +20,23 @@ package org.apache.slider.agent.standalone
 
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.api.records.ApplicationReport
 import org.apache.hadoop.yarn.api.records.YarnApplicationState
+import org.apache.hadoop.yarn.exceptions.YarnException
 import org.apache.slider.agent.AgentMiniClusterTestBase
 import org.apache.slider.api.ClusterNode
 import org.apache.slider.client.SliderClient
 import org.apache.slider.common.SliderKeys
 import org.apache.slider.common.params.ActionRegistryArgs
+import org.apache.slider.core.build.InstanceBuilder
+import org.apache.slider.core.conf.AggregateConf
 import org.apache.slider.core.exceptions.SliderException
+import org.apache.slider.core.launch.LaunchedApplication
 import org.apache.slider.core.main.LauncherExitCodes
 import org.apache.slider.core.main.ServiceLauncher
+import org.apache.slider.core.persist.LockAcquireFailedException
 import org.junit.Test
 
 @CompileStatic
@@ -159,8 +165,49 @@ class TestStandaloneAgentAM  extends 
AgentMiniClusterTestBase {
         clustername)
     assert instance3.yarnApplicationState >= YarnApplicationState.FINISHED
 
+    //create another AM, this time with a port range
+    setSliderClientClassName(TestSliderClient.name)
+    try {
+      launcher = createStandaloneAM(clustername, true, true)
+      client = launcher.service
+      i2AppID = client.applicationId
+
+      reportFor = client.getApplicationReport(i2AppID)
+      URI uri = new URI(reportFor.originalTrackingUrl)
+      assert uri.port in [60000, 60001, 60002, 60003]
+      assert reportFor.rpcPort in [60000, 60001, 60002, 60003]
 
+      assert 0 == clusterActionFreeze(client, clustername)
+
+    } finally {
+      setSliderClientClassName(SliderClient.name)
+    }
   }
 
 
+  static class TestSliderClient extends SliderClient {
+    @Override
+    protected void persistInstanceDefinition(boolean overwrite,
+                                             Path appconfdir,
+                                             InstanceBuilder builder)
+    throws IOException, SliderException, LockAcquireFailedException {
+      AggregateConf conf = builder.getInstanceDescription()
+      conf.getAppConfOperations().getGlobalOptions().put(
+          SliderKeys.KEY_ALLOWED_PORT_RANGE,
+          "60000-60003")
+      super.persistInstanceDefinition(overwrite, appconfdir, builder)
+    }
+
+    @Override
+    LaunchedApplication launchApplication(String clustername,
+                                          Path clusterDirectory,
+                                          AggregateConf instanceDefinition,
+                                          boolean debugAM)
+    throws YarnException, IOException {
+      instanceDefinition.getAppConfOperations().getGlobalOptions().put(
+          SliderKeys.KEY_ALLOWED_PORT_RANGE,
+          "60000-60003")
+      return super.launchApplication(clustername, clusterDirectory, 
instanceDefinition, debugAM)
+    }
+  }
 }

Reply via email to