SLIDER-355. Agent is unable to connect to ZK on Windows to retrieve AM host and port
SLIDER-356. Slider agent should not default to any host or port value when it cannot communicate with ZK


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/448d598f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/448d598f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/448d598f

Branch: refs/heads/feature/SLIDER-149_Support_a_YARN_service_registry
Commit: 448d598fccf75d2a340cdbbeb45d3913fb473050
Parents: fc534ca
Author: Sumit Mohanty <smoha...@hortonworks.com>
Authored: Sat Aug 23 19:50:16 2014 -0700
Committer: Sumit Mohanty <smoha...@hortonworks.com>
Committed: Sat Aug 23 19:50:30 2014 -0700

----------------------------------------------------------------------
 app-packages/memcached-win/appConfig.json       |  1 +
 app-packages/memcached-win/metainfo.xml         |  4 +-
 slider-agent/src/main/python/agent/Constants.py |  1 +
 .../src/main/python/agent/Controller.py         |  1 -
 slider-agent/src/main/python/agent/Registry.py  |  8 +-
 slider-agent/src/main/python/agent/main.py      | 59 ++++++++-----
 slider-agent/src/main/python/kazoo/client.py    |  6 +-
 .../src/main/python/kazoo/handlers/threading.py |  6 +-
 .../src/main/python/kazoo/handlers/utils.py     | 87 +++++++++++++++++---
 .../main/python/kazoo/protocol/connection.py    | 16 ++--
 .../src/main/python/kazoo/tests/test_client.py  |  6 +-
 .../main/python/kazoo/tests/test_connection.py  | 55 ++++++++-----
 .../providers/agent/AgentProviderService.java   |  3 +
 13 files changed, 176 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/app-packages/memcached-win/appConfig.json
----------------------------------------------------------------------
diff --git a/app-packages/memcached-win/appConfig.json 
b/app-packages/memcached-win/appConfig.json
index e39feb0..471a974 100644
--- a/app-packages/memcached-win/appConfig.json
+++ b/app-packages/memcached-win/appConfig.json
@@ -8,6 +8,7 @@
 
     "site.global.app_user": "hadoop",
     "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/component.pid",
+    "site.global.app_root": "${AGENT_WORK_ROOT}\\app\\install",
     "site.global.additional_cp": 
"C:\\hdp\\hadoop-2.4.0.2.1.3.0-1990\\share\\hadoop\\common\\lib\\*",
     "site.global.xmx_val": "256m",
     "site.global.xms_val": "128m",

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/app-packages/memcached-win/metainfo.xml
----------------------------------------------------------------------
diff --git a/app-packages/memcached-win/metainfo.xml 
b/app-packages/memcached-win/metainfo.xml
index 27a8be6..c7e5881 100644
--- a/app-packages/memcached-win/metainfo.xml
+++ b/app-packages/memcached-win/metainfo.xml
@@ -46,8 +46,8 @@
         <osType>any</osType>
         <packages>
           <package>
-            <type>zip</type>
-            <name>files/jmemcached-1.0.0.zip</name>
+            <type>folder</type>
+            <name>files\\jmemcached-1.0.0</name>
           </package>
         </packages>
       </osSpecific>

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/Constants.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Constants.py 
b/slider-agent/src/main/python/agent/Constants.py
index 2975266..f120b94 100644
--- a/slider-agent/src/main/python/agent/Constants.py
+++ b/slider-agent/src/main/python/agent/Constants.py
@@ -33,3 +33,4 @@ DO_NOT_HEARTBEAT_AFTER_ = "DO_NOT_HEARTBEAT_AFTER_"
 ZK_QUORUM="zk_quorum"
 ZK_REG_PATH="zk_reg_path"
 AUTO_GENERATED="auto_generated"
+MAX_AM_CONNECT_RETRIES = 10

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py 
b/slider-agent/src/main/python/agent/Controller.py
index dfd0a09..a3fb90d 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -331,7 +331,6 @@ class Controller(threading.Thread):
           zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, 
