SLIDER-219. Multiple dynamic ports may be requested as part of the same config value
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1894a0ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1894a0ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1894a0ca Branch: refs/heads/feature/SLIDER-151_REST_API Commit: 1894a0ca206d4d024f7b3062133ab1039ca5988b Parents: a9a91b4 Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Tue Jul 29 23:33:00 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Tue Aug 5 18:51:58 2014 -0700 ---------------------------------------------------------------------- app-packages/memcached/appConfig.json | 3 +- app-packages/memcached/metainfo.xml | 6 ++ .../python/agent/CustomServiceOrchestrator.py | 57 ++++++++-- .../agent/TestCustomServiceOrchestrator.py | 107 +++++++++++++++++-- slider-agent/src/test/python/agent/TestGrep.py | 2 +- slider-agent/src/test/python/agent/TestMain.py | 4 +- .../src/test/python/agent/TestRegistration.py | 4 +- .../providers/agent/AgentProviderService.java | 3 + .../agent/application/metadata/Component.java | 13 +++ .../application/metadata/MetainfoParser.java | 4 + .../agent/TestAgentProviderService.java | 25 ++++- 11 files changed, 202 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/app-packages/memcached/appConfig.json ---------------------------------------------------------------------- diff --git a/app-packages/memcached/appConfig.json b/app-packages/memcached/appConfig.json index d205591..11fb863 100644 --- a/app-packages/memcached/appConfig.json +++ b/app-packages/memcached/appConfig.json @@ -10,7 +10,8 @@ "site.global.additional_cp": "/usr/lib/hadoop/lib/*", "site.global.xmx_val": "256m", "site.global.xms_val": "128m", - "site.global.memory_val": "200M" + "site.global.memory_val": "200M", + 
"site.global.listen_port": "${MEMCACHED.ALLOCATED_PORT}{DEFAULT_11211}" }, "components": { "slider-appmaster": { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/app-packages/memcached/metainfo.xml ---------------------------------------------------------------------- diff --git a/app-packages/memcached/metainfo.xml b/app-packages/memcached/metainfo.xml index d435b84..0550b7d 100644 --- a/app-packages/memcached/metainfo.xml +++ b/app-packages/memcached/metainfo.xml @@ -27,6 +27,12 @@ <component> <name>MEMCACHED</name> <category>MASTER</category> + <exports> + <export> + <name>host_port</name> + <value>${THIS_HOST}:${site.global.listen_port}</value> + </export> + </exports> <commandScript> <script>scripts/memcached.py</script> <scriptType>PYTHON</scriptType> http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 4eb22bf..7d740c2 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -52,9 +52,9 @@ class CustomServiceOrchestrator(): self.tmp_dir = config.getResolvedPath(AgentConfig.APP_TASK_DIR) self.python_executor = PythonExecutor(self.tmp_dir, config) self.status_commands_stdout = os.path.realpath(posixpath.join(self.tmp_dir, - 'status_command_stdout.txt')) + 'status_command_stdout.txt')) self.status_commands_stderr = os.path.realpath(posixpath.join(self.tmp_dir, - 'status_command_stderr.txt')) + 'status_command_stderr.txt')) self.public_fqdn = hostname.public_hostname() self.applied_configs = {} # Clean up old status command files if any @@ -81,7 +81,7 @@ class CustomServiceOrchestrator(): script_tuple = (script_path, self.base_dir) tmpstrucoutfile = 
os.path.realpath(posixpath.join(self.tmp_dir, - "structured-out-{0}.json".format(task_id))) + "structured-out-{0}.json".format(task_id))) if script_type.upper() != self.SCRIPT_TYPE_PYTHON: # We don't support anything else yet message = "Unknown script type {0}".format(script_type) @@ -97,9 +97,9 @@ class CustomServiceOrchestrator(): for py_file, current_base_dir in filtered_py_file_list: script_params = [command_name, json_path, current_base_dir] python_paths = [os.path.realpath(posixpath.join(self.config.getWorkRootPath(), - "infra", "agent", "slider-agent", "jinja2")), + "infra", "agent", "slider-agent", "jinja2")), os.path.realpath(posixpath.join(self.config.getWorkRootPath(), - "infra", "agent", "slider-agent"))] + "infra", "agent", "slider-agent"))] if platform.system() != "Windows": environment_vars = [("PYTHONPATH", ":".join(python_paths))] else: @@ -137,8 +137,8 @@ class CustomServiceOrchestrator(): # Irrespective of the outcome report the folder paths if command_name == 'INSTALL': ret[Constants.FOLDERS] = { - Constants.AGENT_LOG_ROOT : self.config.getLogPath(), - Constants.AGENT_WORK_ROOT : self.config.getWorkRootPath() + Constants.AGENT_LOG_ROOT: self.config.getLogPath(), + Constants.AGENT_WORK_ROOT: self.config.getWorkRootPath() } return ret @@ -249,9 +249,7 @@ class CustomServiceOrchestrator(): value = value.replace("${AGENT_LOG_ROOT}", self.config.getLogPath()) if port_allocation_req in value: - port = self.allocate_port() - value = value.replace(port_allocation_req, str(port)) - logger.info("Allocated port " + str(port) + " for " + port_allocation_req) + value = self.allocate_ports(value, port_allocation_req) allocated_ports[k] = value command['configurations'][key][k] = value pass @@ -266,7 +264,34 @@ class CustomServiceOrchestrator(): pass - def allocate_port(self): + def allocate_ports(self, value, port_req_pattern): + default_port_pattern = "{DEFAULT_" + index = value.find(port_req_pattern) + while index != -1: + replaced_pattern = 
port_req_pattern + def_port = None + if index == value.find(port_req_pattern + default_port_pattern): + replaced_pattern = port_req_pattern + default_port_pattern + start_index = index + len(replaced_pattern) + end_index = value.find("}", start_index) + def_port_str = value[start_index:end_index] + def_port = int(def_port_str) + replaced_pattern = replaced_pattern + def_port_str + "}" + pass + port = self.allocate_port(def_port) + value = value.replace(replaced_pattern, str(port), 1) + logger.info("Allocated port " + str(port) + " for " + replaced_pattern) + index = value.find(port_req_pattern) + pass + return value + pass + + + def allocate_port(self, default_port=None): + if default_port != None: + if self.is_port_available(default_port): + return default_port + MAX_ATTEMPT = 5 iter = 0 port = -1 @@ -284,4 +309,14 @@ class CustomServiceOrchestrator(): logger.info("Allocated dynamic port: " + str(port)) return port + def is_port_available(self, port): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(0.2) + sock.connect(('127.0.0.1', port)) + sock.close() + except: + return True + return False + http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py index d2439b1..3b41bd7 100644 --- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py +++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py @@ -34,6 +34,7 @@ from CustomServiceOrchestrator import CustomServiceOrchestrator from mock.mock import MagicMock, patch import StringIO import sys +from socket import socket class TestCustomServiceOrchestrator(TestCase): @@ -178,13 +179,13 @@ class TestCustomServiceOrchestrator(TestCase): pass - 
@patch.object(CustomServiceOrchestrator, "allocate_port") + @patch.object(CustomServiceOrchestrator, "allocate_ports") @patch.object(CustomServiceOrchestrator, "resolve_script_path") @patch.object(PythonExecutor, "run_file") def test_runCommand_get_port(self, run_file_mock, resolve_script_path_mock, - allocate_port_mock): + allocate_ports_mock): command = { 'role': 'HBASE_REGIONSERVER', 'hostLevelParams': { @@ -212,7 +213,7 @@ class TestCustomServiceOrchestrator(TestCase): config.getWorkRootPath.return_value = tempdir config.getLogPath.return_value = tempdir - allocate_port_mock.return_value = 10233 + allocate_ports_mock.return_value = str(10233) resolve_script_path_mock.return_value = "/basedir/scriptpath" dummy_controller = MagicMock() @@ -230,6 +231,100 @@ class TestCustomServiceOrchestrator(TestCase): self.assertEqual(run_file_mock.call_count, 1) + @patch.object(socket, "close") + @patch.object(socket, "connect") + def test_allocate_port_def(self, socket_connect_mock, socket_close_mock): + e = OSError() + socket_connect_mock.side_effect = e + tempdir = tempfile.gettempdir() + config = MagicMock() + config.get.return_value = "something" + config.getResolvedPath.return_value = tempdir + config.getWorkRootPath.return_value = tempdir + config.getLogPath.return_value = tempdir + + dummy_controller = MagicMock() + orchestrator = CustomServiceOrchestrator(config, dummy_controller) + ret = orchestrator.allocate_port(10) + self.assertEqual(ret, 10) + + @patch.object(socket, "getsockname") + @patch.object(socket, "bind") + @patch.object(socket, "close") + @patch.object(socket, "connect") + def test_allocate_port_new(self, socket_connect_mock, socket_close_mock, + socket_bind_mock, socket_getsockname_mock): + tempdir = tempfile.gettempdir() + config = MagicMock() + config.get.return_value = "something" + config.getResolvedPath.return_value = tempdir + config.getWorkRootPath.return_value = tempdir + config.getLogPath.return_value = tempdir + + dummy_controller = 
MagicMock() + orchestrator = CustomServiceOrchestrator(config, dummy_controller) + socket_getsockname_mock.return_value = [100, 101] + ret = orchestrator.allocate_port(10) + self.assertEqual(ret, 101) + + @patch.object(socket, "getsockname") + @patch.object(socket, "bind") + def test_allocate_port_no_def(self, socket_bind_mock, socket_getsockname_mock): + tempdir = tempfile.gettempdir() + config = MagicMock() + config.get.return_value = "something" + config.getResolvedPath.return_value = tempdir + config.getWorkRootPath.return_value = tempdir + config.getLogPath.return_value = tempdir + + dummy_controller = MagicMock() + orchestrator = CustomServiceOrchestrator(config, dummy_controller) + socket_getsockname_mock.return_value = [100, 102] + ret = orchestrator.allocate_port() + self.assertEqual(ret, 102) + + + @patch.object(CustomServiceOrchestrator, "allocate_port") + def test_allocate_port_combinations(self, allocate_port_mock): + tempdir = tempfile.gettempdir() + config = MagicMock() + config.get.return_value = "something" + config.getResolvedPath.return_value = tempdir + config.getWorkRootPath.return_value = tempdir + config.getLogPath.return_value = tempdir + + dummy_controller = MagicMock() + orchestrator = CustomServiceOrchestrator(config, dummy_controller) + + allocate_port_mock.side_effect = [101, 102, 103] + ret = orchestrator.allocate_ports("1000", "${A.ALLOCATED_PORT}") + self.assertEqual(ret, "1000") + ret = orchestrator.allocate_ports("${A.ALLOCATED_PORT}", "${A.ALLOCATED_PORT}") + self.assertEqual(ret, "101") + ret = orchestrator.allocate_ports("${A.ALLOCATED_PORT},${A.ALLOCATED_PORT}", "${A.ALLOCATED_PORT}") + self.assertEqual(ret, "102,103") + + + @patch.object(CustomServiceOrchestrator, "is_port_available") + def test_allocate_port_combinations2(self, is_port_available_mock): + tempdir = tempfile.gettempdir() + config = MagicMock() + config.get.return_value = "something" + config.getResolvedPath.return_value = tempdir + 
config.getWorkRootPath.return_value = tempdir + config.getLogPath.return_value = tempdir + + dummy_controller = MagicMock() + orchestrator = CustomServiceOrchestrator(config, dummy_controller) + + is_port_available_mock.return_value = True + ret = orchestrator.allocate_ports("${A.ALLOCATED_PORT}{DEFAULT_1005}", "${A.ALLOCATED_PORT}") + self.assertEqual(ret, "1005") + ret = orchestrator.allocate_ports("${A.ALLOCATED_PORT}{DEFAULT_1005}-${A.ALLOCATED_PORT}{DEFAULT_1006}", + "${A.ALLOCATED_PORT}") + self.assertEqual(ret, "1005-1006") + + @patch("hostname.public_hostname") @patch("os.path.isfile") @patch("os.unlink") @@ -353,8 +448,8 @@ class TestCustomServiceOrchestrator(TestCase): status = orchestrator.requestComponentStatus(status_command) self.assertEqual(CustomServiceOrchestrator.DEAD_STATUS, status['exitcode']) - @patch.object(CustomServiceOrchestrator, "allocate_port") - def test_finalize_command(self, mock_allocate_port): + @patch.object(CustomServiceOrchestrator, "allocate_ports") + def test_finalize_command(self, mock_allocate_ports): dummy_controller = MagicMock() tempdir = tempfile.gettempdir() tempWorkDir = tempdir + "W" @@ -363,7 +458,7 @@ class TestCustomServiceOrchestrator(TestCase): config.getResolvedPath.return_value = tempdir config.getWorkRootPath.return_value = tempWorkDir config.getLogPath.return_value = tempdir - mock_allocate_port.return_value = "10023" + mock_allocate_ports.return_value = "10023" orchestrator = CustomServiceOrchestrator(config, dummy_controller) command = {} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-agent/src/test/python/agent/TestGrep.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestGrep.py b/slider-agent/src/test/python/agent/TestGrep.py index 75f0093..351befb 100644 --- a/slider-agent/src/test/python/agent/TestGrep.py +++ b/slider-agent/src/test/python/agent/TestGrep.py @@ -19,7 +19,7 @@ limitations under the 
License. ''' from unittest import TestCase -from agent.Grep import Grep +from Grep import Grep import socket import os, sys import logging http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-agent/src/test/python/agent/TestMain.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestMain.py b/slider-agent/src/test/python/agent/TestMain.py index dc3a199..bc8d2a3 100644 --- a/slider-agent/src/test/python/agent/TestMain.py +++ b/slider-agent/src/test/python/agent/TestMain.py @@ -26,11 +26,11 @@ import unittest from agent import ProcessHelper, main import logging import signal -from agent.AgentConfig import AgentConfig +from AgentConfig import AgentConfig import ConfigParser import os import tempfile -from agent.Controller import Controller +from Controller import Controller from optparse import OptionParser logger = logging.getLogger() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-agent/src/test/python/agent/TestRegistration.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestRegistration.py b/slider-agent/src/test/python/agent/TestRegistration.py index 8647d0d..5245af9 100644 --- a/slider-agent/src/test/python/agent/TestRegistration.py +++ b/slider-agent/src/test/python/agent/TestRegistration.py @@ -25,8 +25,8 @@ import errno import tempfile from mock.mock import patch from mock.mock import MagicMock -from agent.Register import Register -from agent.AgentConfig import AgentConfig +from Register import Register +from AgentConfig import AgentConfig class TestRegistration(TestCase): http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index a73aaab..f878b36 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -109,6 +109,7 @@ public class AgentProviderService extends AbstractProviderService implements private static final String CONTAINER_ID = "container_id"; private static final String GLOBAL_CONFIG_TAG = "global"; private static final String LOG_FOLDERS_TAG = "LogFolders"; + private static final String COMPONENT_DATA_TAG = "ComponentData"; private static final int MAX_LOG_ENTRIES = 20; private static final int DEFAULT_HEARTBEAT_MONITOR_INTERVAL = 60 * 1000; private final Object syncLock = new Object(); @@ -126,6 +127,7 @@ public class AgentProviderService extends AbstractProviderService implements return size() > MAX_LOG_ENTRIES; } }); + private Map<String, Map<String, String>> componentData = new ConcurrentHashMap(); private Boolean canAnyMasterPublish = null; private AgentLaunchParameter agentLaunchParameter = null; @@ -567,6 +569,7 @@ public class AgentProviderService extends AbstractProviderService implements publishComponentConfiguration(LOG_FOLDERS_TAG, LOG_FOLDERS_TAG, (new HashMap<>(this.workFolders)).entrySet()); } + /** * Process return status for component instances * @param heartBeat http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java index 6cd08e0..9dba87c 100644 --- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/Component.java @@ -16,6 +16,9 @@ */ package org.apache.slider.providers.agent.application.metadata; +import java.util.ArrayList; +import java.util.List; + /** * */ @@ -26,9 +29,11 @@ public class Component { String minInstanceCount; String maxInstanceCount; CommandScript commandScript; + List<Export> exports; public Component() { publishConfig = Boolean.FALSE.toString(); + exports = new ArrayList<>(); } public String getName() { @@ -79,6 +84,14 @@ public class Component { this.commandScript = commandScript; } + public void addExport(Export export) { + exports.add(export); + } + + public List<Export> getExports() { + return exports; + } + @Override public String toString() { final StringBuilder sb = http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java index c7922a7..d2801d4 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/application/metadata/MetainfoParser.java @@ -58,6 +58,10 @@ public class MetainfoParser { digester.addBeanPropertySetter("*/component/publishConfig"); digester.addBeanPropertySetter("*/component/minInstanceCount"); digester.addBeanPropertySetter("*/component/maxInstanceCount"); + digester.addObjectCreate("*/component/export", Export.class); + digester.addBeanPropertySetter("*/component/export/name"); + 
digester.addBeanPropertySetter("*/component/export/value"); + digester.addSetNext("*/component/export", "addExport"); digester.addSetNext("*/component", "addComponent"); digester.addObjectCreate("*/commandScript", CommandScript.class); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1894a0ca/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index 0a42e94..c34d0ea 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -151,6 +151,16 @@ public class TestAgentProviderService { + " <script>scripts/hbase_regionserver.py</script>\n" + " <scriptType>PYTHON</scriptType>\n" + " </commandScript>\n" + + " <exports>\n" + + " <export>\n" + + " <name>PropertyA</name>\n" + + " <value>${THIS_HOST}:${site.global.listen_port}</value>\n" + + " </export>\n" + + " <export>\n" + + " <name>PropertyB</name>\n" + + " <value>AConstant</value>\n" + + " </export>\n" + + " </exports>\n" + " </component>\n" + " </components>\n" + " <osSpecifics>\n" @@ -414,6 +424,7 @@ public class TestAgentProviderService { Assert.assertEquals(component.getMaxInstanceCount(), "2"); Assert.assertEquals(component.getCommandScript().getScript(), "scripts/hbase_master.py"); Assert.assertEquals(component.getCategory(), "MASTER"); + Assert.assertEquals(component.getExports().size(), 0); found++; } if (component.getName().equals("HBASE_REGIONSERVER")) { @@ -421,16 +432,24 @@ public class TestAgentProviderService { Assert.assertNull(component.getMaxInstanceCount()); Assert.assertEquals(component.getCommandScript().getScript(), "scripts/hbase_regionserver.py"); 
Assert.assertEquals(component.getCategory(), "SLAVE"); + Assert.assertEquals(component.getExports().size(), 2); + List<Export> es = component.getExports(); + Export e = es.get(0); + Assert.assertEquals(e.getName(), "PropertyA"); + Assert.assertEquals(e.getValue(), "${THIS_HOST}:${site.global.listen_port}"); + e = es.get(1); + Assert.assertEquals(e.getName(), "PropertyB"); + Assert.assertEquals(e.getValue(), "AConstant"); found++; } } Assert.assertEquals(found, 2); - assert application.getExportGroups().size() == 1; + Assert.assertEquals(application.getExportGroups().size(), 1); List<ExportGroup> egs = application.getExportGroups(); ExportGroup eg = egs.get(0); - assert eg.getName().equals("QuickLinks"); - assert eg.getExports().size() == 2; + Assert.assertEquals(eg.getName(), "QuickLinks"); + Assert.assertEquals(eg.getExports().size(), 2); found = 0; for (Export export : eg.getExports()) {