SLIDER-355. Agent is unable to connect to ZK on Windows to retrieve AM host and port. SLIDER-356. Slider agent should not default to any host or port value when it cannot communicate with ZK.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/448d598f Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/448d598f Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/448d598f Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry Commit: 448d598fccf75d2a340cdbbeb45d3913fb473050 Parents: fc534ca Author: Sumit Mohanty <smoha...@hortonworks.com> Authored: Sat Aug 23 19:50:16 2014 -0700 Committer: Sumit Mohanty <smoha...@hortonworks.com> Committed: Sat Aug 23 19:50:30 2014 -0700 ---------------------------------------------------------------------- app-packages/memcached-win/appConfig.json | 1 + app-packages/memcached-win/metainfo.xml | 4 +- slider-agent/src/main/python/agent/Constants.py | 1 + .../src/main/python/agent/Controller.py | 1 - slider-agent/src/main/python/agent/Registry.py | 8 +- slider-agent/src/main/python/agent/main.py | 59 ++++++++----- slider-agent/src/main/python/kazoo/client.py | 6 +- .../src/main/python/kazoo/handlers/threading.py | 6 +- .../src/main/python/kazoo/handlers/utils.py | 87 +++++++++++++++++--- .../main/python/kazoo/protocol/connection.py | 16 ++-- .../src/main/python/kazoo/tests/test_client.py | 6 +- .../main/python/kazoo/tests/test_connection.py | 55 ++++++++----- .../providers/agent/AgentProviderService.java | 3 + 13 files changed, 176 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/app-packages/memcached-win/appConfig.json ---------------------------------------------------------------------- diff --git a/app-packages/memcached-win/appConfig.json b/app-packages/memcached-win/appConfig.json index e39feb0..471a974 100644 --- a/app-packages/memcached-win/appConfig.json +++ b/app-packages/memcached-win/appConfig.json @@ -8,6 +8,7 @@ "site.global.app_user": "hadoop", 
"site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/component.pid", + "site.global.app_root": "${AGENT_WORK_ROOT}\\app\\install", "site.global.additional_cp": "C:\\hdp\\hadoop-2.4.0.2.1.3.0-1990\\share\\hadoop\\common\\lib\\*", "site.global.xmx_val": "256m", "site.global.xms_val": "128m", http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/app-packages/memcached-win/metainfo.xml ---------------------------------------------------------------------- diff --git a/app-packages/memcached-win/metainfo.xml b/app-packages/memcached-win/metainfo.xml index 27a8be6..c7e5881 100644 --- a/app-packages/memcached-win/metainfo.xml +++ b/app-packages/memcached-win/metainfo.xml @@ -46,8 +46,8 @@ <osType>any</osType> <packages> <package> - <type>zip</type> - <name>files/jmemcached-1.0.0.zip</name> + <type>folder</type> + <name>files\\jmemcached-1.0.0</name> </package> </packages> </osSpecific> http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/Constants.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Constants.py b/slider-agent/src/main/python/agent/Constants.py index 2975266..f120b94 100644 --- a/slider-agent/src/main/python/agent/Constants.py +++ b/slider-agent/src/main/python/agent/Constants.py @@ -33,3 +33,4 @@ DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_" ZK_QUORUM="zk_quorum" ZK_REG_PATH="zk_reg_path" AUTO_GENERATED="auto_generated" +MAX_AM_CONNECT_RETRIES = 10 http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index dfd0a09..a3fb90d 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -331,7 +331,6 @@ class 
Controller(threading.Thread): zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH) registry = Registry(zk_quorum, zk_reg_path) amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort() - logger.info("Read from ZK registry: AM host = %s, AM secured port = %s" % (amHost, amSecuredPort)) self.hostname = amHost self.secured_port = amSecuredPort self.config.set(AgentConfig.SERVER_SECTION, "hostname", self.hostname) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/Registry.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Registry.py b/slider-agent/src/main/python/agent/Registry.py index c7d6e5d..9d4a387 100644 --- a/slider-agent/src/main/python/agent/Registry.py +++ b/slider-agent/src/main/python/agent/Registry.py @@ -50,14 +50,14 @@ class Registry: # the port needs to be utf-8 encoded amSecuredPort = amSecuredPort.encode('utf8', 'ignore') amUnsecuredPort = amUnsecuredPort.encode('utf8', 'ignore') - except Exception: + except Exception, e: # log and let empty strings be returned - logger.error("Could not connect to zk registry at %s in quorum %s" % - (self.zk_reg_path, self.zk_quorum)) + logger.error("Could not connect to zk registry at %s in quorum %s. 
Error: %s" % + (self.zk_reg_path, self.zk_quorum, str(e))) pass finally: if not zk == None: zk.stop() zk.close() - logger.info("AM Host = %s, AM Secured Port = %s" % (amHost, amSecuredPort)) + logger.info("AM Host = %s, AM Secured Port = %s, ping port = %s" % (amHost, amSecuredPort, amUnsecuredPort)) return amHost, amUnsecuredPort, amSecuredPort http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/main.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/main.py b/slider-agent/src/main/python/agent/main.py index 4f9cd62..f6b8666 100644 --- a/slider-agent/src/main/python/agent/main.py +++ b/slider-agent/src/main/python/agent/main.py @@ -217,18 +217,6 @@ def main(): if options.debug: agentConfig.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, options.debug) - # Extract the AM hostname and secured port from ZK registry - registry = Registry(options.zk_quorum, options.zk_reg_path) - amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort() - if amHost: - agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", amHost) - - if amSecuredPort: - agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", amSecuredPort) - - if amUnsecuredPort: - agentConfig.set(AgentConfig.SERVER_SECTION, "port", amUnsecuredPort) - # set the security directory to a subdirectory of the run dir secDir = posixpath.join(agentConfig.getResolvedPath(AgentConfig.RUN_DIR), "security") logger.info("Security/Keys directory: " + secDir) @@ -251,16 +239,43 @@ def main(): if len(all_log_folders) > 1: logger.info("Selected log folder from available: " + ",".join(all_log_folders)) - server_url = SERVER_STATUS_URL.format( - agentConfig.get(AgentConfig.SERVER_SECTION, 'hostname'), - agentConfig.get(AgentConfig.SERVER_SECTION, 'port'), - agentConfig.get(AgentConfig.SERVER_SECTION, 'check_path')) - print("Connecting to the server at " + server_url + "...") - 
logger.info('Connecting to the server at: ' + server_url) - - # Wait until server is reachable - netutil = NetUtil() - netutil.try_to_connect(server_url, -1, logger) + # Extract the AM hostname and secured port from ZK registry + zk_lookup_tries = 0 + while zk_lookup_tries < Constants.MAX_AM_CONNECT_RETRIES: + registry = Registry(options.zk_quorum, options.zk_reg_path) + amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort() + + tryConnect = True + if not amHost or not amSecuredPort or not amUnsecuredPort: + logger.info("Unable to extract AM host details from ZK, retrying ...") + tryConnect = False + time.sleep(NetUtil.CONNECT_SERVER_RETRY_INTERVAL_SEC) + + if tryConnect: + if amHost: + agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", amHost) + + if amSecuredPort: + agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", amSecuredPort) + + if amUnsecuredPort: + agentConfig.set(AgentConfig.SERVER_SECTION, "port", amUnsecuredPort) + + server_url = SERVER_STATUS_URL.format( + agentConfig.get(AgentConfig.SERVER_SECTION, 'hostname'), + agentConfig.get(AgentConfig.SERVER_SECTION, 'port'), + agentConfig.get(AgentConfig.SERVER_SECTION, 'check_path')) + print("Connecting to the server at " + server_url + "...") + logger.info('Connecting to the server at: ' + server_url) + + # Wait until server is reachable and continue to query ZK + netutil = NetUtil() + retries = netutil.try_to_connect(server_url, 3, logger) + zk_lookup_tries += 1 + if retries < 3: + break; + pass + pass # Launch Controller communication controller = Controller(agentConfig) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/client.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/client.py b/slider-agent/src/main/python/kazoo/client.py index a315489..11d9008 100644 --- a/slider-agent/src/main/python/kazoo/client.py +++ 
b/slider-agent/src/main/python/kazoo/client.py @@ -20,7 +20,7 @@ from kazoo.exceptions import ( WriterNotClosedException, ) from kazoo.handlers.threading import SequentialThreadingHandler -from kazoo.handlers.utils import capture_exceptions, wrap +from kazoo.handlers.utils import capture_exceptions, wrap, pipe_or_sock_write from kazoo.hosts import collect_hosts from kazoo.loggingsupport import BLATHER from kazoo.protocol.connection import ConnectionHandler @@ -515,7 +515,7 @@ class KazooClient(object): async_object.set_exception(ConnectionClosedError( "Connection has been closed")) try: - os.write(write_pipe, b'\0') + pipe_or_sock_write(write_pipe, b'\0') except: async_object.set_exception(ConnectionClosedError( "Connection has been closed")) @@ -585,7 +585,7 @@ class KazooClient(object): self._stopped.set() self._queue.append((CloseInstance, None)) - os.write(self._connection._write_pipe, b'\0') + pipe_or_sock_write(self._connection._write_pipe, b'\0') self._safe_close() def restart(self): http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/handlers/threading.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/handlers/threading.py b/slider-agent/src/main/python/kazoo/handlers/threading.py index 3ca9a8f..684a6b0 100644 --- a/slider-agent/src/main/python/kazoo/handlers/threading.py +++ b/slider-agent/src/main/python/kazoo/handlers/threading.py @@ -35,7 +35,7 @@ _STOP = object() log = logging.getLogger(__name__) -class TimeoutError(Exception): +class KazooTimeoutError(Exception): pass @@ -104,7 +104,7 @@ class AsyncResult(object): raise self._exception # if we get to this point we timeout - raise TimeoutError() + raise KazooTimeoutError() def get_nowait(self): """Return the value or raise the exception without blocking. 
@@ -174,7 +174,7 @@ class SequentialThreadingHandler(object): """ name = "sequential_threading_handler" - timeout_exception = TimeoutError + timeout_exception = KazooTimeoutError sleep_func = staticmethod(time.sleep) queue_impl = Queue.Queue queue_empty = Queue.Empty http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/handlers/utils.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/handlers/utils.py b/slider-agent/src/main/python/kazoo/handlers/utils.py index 60d6404..385495e 100644 --- a/slider-agent/src/main/python/kazoo/handlers/utils.py +++ b/slider-agent/src/main/python/kazoo/handlers/utils.py @@ -8,7 +8,9 @@ except ImportError: # pragma: nocover HAS_FNCTL = False import functools import os - +import sys +import socket +import errno def _set_fd_cloexec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) @@ -21,19 +23,84 @@ def _set_default_tcpsock_options(module, sock): _set_fd_cloexec(sock) return sock - -def create_pipe(): - """Create a non-blocking read/write pipe. +def pipe_or_sock_read(p_or_s, n): + ''' Use a socket or a pipe to read something''' + if isinstance(p_or_s, int): + # This is a pipe + return os.read(p_or_s, n) + else: + return p_or_s.recv(n) + +def pipe_or_sock_close(p_or_s): + ''' Closes either a socket or a pipe''' + if isinstance(p_or_s, int): + os.close(p_or_s) + else: + p_or_s.close() + +def pipe_or_sock_write(p_or_s, b): + ''' Read from a socket or a pipe depending on what is passed''' + if isinstance(p_or_s, int): + # This is a pipe + os.write(p_or_s,b) + else: + p_or_s.send(b) + +def create_pipe_or_sock(): + """ Create a non-blocking read/write pipe. 
+ On Windows create a pair of sockets """ - r, w = os.pipe() - if HAS_FNCTL: - fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK) - fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK) - _set_fd_cloexec(r) - _set_fd_cloexec(w) + if sys.platform == "win32": + r, w = create_sock_pair() + else: + r, w = os.pipe() + if HAS_FNCTL: + fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK) + fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK) + _set_fd_cloexec(r) + _set_fd_cloexec(w) return r, w + +def create_sock_pair(port=0): + """Create socket pair. + + If socket.socketpair isn't available, we emulate it. + """ + # See if socketpair() is available. + have_socketpair = hasattr(socket, 'socketpair') + if have_socketpair: + client_sock, srv_sock = socket.socketpair() + return client_sock, srv_sock + + # Create a non-blocking temporary server socket + temp_srv_sock = socket.socket() + temp_srv_sock.setblocking(False) + temp_srv_sock.bind(('', port)) + port = temp_srv_sock.getsockname()[1] + temp_srv_sock.listen(1) + + # Create non-blocking client socket + client_sock = socket.socket() + client_sock.setblocking(False) + try: + client_sock.connect(('localhost', port)) + except socket.error as err: + # EWOULDBLOCK is not an error, as the socket is non-blocking + if err.errno != errno.EWOULDBLOCK: + raise + + # Use select to wait for connect() to succeed. + import select + timeout = 1 + readable = select.select([temp_srv_sock], [], [], timeout)[0] + if temp_srv_sock not in readable: + raise Exception('Client socket not connected in {} second(s)'.format(timeout)) + srv_sock, _ = temp_srv_sock.accept() + + return client_sock, srv_sock + def create_tcp_socket(module): """Create a TCP socket with the CLOEXEC flag set. 
""" http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/protocol/connection.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/protocol/connection.py b/slider-agent/src/main/python/kazoo/protocol/connection.py index 3cbb87f..7872928 100644 --- a/slider-agent/src/main/python/kazoo/protocol/connection.py +++ b/slider-agent/src/main/python/kazoo/protocol/connection.py @@ -17,7 +17,7 @@ from kazoo.exceptions import ( SessionExpiredError, NoNodeError ) -from kazoo.handlers.utils import create_pipe +from kazoo.handlers.utils import create_pipe_or_sock, pipe_or_sock_read, pipe_or_sock_write, pipe_or_sock_close from kazoo.loggingsupport import BLATHER from kazoo.protocol.serialization import ( Auth, @@ -169,7 +169,7 @@ class ConnectionHandler(object): def start(self): """Start the connection up""" if self.connection_closed.is_set(): - self._read_pipe, self._write_pipe = create_pipe() + self._read_pipe, self._write_pipe = create_pipe_or_sock() self.connection_closed.clear() if self._connection_routine: raise Exception("Unable to start, connection routine already " @@ -195,9 +195,9 @@ class ConnectionHandler(object): wp, rp = self._write_pipe, self._read_pipe self._write_pipe = self._read_pipe = None if wp is not None: - os.close(wp) + pipe_or_sock_close(wp) if rp is not None: - os.close(rp) + pipe_or_sock_close(rp) def _server_pinger(self): """Returns a server pinger iterable, that will ping the next @@ -419,7 +419,7 @@ class ConnectionHandler(object): try: # Clear possible inconsistence (no request in the queue # but have data in the read pipe), which causes cpu to spin. 
- os.read(self._read_pipe, 1) + pipe_or_sock_read(self._read_pipe, 1) except OSError: pass return @@ -440,7 +440,7 @@ class ConnectionHandler(object): self._submit(request, connect_timeout, xid) client._queue.popleft() - os.read(self._read_pipe, 1) + pipe_or_sock_read(self._read_pipe, 1) client._pending.append((request, async_object, xid)) def _send_ping(self, connect_timeout): @@ -492,7 +492,7 @@ class ConnectionHandler(object): def _connect_attempt(self, host, port, retry): client = self.client - TimeoutError = self.handler.timeout_exception + KazooTimeoutError = self.handler.timeout_exception close_connection = False self._socket = None @@ -537,7 +537,7 @@ class ConnectionHandler(object): self.logger.info('Closing connection to %s:%s', host, port) client._session_callback(KeeperState.CLOSED) return STOP_CONNECTING - except (ConnectionDropped, TimeoutError) as e: + except (ConnectionDropped, KazooTimeoutError) as e: if isinstance(e, ConnectionDropped): self.logger.warning('Connection dropped: %s', e) else: http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/tests/test_client.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_client.py b/slider-agent/src/main/python/kazoo/tests/test_client.py index eb19ef5..5807292 100644 --- a/slider-agent/src/main/python/kazoo/tests/test_client.py +++ b/slider-agent/src/main/python/kazoo/tests/test_client.py @@ -84,10 +84,10 @@ class TestClientConstructor(unittest.TestCase): hosts='127.0.0.1:2181,127.0.0.1:2182/a/b').chroot, '/a/b') def test_connection_timeout(self): - from kazoo.handlers.threading import TimeoutError + from kazoo.handlers.threading import KazooTimeoutError client = self._makeOne(hosts='127.0.0.1:9') - self.assertTrue(client.handler.timeout_exception is TimeoutError) - self.assertRaises(TimeoutError, client.start, 0.1) + self.assertTrue(client.handler.timeout_exception is 
KazooTimeoutError) + self.assertRaises(KazooTimeoutError, client.start, 0.1) def test_ordered_host_selection(self): client = self._makeOne(hosts='127.0.0.1:9,127.0.0.2:9/a', http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/tests/test_connection.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_connection.py b/slider-agent/src/main/python/kazoo/tests/test_connection.py index c764b03..d89a2c6 100644 --- a/slider-agent/src/main/python/kazoo/tests/test_connection.py +++ b/slider-agent/src/main/python/kazoo/tests/test_connection.py @@ -1,5 +1,6 @@ """license: Apache License 2.0, see LICENSE for more details.""" from collections import namedtuple +import sys import os import errno import threading @@ -150,7 +151,7 @@ class TestConnectionHandler(KazooTestCase): deserialize_ev = threading.Event() - def bad_deserialize(bytes, offset): + def bad_deserialize(_bytes, offset): deserialize_ev.set() raise struct.error() @@ -196,28 +197,35 @@ class TestConnectionHandler(KazooTestCase): client.stop() assert read_pipe is not None assert write_pipe is not None - os.fstat(read_pipe) - os.fstat(write_pipe) + if sys.platform != "win32" + os.fstat(read_pipe) + os.fstat(write_pipe) + else: + read_pipe.getsockname() + write_pipe.getsockname() # close client, and pipes should be client.close() - try: - os.fstat(read_pipe) - except OSError as e: - if not e.errno == errno.EBADF: - raise - else: - self.fail("Expected read_pipe to be closed") - - try: - os.fstat(write_pipe) - except OSError as e: - if not e.errno == errno.EBADF: - raise + if sys.platform != "win32" + try: + os.fstat(read_pipe) + except OSError as e: + if not e.errno == errno.EBADF: + raise + else: + self.fail("Expected read_pipe to be closed") + + try: + os.fstat(write_pipe) + except OSError as e: + if not e.errno == errno.EBADF: + raise + else: + self.fail("Expected write_pipe to be 
closed") else: - self.fail("Expected write_pipe to be closed") - + pass # Not sure what to do here + # start client back up. should get a new, valid pipe client.start() read_pipe = client._connection._read_pipe @@ -225,8 +233,13 @@ class TestConnectionHandler(KazooTestCase): assert read_pipe is not None assert write_pipe is not None - os.fstat(read_pipe) - os.fstat(write_pipe) + if sys.platform != "win32" + os.fstat(read_pipe) + os.fstat(write_pipe) + else: + read_pipe.getsockname() + write_pipe.getsockname() + def test_dirty_pipe(self): client = self.client @@ -237,7 +250,7 @@ class TestConnectionHandler(KazooTestCase): # blow up client. simulates case where some error leaves # a byte in the pipe which doesn't correspond to the # request queue. - os.write(write_pipe, b'\0') + pipe_or_sock_write(write_pipe, b'\0') # eventually this byte should disappear from pipe wait(lambda: client.handler.select([read_pipe], [], [], 0)[0] == []) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 84697c6..ce64a06 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -323,6 +323,9 @@ public class AgentProviderService extends AbstractProviderService implements operation.add(debugCmd); } + String outfile = new File(logDir, "agent.out").toString(); + operation.add("> " + outfile + " 2>&1"); + launcher.addCommand(operation.build()); // initialize the component instance state