Constants.ZK_REG_PATH)
           registry = Registry(zk_quorum, zk_reg_path)
           amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort()
-          logger.info("Read from ZK registry: AM host = %s, AM secured port = %s" % (amHost, amSecuredPort))
           self.hostname = amHost
           self.secured_port = amSecuredPort
           self.config.set(AgentConfig.SERVER_SECTION, "hostname", 
self.hostname)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/Registry.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Registry.py 
b/slider-agent/src/main/python/agent/Registry.py
index c7d6e5d..9d4a387 100644
--- a/slider-agent/src/main/python/agent/Registry.py
+++ b/slider-agent/src/main/python/agent/Registry.py
@@ -50,14 +50,14 @@ class Registry:
       # the port needs to be utf-8 encoded
       amSecuredPort = amSecuredPort.encode('utf8', 'ignore')
       amUnsecuredPort = amUnsecuredPort.encode('utf8', 'ignore')
-    except Exception:
+    except Exception, e:
       # log and let empty strings be returned
-      logger.error("Could not connect to zk registry at %s in quorum %s" % 
-                   (self.zk_reg_path, self.zk_quorum))
+      logger.error("Could not connect to zk registry at %s in quorum %s. Error: %s" %
+                   (self.zk_reg_path, self.zk_quorum, str(e)))
       pass
     finally:
       if not zk == None:
         zk.stop()
         zk.close()
-    logger.info("AM Host = %s, AM Secured Port = %s" % (amHost, amSecuredPort))
+    logger.info("AM Host = %s, AM Secured Port = %s, ping port = %s" % 
(amHost, amSecuredPort, amUnsecuredPort))
     return amHost, amUnsecuredPort, amSecuredPort

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/agent/main.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/main.py 
b/slider-agent/src/main/python/agent/main.py
index 4f9cd62..f6b8666 100644
--- a/slider-agent/src/main/python/agent/main.py
+++ b/slider-agent/src/main/python/agent/main.py
@@ -217,18 +217,6 @@ def main():
   if options.debug:
     agentConfig.set(AgentConfig.AGENT_SECTION, AgentConfig.APP_DBG_CMD, 
options.debug)
 
-  # Extract the AM hostname and secured port from ZK registry
-  registry = Registry(options.zk_quorum, options.zk_reg_path)
-  amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort()
-  if amHost:
-      agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", amHost)
-
-  if amSecuredPort:
-      agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", 
amSecuredPort)
-
-  if amUnsecuredPort:
-      agentConfig.set(AgentConfig.SERVER_SECTION, "port", amUnsecuredPort)
-
   # set the security directory to a subdirectory of the run dir
   secDir = posixpath.join(agentConfig.getResolvedPath(AgentConfig.RUN_DIR), 
"security")
   logger.info("Security/Keys directory: " + secDir)
@@ -251,16 +239,43 @@ def main():
   if len(all_log_folders) > 1:
     logger.info("Selected log folder from available: " + 
",".join(all_log_folders))
 
-  server_url = SERVER_STATUS_URL.format(
-    agentConfig.get(AgentConfig.SERVER_SECTION, 'hostname'),
-    agentConfig.get(AgentConfig.SERVER_SECTION, 'port'),
-    agentConfig.get(AgentConfig.SERVER_SECTION, 'check_path'))
-  print("Connecting to the server at " + server_url + "...")
-  logger.info('Connecting to the server at: ' + server_url)
-
-  # Wait until server is reachable
-  netutil = NetUtil()
-  netutil.try_to_connect(server_url, -1, logger)
+  # Extract the AM hostname and secured port from ZK registry
+  zk_lookup_tries = 0
+  while zk_lookup_tries < Constants.MAX_AM_CONNECT_RETRIES:
+    registry = Registry(options.zk_quorum, options.zk_reg_path)
+    amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort()
+
+    tryConnect = True
+    if not amHost or not amSecuredPort or not amUnsecuredPort:
+      logger.info("Unable to extract AM host details from ZK, retrying ...")
+      tryConnect = False
+      time.sleep(NetUtil.CONNECT_SERVER_RETRY_INTERVAL_SEC)
+
+    if tryConnect:
+      if amHost:
+        agentConfig.set(AgentConfig.SERVER_SECTION, "hostname", amHost)
+
+      if amSecuredPort:
+        agentConfig.set(AgentConfig.SERVER_SECTION, "secured_port", 
amSecuredPort)
+
+      if amUnsecuredPort:
+        agentConfig.set(AgentConfig.SERVER_SECTION, "port", amUnsecuredPort)
+
+      server_url = SERVER_STATUS_URL.format(
+        agentConfig.get(AgentConfig.SERVER_SECTION, 'hostname'),
+        agentConfig.get(AgentConfig.SERVER_SECTION, 'port'),
+        agentConfig.get(AgentConfig.SERVER_SECTION, 'check_path'))
+      print("Connecting to the server at " + server_url + "...")
+      logger.info('Connecting to the server at: ' + server_url)
+
+      # Wait until server is reachable and continue to query ZK
+      netutil = NetUtil()
+      retries = netutil.try_to_connect(server_url, 3, logger)
+      zk_lookup_tries += 1
+      if retries < 3:
+        break;
+      pass
+    pass
 
   # Launch Controller communication
   controller = Controller(agentConfig)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/client.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/client.py 
b/slider-agent/src/main/python/kazoo/client.py
index a315489..11d9008 100644
--- a/slider-agent/src/main/python/kazoo/client.py
+++ b/slider-agent/src/main/python/kazoo/client.py
@@ -20,7 +20,7 @@ from kazoo.exceptions import (
     WriterNotClosedException,
 )
 from kazoo.handlers.threading import SequentialThreadingHandler
-from kazoo.handlers.utils import capture_exceptions, wrap
+from kazoo.handlers.utils import capture_exceptions, wrap, pipe_or_sock_write
 from kazoo.hosts import collect_hosts
 from kazoo.loggingsupport import BLATHER
 from kazoo.protocol.connection import ConnectionHandler
@@ -515,7 +515,7 @@ class KazooClient(object):
             async_object.set_exception(ConnectionClosedError(
                 "Connection has been closed"))
         try:
-            os.write(write_pipe, b'\0')
+            pipe_or_sock_write(write_pipe, b'\0')
         except:
             async_object.set_exception(ConnectionClosedError(
                 "Connection has been closed"))
@@ -585,7 +585,7 @@ class KazooClient(object):
 
         self._stopped.set()
         self._queue.append((CloseInstance, None))
-        os.write(self._connection._write_pipe, b'\0')
+        pipe_or_sock_write(self._connection._write_pipe, b'\0')
         self._safe_close()
 
     def restart(self):

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/handlers/threading.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/handlers/threading.py 
b/slider-agent/src/main/python/kazoo/handlers/threading.py
index 3ca9a8f..684a6b0 100644
--- a/slider-agent/src/main/python/kazoo/handlers/threading.py
+++ b/slider-agent/src/main/python/kazoo/handlers/threading.py
@@ -35,7 +35,7 @@ _STOP = object()
 log = logging.getLogger(__name__)
 
 
-class TimeoutError(Exception):
+class KazooTimeoutError(Exception):
     pass
 
 
@@ -104,7 +104,7 @@ class AsyncResult(object):
                     raise self._exception
 
             # if we get to this point we timeout
-            raise TimeoutError()
+            raise KazooTimeoutError()
 
     def get_nowait(self):
         """Return the value or raise the exception without blocking.
@@ -174,7 +174,7 @@ class SequentialThreadingHandler(object):
 
     """
     name = "sequential_threading_handler"
-    timeout_exception = TimeoutError
+    timeout_exception = KazooTimeoutError
     sleep_func = staticmethod(time.sleep)
     queue_impl = Queue.Queue
     queue_empty = Queue.Empty

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/handlers/utils.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/handlers/utils.py 
b/slider-agent/src/main/python/kazoo/handlers/utils.py
index 60d6404..385495e 100644
--- a/slider-agent/src/main/python/kazoo/handlers/utils.py
+++ b/slider-agent/src/main/python/kazoo/handlers/utils.py
@@ -8,7 +8,9 @@ except ImportError:  # pragma: nocover
     HAS_FNCTL = False
 import functools
 import os
-
+import sys
+import socket
+import errno
 
 def _set_fd_cloexec(fd):
     flags = fcntl.fcntl(fd, fcntl.F_GETFD)
@@ -21,19 +23,84 @@ def _set_default_tcpsock_options(module, sock):
         _set_fd_cloexec(sock)
     return sock
 
-
-def create_pipe():
-    """Create a non-blocking read/write pipe.
+def pipe_or_sock_read(p_or_s, n):
+    ''' Use a socket or a pipe to read something'''
+    if isinstance(p_or_s, int):
+        # This is a pipe
+        return os.read(p_or_s, n)
+    else:
+        return p_or_s.recv(n)
+
+def pipe_or_sock_close(p_or_s):
+    ''' Closes either a socket or a pipe'''
+    if isinstance(p_or_s, int):
+        os.close(p_or_s)
+    else:
+        p_or_s.close()
+        
+def pipe_or_sock_write(p_or_s, b):
+    ''' Write to a socket or a pipe depending on what is passed'''
+    if isinstance(p_or_s, int):
+        # This is a pipe
+        os.write(p_or_s,b)
+    else:
+        p_or_s.send(b)
+        
+def create_pipe_or_sock():
+    """ Create a non-blocking read/write pipe.
+        On Windows create a pair of sockets
     """
-    r, w = os.pipe()
-    if HAS_FNCTL:
-        fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
-        fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
-        _set_fd_cloexec(r)
-        _set_fd_cloexec(w)
+    if sys.platform == "win32":
+        r, w = create_sock_pair()
+    else:
+        r, w = os.pipe()
+        if HAS_FNCTL:
+            fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
+            fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
+            _set_fd_cloexec(r)
+            _set_fd_cloexec(w)
     return r, w
 
 
+
+def create_sock_pair(port=0):
+    """Create socket pair.
+
+    If socket.socketpair isn't available, we emulate it.
+    """
+    # See if socketpair() is available.
+    have_socketpair = hasattr(socket, 'socketpair')
+    if have_socketpair:
+        client_sock, srv_sock = socket.socketpair()
+        return client_sock, srv_sock
+
+    # Create a non-blocking temporary server socket
+    temp_srv_sock = socket.socket()
+    temp_srv_sock.setblocking(False)
+    temp_srv_sock.bind(('', port))
+    port = temp_srv_sock.getsockname()[1]
+    temp_srv_sock.listen(1)
+
+    # Create non-blocking client socket
+    client_sock = socket.socket()
+    client_sock.setblocking(False)
+    try:
+        client_sock.connect(('localhost', port))
+    except socket.error as err:
+        # EWOULDBLOCK is not an error, as the socket is non-blocking
+        if err.errno != errno.EWOULDBLOCK:
+            raise
+
+    # Use select to wait for connect() to succeed.
+    import select
+    timeout = 1
+    readable = select.select([temp_srv_sock], [], [], timeout)[0]
+    if temp_srv_sock not in readable:
+        raise Exception('Client socket not connected in {} second(s)'.format(timeout))
+    srv_sock, _ = temp_srv_sock.accept()
+
+    return client_sock, srv_sock
+
 def create_tcp_socket(module):
     """Create a TCP socket with the CLOEXEC flag set.
     """

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/protocol/connection.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/protocol/connection.py 
b/slider-agent/src/main/python/kazoo/protocol/connection.py
index 3cbb87f..7872928 100644
--- a/slider-agent/src/main/python/kazoo/protocol/connection.py
+++ b/slider-agent/src/main/python/kazoo/protocol/connection.py
@@ -17,7 +17,7 @@ from kazoo.exceptions import (
     SessionExpiredError,
     NoNodeError
 )
-from kazoo.handlers.utils import create_pipe
+from kazoo.handlers.utils import create_pipe_or_sock, pipe_or_sock_read, 
pipe_or_sock_write, pipe_or_sock_close
 from kazoo.loggingsupport import BLATHER
 from kazoo.protocol.serialization import (
     Auth,
@@ -169,7 +169,7 @@ class ConnectionHandler(object):
     def start(self):
         """Start the connection up"""
         if self.connection_closed.is_set():
-            self._read_pipe, self._write_pipe = create_pipe()
+            self._read_pipe, self._write_pipe = create_pipe_or_sock()
             self.connection_closed.clear()
         if self._connection_routine:
             raise Exception("Unable to start, connection routine already "
@@ -195,9 +195,9 @@ class ConnectionHandler(object):
         wp, rp = self._write_pipe, self._read_pipe
         self._write_pipe = self._read_pipe = None
         if wp is not None:
-            os.close(wp)
+            pipe_or_sock_close(wp)
         if rp is not None:
-            os.close(rp)
+            pipe_or_sock_close(rp)
 
     def _server_pinger(self):
         """Returns a server pinger iterable, that will ping the next
@@ -419,7 +419,7 @@ class ConnectionHandler(object):
             try:
                 # Clear possible inconsistence (no request in the queue
                 # but have data in the read pipe), which causes cpu to spin.
-                os.read(self._read_pipe, 1)
+                pipe_or_sock_read(self._read_pipe, 1)
             except OSError:
                 pass
             return
@@ -440,7 +440,7 @@ class ConnectionHandler(object):
 
         self._submit(request, connect_timeout, xid)
         client._queue.popleft()
-        os.read(self._read_pipe, 1)
+        pipe_or_sock_read(self._read_pipe, 1)
         client._pending.append((request, async_object, xid))
 
     def _send_ping(self, connect_timeout):
@@ -492,7 +492,7 @@ class ConnectionHandler(object):
 
     def _connect_attempt(self, host, port, retry):
         client = self.client
-        TimeoutError = self.handler.timeout_exception
+        KazooTimeoutError = self.handler.timeout_exception
         close_connection = False
 
         self._socket = None
@@ -537,7 +537,7 @@ class ConnectionHandler(object):
             self.logger.info('Closing connection to %s:%s', host, port)
             client._session_callback(KeeperState.CLOSED)
             return STOP_CONNECTING
-        except (ConnectionDropped, TimeoutError) as e:
+        except (ConnectionDropped, KazooTimeoutError) as e:
             if isinstance(e, ConnectionDropped):
                 self.logger.warning('Connection dropped: %s', e)
             else:

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/tests/test_client.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_client.py 
b/slider-agent/src/main/python/kazoo/tests/test_client.py
index eb19ef5..5807292 100644
--- a/slider-agent/src/main/python/kazoo/tests/test_client.py
+++ b/slider-agent/src/main/python/kazoo/tests/test_client.py
@@ -84,10 +84,10 @@ class TestClientConstructor(unittest.TestCase):
             hosts='127.0.0.1:2181,127.0.0.1:2182/a/b').chroot, '/a/b')
 
     def test_connection_timeout(self):
-        from kazoo.handlers.threading import TimeoutError
+        from kazoo.handlers.threading import KazooTimeoutError
         client = self._makeOne(hosts='127.0.0.1:9')
-        self.assertTrue(client.handler.timeout_exception is TimeoutError)
-        self.assertRaises(TimeoutError, client.start, 0.1)
+        self.assertTrue(client.handler.timeout_exception is KazooTimeoutError)
+        self.assertRaises(KazooTimeoutError, client.start, 0.1)
 
     def test_ordered_host_selection(self):
         client = self._makeOne(hosts='127.0.0.1:9,127.0.0.2:9/a',

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-agent/src/main/python/kazoo/tests/test_connection.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_connection.py 
b/slider-agent/src/main/python/kazoo/tests/test_connection.py
index c764b03..d89a2c6 100644
--- a/slider-agent/src/main/python/kazoo/tests/test_connection.py
+++ b/slider-agent/src/main/python/kazoo/tests/test_connection.py
@@ -1,5 +1,6 @@
 """license: Apache License 2.0, see LICENSE for more details."""
 from collections import namedtuple
+import sys
 import os
 import errno
 import threading
@@ -150,7 +151,7 @@ class TestConnectionHandler(KazooTestCase):
 
         deserialize_ev = threading.Event()
 
-        def bad_deserialize(bytes, offset):
+        def bad_deserialize(_bytes, offset):
             deserialize_ev.set()
             raise struct.error()
 
@@ -196,28 +197,35 @@ class TestConnectionHandler(KazooTestCase):
         client.stop()
         assert read_pipe is not None
         assert write_pipe is not None
-        os.fstat(read_pipe)
-        os.fstat(write_pipe)
+        if sys.platform != "win32"
+            os.fstat(read_pipe)
+            os.fstat(write_pipe)
+        else:
+            read_pipe.getsockname()
+            write_pipe.getsockname()
 
         # close client, and pipes should be
         client.close()
 
-        try:
-            os.fstat(read_pipe)
-        except OSError as e:
-            if not e.errno == errno.EBADF:
-                raise
-        else:
-            self.fail("Expected read_pipe to be closed")
-
-        try:
-            os.fstat(write_pipe)
-        except OSError as e:
-            if not e.errno == errno.EBADF:
-                raise
+        if sys.platform != "win32"
+            try:
+                os.fstat(read_pipe)
+            except OSError as e:
+                if not e.errno == errno.EBADF:
+                    raise
+            else:
+                self.fail("Expected read_pipe to be closed")
+    
+            try:
+                os.fstat(write_pipe)
+            except OSError as e:
+                if not e.errno == errno.EBADF:
+                    raise
+            else:
+                self.fail("Expected write_pipe to be closed")
         else:
-            self.fail("Expected write_pipe to be closed")
-
+            pass # Not sure what to do here
+            
         # start client back up. should get a new, valid pipe
         client.start()
         read_pipe = client._connection._read_pipe
@@ -225,8 +233,13 @@ class TestConnectionHandler(KazooTestCase):
 
         assert read_pipe is not None
         assert write_pipe is not None
-        os.fstat(read_pipe)
-        os.fstat(write_pipe)
+        if sys.platform != "win32"
+            os.fstat(read_pipe)
+            os.fstat(write_pipe)
+        else:
+            read_pipe.getsockname()
+            write_pipe.getsockname()
+            
 
     def test_dirty_pipe(self):
         client = self.client
@@ -237,7 +250,7 @@ class TestConnectionHandler(KazooTestCase):
         # blow up client. simulates case where some error leaves
         # a byte in the pipe which doesn't correspond to the
         # request queue.
-        os.write(write_pipe, b'\0')
+        pipe_or_sock_write(write_pipe, b'\0')
 
         # eventually this byte should disappear from pipe
         wait(lambda: client.handler.select([read_pipe], [], [], 0)[0] == [])

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/448d598f/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 84697c6..ce64a06 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -323,6 +323,9 @@ public class AgentProviderService extends 
AbstractProviderService implements
       operation.add(debugCmd);
     }
 
+    String outfile = new File(logDir, "agent.out").toString();
+    operation.add("> " + outfile + " 2>&1");
+
     launcher.addCommand(operation.build());
 
     // initialize the component instance state

Reply via email to