http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_lock.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_lock.py 
b/slider-agent/src/main/python/kazoo/tests/test_lock.py
new file mode 100644
index 0000000..36b6d42
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_lock.py
@@ -0,0 +1,517 @@
+import uuid
+import threading
+
+from nose.tools import eq_, ok_
+
+from kazoo.exceptions import CancelledError
+from kazoo.exceptions import LockTimeout
+from kazoo.testing import KazooTestCase
+from kazoo.tests.util import wait
+
+
+class KazooLockTests(KazooTestCase):
+    """Tests for locks obtained through ``self.client.Lock``.
+
+    Worker threads run ``_thread_lock_acquire_til_event`` and report their
+    progress through ``self.condition``, ``self.active_thread``,
+    ``self.cancelled_threads`` and ``self.released``.
+    """
+
+    def setUp(self):
+        """Pick a random lock path and reset the shared thread state."""
+        super(KazooLockTests, self).setUp()
+        self.lockpath = "/" + uuid.uuid4().hex
+
+        # Guards active_thread / cancelled_threads and signals their changes.
+        self.condition = threading.Condition()
+        # Set by a worker after it has left its ``with lock`` block.
+        self.released = threading.Event()
+        # Name of the worker currently inside ``with lock``, or None.
+        self.active_thread = None
+        # Names of workers that caught CancelledError.
+        self.cancelled_threads = []
+
+    def _thread_lock_acquire_til_event(self, name, lock, event):
+        """Hold ``lock`` until ``event`` is set, publishing state changes.
+
+        :param name: value stored in ``self.active_thread`` while the lock
+                     is held.
+        :param lock: context manager to enter (a lock from the client).
+        :param event: ``threading.Event`` the worker waits on before
+                      leaving the lock.
+
+        A ``CancelledError`` raised inside the ``try`` is recorded by
+        appending ``name`` to ``self.cancelled_threads``.
+        """
+        try:
+            with lock:
+                with self.condition:
+                    # Nobody else may be marked active while we hold the lock.
+                    eq_(self.active_thread, None)
+                    self.active_thread = name
+                    self.condition.notify_all()
+
+                event.wait()
+
+                with self.condition:
+                    eq_(self.active_thread, name)
+                    self.active_thread = None
+                    self.condition.notify_all()
+            # The ``with lock`` block has been exited at this point.
+            self.released.set()
+        except CancelledError:
+            with self.condition:
+                self.cancelled_threads.append(name)
+                self.condition.notify_all()
+
+    def test_lock_one(self):
+        """A single worker acquires the lock and then releases it."""
+        lock_name = uuid.uuid4().hex
+        lock = self.client.Lock(self.lockpath, lock_name)
+        event = threading.Event()
+
+        thread = threading.Thread(target=self._thread_lock_acquire_til_event,
+            args=(lock_name, lock, event))
+        thread.start()
+
+        lock2_name = uuid.uuid4().hex
+        anotherlock = self.client.Lock(self.lockpath, lock2_name)
+
+        # wait for any contender to show up on the lock
+        wait(anotherlock.contenders)
+        eq_(anotherlock.contenders(), [lock_name])
+
+        with self.condition:
+            while self.active_thread != lock_name:
+                self.condition.wait()
+
+        # release the lock
+        event.set()
+
+        with self.condition:
+            while self.active_thread:
+                self.condition.wait()
+        self.released.wait()
+        thread.join()
+
+    def test_lock(self):
+        """Five queued contenders obtain the lock in contender order."""
+        threads = []
+        names = ["contender" + str(i) for i in range(5)]
+
+        # Maps contender name -> (thread, release event).
+        contender_bits = {}
+
+        for name in names:
+            e = threading.Event()
+
+            l = self.client.Lock(self.lockpath, name)
+            t = threading.Thread(target=self._thread_lock_acquire_til_event,
+                args=(name, l, e))
+            contender_bits[name] = (t, e)
+            threads.append(t)
+
+        # acquire the lock ourselves first to make the others line up
+        lock = self.client.Lock(self.lockpath, "test")
+        lock.acquire()
+
+        for t in threads:
+            t.start()
+
+        # wait for everyone to line up on the lock
+        wait(lambda: len(lock.contenders()) == 6)
+        contenders = lock.contenders()
+
+        eq_(contenders[0], "test")
+        contenders = contenders[1:]
+        remaining = list(contenders)
+
+        # release the lock and contenders should claim it in order
+        lock.release()
+
+        for contender in contenders:
+            thread, event = contender_bits[contender]
+
+            with self.condition:
+                while not self.active_thread:
+                    self.condition.wait()
+                eq_(self.active_thread, contender)
+
+            eq_(lock.contenders(), remaining)
+            remaining = remaining[1:]
+
+            # Let the current holder go so the next contender can proceed.
+            event.set()
+
+            with self.condition:
+                while self.active_thread:
+                    self.condition.wait()
+        for thread in threads:
+            thread.join()
+
+    def test_lock_reconnect(self):
+        """A waiting contender still gets the lock after expire_session()."""
+        event = threading.Event()
+        other_lock = self.client.Lock(self.lockpath, 'contender')
+        thread = threading.Thread(target=self._thread_lock_acquire_til_event,
+                                  args=('contender', other_lock, event))
+
+        # acquire the lock ourselves first to make the contender line up
+        lock = self.client.Lock(self.lockpath, "test")
+        lock.acquire()
+
+        thread.start()
+        # wait for the contender to line up on the lock
+        wait(lambda: len(lock.contenders()) == 2)
+        eq_(lock.contenders(), ['test', 'contender'])
+
+        self.expire_session()
+
+        lock.release()
+
+        with self.condition:
+            while not self.active_thread:
+                self.condition.wait()
+            eq_(self.active_thread, 'contender')
+
+        event.set()
+        thread.join()
+
+    def test_lock_non_blocking(self):
+        """acquire(blocking=False) is falsy while a worker holds the lock."""
+        lock_name = uuid.uuid4().hex
+        lock = self.client.Lock(self.lockpath, lock_name)
+        event = threading.Event()
+
+        thread = threading.Thread(target=self._thread_lock_acquire_til_event,
+            args=(lock_name, lock, event))
+        thread.start()
+
+        lock1 = self.client.Lock(self.lockpath, lock_name)
+
+        # wait for the thread to acquire the lock
+        with self.condition:
+            if not self.active_thread:
+                self.condition.wait(5)
+            # Fail right here if the timed wait expired without the worker
+            # taking the lock, instead of continuing on a false premise.
+            eq_(self.active_thread, lock_name)
+
+        ok_(not lock1.acquire(blocking=False))
+        eq_(lock.contenders(), [lock_name])  # just one - itself
+
+        event.set()
+        thread.join()
+
+    def test_lock_fail_first_call(self):
+        """A lone lock holder is the only entry returned by contenders()."""
+        event1 = threading.Event()
+        lock1 = self.client.Lock(self.lockpath, "one")
+        thread1 = threading.Thread(target=self._thread_lock_acquire_til_event,
+            args=("one", lock1, event1))
+        thread1.start()
+
+        # wait for this thread to acquire the lock
+        with self.condition:
+            if not self.active_thread:
+                self.condition.wait(5)
+                eq_(self.active_thread, "one")
+        eq_(lock1.contenders(), ["one"])
+        event1.set()
+        thread1.join()
+
+    def test_lock_cancel(self):
+        """cancel() aborts a blocked acquire and removes that contender."""
+        event1 = threading.Event()
+        lock1 = self.client.Lock(self.lockpath, "one")
+        thread1 = threading.Thread(target=self._thread_lock_acquire_til_event,
+            args=("one", lock1, event1))
+        thread1.start()
+
+        # wait for this thread to acquire the lock
+        with self.condition:
+            if not self.active_thread:
+                self.condition.wait(5)
+                eq_(self.active_thread, "one")
+
+        client2 = self._get_client()
+        client2.start()
+        event2 = threading.Event()
+        lock2 = client2.Lock(self.lockpath, "two")
+        thread2 = threading.Thread(target=self._thread_lock_acquire_til_event,
+            args=("two", lock2, event2))
+        thread2.start()
+
+        # this one should block in acquire. check that it is a contender
+        wait(lambda: len(lock2.contenders()) > 1)
+        eq_(lock2.contenders(), ["one", "two"])
+
+        lock2.cancel()
+        # Re-check the predicate after every wake-up so that only the
+        # expected state change ends the wait.
+        with self.condition:
+            while "two" not in self.cancelled_threads:
+                self.condition.wait()
+
+        eq_(lock2.contenders(), ["one"])
+
+        thread2.join()
+        event1.set()
+        thread1.join()
+        client2.stop()
+
+    def test_lock_double_calls(self):
+        """Back-to-back acquire() and release() calls do not raise."""
+        lock1 = self.client.Lock(self.lockpath, "one")
+        lock1.acquire()
+        lock1.acquire()
+        lock1.release()
+        lock1.release()
+
+    def test_lock_reacquire(self):
+        """A released lock object can be acquired again."""
+        lock = self.client.Lock(self.lockpath, "one")
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+
+    def test_lock_timeout(self):
+        """acquire(timeout=...) raises LockTimeout while the lock is held."""
+        timeout = 3
+        e = threading.Event()
+        started = threading.Event()
+
+        # In the background thread, acquire the lock and wait thrice the time
+        # that the main thread is going to wait to acquire the lock.
+        lock1 = self.client.Lock(self.lockpath, "one")
+
+        def _thread(lock, event, timeout):
+            with lock:
+                started.set()
+                event.wait(timeout)
+                if not event.is_set():
+                    # Eventually fail to avoid hanging the tests
+                    self.fail("lock2 never timed out")
+
+        t = threading.Thread(target=_thread, args=(lock1, e, timeout * 3))
+        t.start()
+
+        # Start the main thread's kazoo client and try to acquire the lock
+        # but give up after `timeout` seconds
+        client2 = self._get_client()
+        client2.start()
+        started.wait(5)
+        self.assertTrue(started.is_set())
+        lock2 = client2.Lock(self.lockpath, "two")
+        try:
+            lock2.acquire(timeout=timeout)
+        except LockTimeout:
+            # A timeout is the behavior we're expecting, since the background
+            # thread should still be holding onto the lock
+            pass
+        else:
+            self.fail("Main thread unexpectedly acquired the lock")
+        finally:
+            # Cleanup
+            e.set()
+            t.join()
+            client2.stop()
+
+
+class TestSemaphore(KazooTestCase):
+    """Tests for semaphores obtained through ``self.client.Semaphore``."""
+
+    def setUp(self):
+        """Pick a random semaphore path and create shared thread state."""
+        super(TestSemaphore, self).setUp()
+        self.lockpath = "/" + uuid.uuid4().hex
+
+        # Same attributes as KazooLockTests.setUp creates; the tests in
+        # this class do not read them.
+        self.condition = threading.Condition()
+        self.released = threading.Event()
+        self.active_thread = None
+        self.cancelled_threads = []
+
+    def test_basic(self):
+        """A semaphore can be acquired and released once."""
+        sem1 = self.client.Semaphore(self.lockpath)
+        sem1.acquire()
+        sem1.release()
+
+    def test_lock_one(self):
+        """With max_leases=1 a second semaphore waits for the first."""
+        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
+        sem2 = self.client.Semaphore(self.lockpath, max_leases=1)
+        started = threading.Event()
+        event = threading.Event()
+
+        sem1.acquire()
+
+        def sema_one():
+            started.set()
+            with sem2:
+                event.set()
+
+        thread = threading.Thread(target=sema_one, args=())
+        thread.start()
+        started.wait(10)
+
+        # sem1 is still held, so the worker must not have entered sem2 yet.
+        self.assertFalse(event.is_set())
+
+        sem1.release()
+        event.wait(10)
+        self.assertTrue(event.is_set())
+        thread.join()
+
+    def test_non_blocking(self):
+        """acquire(blocking=False) is falsy once both leases are taken."""
+        sem1 = self.client.Semaphore(
+            self.lockpath, identifier='sem1', max_leases=2)
+        sem2 = self.client.Semaphore(
+            self.lockpath, identifier='sem2', max_leases=2)
+        sem3 = self.client.Semaphore(
+            self.lockpath, identifier='sem3', max_leases=2)
+
+        sem1.acquire()
+        sem2.acquire()
+        ok_(not sem3.acquire(blocking=False))
+        eq_(set(sem1.lease_holders()), set(['sem1', 'sem2']))
+        sem2.release()
+        # the next line isn't required, but avoids timing issues in tests
+        sem3.acquire()
+        eq_(set(sem1.lease_holders()), set(['sem1', 'sem3']))
+        sem1.release()
+        sem3.release()
+
+    def test_non_blocking_release(self):
+        """Releasing after a non-blocking acquire attempt does not raise."""
+        sem1 = self.client.Semaphore(
+            self.lockpath, identifier='sem1', max_leases=1)
+        sem2 = self.client.Semaphore(
+            self.lockpath, identifier='sem2', max_leases=1)
+        sem1.acquire()
+        sem2.acquire(blocking=False)
+
+        # make sure there's no shutdown / cleanup error
+        sem1.release()
+        sem2.release()
+
+    def test_holders(self):
+        """lease_holders() lists the identifier of the current holder."""
+        started = threading.Event()
+        event = threading.Event()
+
+        def sema_one():
+            with self.client.Semaphore(self.lockpath, 'fred', max_leases=1):
+                started.set()
+                event.wait()
+
+        thread = threading.Thread(target=sema_one, args=())
+        thread.start()
+        started.wait()
+        sem1 = self.client.Semaphore(self.lockpath)
+        holders = sem1.lease_holders()
+        eq_(holders, ['fred'])
+        event.set()
+        thread.join()
+
+    def test_semaphore_cancel(self):
+        """cancel() makes a waiting ``with sem2`` raise CancelledError."""
+        sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
+        sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1)
+        sem1.acquire()
+        started = threading.Event()
+        event = threading.Event()
+
+        def sema_one():
+            started.set()
+            try:
+                with sem2:
+                    started.set()
+            except CancelledError:
+                event.set()
+
+        thread = threading.Thread(target=sema_one, args=())
+        thread.start()
+        started.wait()
+        eq_(sem1.lease_holders(), ['fred'])
+        eq_(event.is_set(), False)
+        sem2.cancel()
+        event.wait()
+        eq_(event.is_set(), True)
+        thread.join()
+
+    def test_multiple_acquire_and_release(self):
+        """After two acquires, release() returns True and then False."""
+        sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1)
+        sem1.acquire()
+        sem1.acquire()
+
+        eq_(True, sem1.release())
+        eq_(False, sem1.release())
+
+    def test_handle_session_loss(self):
+        """A waiter gets its lease after expire_session() and a release."""
+        expire_semaphore = self.client.Semaphore(self.lockpath, 'fred',
+                                                 max_leases=1)
+
+        # A second client holds the only lease while the first one waits.
+        client = self._get_client()
+        client.start()
+        lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1)
+        lh_semaphore.acquire()
+
+        started = threading.Event()
+        event = threading.Event()
+        event2 = threading.Event()
+
+        def sema_one():
+            started.set()
+            with expire_semaphore:
+                event.set()
+                event2.wait()
+
+        waiter_thread = threading.Thread(target=sema_one, args=())
+        waiter_thread.start()
+
+        started.wait()
+        eq_(lh_semaphore.lease_holders(), ['george'])
+
+        # Fired in a separate thread to make sure we can see the effect
+        expired = threading.Event()
+
+        def expire():
+            self.expire_session()
+            expired.set()
+
+        # Kept under its own name so the waiter thread stays reachable.
+        expire_thread = threading.Thread(target=expire, args=())
+        expire_thread.start()
+        expire_semaphore.wake_event.wait()
+        expired.wait()
+
+        lh_semaphore.release()
+        client.stop()
+
+        event.wait(5)
+        eq_(expire_semaphore.lease_holders(), ['fred'])
+        event2.set()
+        # Join both workers so that neither outlives the test.
+        waiter_thread.join()
+        expire_thread.join()
+
+    def test_inconsistent_max_leases(self):
+        """acquire() raises ValueError when max_leases values disagree."""
+        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
+        sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
+
+        sem1.acquire()
+        self.assertRaises(ValueError, sem2.acquire)
+
+    def test_inconsistent_max_leases_other_data(self):
+        """Other data stored on the path does not trigger the mismatch."""
+        sem1 = self.client.Semaphore(self.lockpath, max_leases=1)
+        sem2 = self.client.Semaphore(self.lockpath, max_leases=2)
+
+        # Store unrelated bytes on the semaphore node before any acquire.
+        self.client.ensure_path(self.lockpath)
+        self.client.set(self.lockpath, b'a$')
+
+        sem1.acquire()
+        # sem2 thinks it's ok to have two lease holders
+        ok_(sem2.acquire(blocking=False))
+
+    def test_reacquire(self):
+        """A released semaphore object can be acquired again."""
+        lock = self.client.Semaphore(self.lockpath)
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+
+    def test_acquire_after_cancelled(self):
+        """acquire() still succeeds after cancel() was called."""
+        lock = self.client.Semaphore(self.lockpath)
+        self.assertTrue(lock.acquire())
+        self.assertTrue(lock.release())
+        lock.cancel()
+        self.assertTrue(lock.cancelled)
+        self.assertTrue(lock.acquire())
+
+    def test_timeout(self):
+        """acquire(timeout=...) raises LockTimeout while the lease is held."""
+        timeout = 3
+        e = threading.Event()
+        started = threading.Event()
+
+        # In the background thread, acquire the lock and wait thrice the time
+        # that the main thread is going to wait to acquire the lock.
+        sem1 = self.client.Semaphore(self.lockpath, "one")
+
+        def _thread(sem, event, timeout):
+            with sem:
+                started.set()
+                event.wait(timeout)
+                if not event.is_set():
+                    # Eventually fail to avoid hanging the tests
+                    self.fail("sem2 never timed out")
+
+        t = threading.Thread(target=_thread, args=(sem1, e, timeout * 3))
+        t.start()
+
+        # Start the main thread's kazoo client and try to acquire the lock
+        # but give up after `timeout` seconds
+        client2 = self._get_client()
+        client2.start()
+        started.wait(5)
+        self.assertTrue(started.is_set())
+        sem2 = client2.Semaphore(self.lockpath, "two")
+        try:
+            sem2.acquire(timeout=timeout)
+        except LockTimeout:
+            # A timeout is the behavior we're expecting, since the background
+            # thread will still be holding onto the lock
+            pass
+        else:
+            self.fail("Main thread unexpectedly acquired the semaphore")
+        finally:
+            # Cleanup: always release the background thread before joining
+            # so a failure does not stall the test run.
+            e.set()
+            t.join()
+            client2.stop()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_partitioner.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_partitioner.py 
b/slider-agent/src/main/python/kazoo/tests/test_partitioner.py
new file mode 100644
index 0000000..a323d07
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_partitioner.py
@@ -0,0 +1,92 @@
+import uuid
+import time
+
+from nose.tools import eq_
+
+from kazoo.testing import KazooTestCase
+from kazoo.recipe.partitioner import PartitionState
+
+
+class KazooPartitionerTests(KazooTestCase):
+    """Tests for partitioners built with ``self.client.SetPartitioner``."""
+
+    def setUp(self):
+        """Generate a random path for the partitioner under test."""
+        super(KazooPartitionerTests, self).setUp()
+        self.path = "/%s" % uuid.uuid4().hex
+
+    def _member(self, items, ident):
+        """Create a partitioner for ``items`` identified by ``ident``."""
+        return self.client.SetPartitioner(
+            self.path, items, identifier=ident, time_boundary=0.2)
+
+    def test_party_of_one(self):
+        """A lone member acquires the whole set."""
+        solo = self.client.SetPartitioner(
+            self.path, set=(1, 2, 3), time_boundary=0.2)
+        solo.wait_for_acquire(14)
+        eq_(solo.state, PartitionState.ACQUIRED)
+        eq_(list(solo), [1, 2, 3])
+        solo.finish()
+
+    def test_party_of_two(self):
+        """Two members split a two-item set, one item each."""
+        first, second = [self._member((1, 2), "p%d" % n) for n in range(2)]
+
+        for member in (first, second):
+            member.wait_for_acquire(14)
+        eq_(list(first), [1])
+        eq_(list(second), [2])
+        first.finish()
+        time.sleep(0.1)
+        # After the first member finished, the second is flagged to release.
+        eq_(second.release, True)
+        second.finish()
+
+    def test_party_expansion(self):
+        """Adding a third member re-divides a three-item set."""
+        members = [self._member((1, 2, 3), "p%d" % n) for n in range(2)]
+
+        for member in members:
+            member.wait_for_acquire(14)
+        for member in members:
+            eq_(member.state, PartitionState.ACQUIRED)
+
+        eq_(list(members[0]), [1, 3])
+        eq_(list(members[1]), [2])
+
+        # Add another partition, wait till they settle
+        members.append(self._member((1, 2, 3), "p2"))
+        time.sleep(0.1)
+        eq_(members[0].release, True)
+        # Only the two original members give up their sets.
+        for member in members[:2]:
+            member.release_set()
+
+        for member in members:
+            member.wait_for_acquire(14)
+
+        for position, member in enumerate(members):
+            eq_(list(member), [position + 1])
+
+        for member in members:
+            member.finish()
+
+    def test_more_members_than_set_items(self):
+        """With one item and two members, the second member gets nothing."""
+        members = [self._member((1,), "p%d" % n) for n in range(2)]
+
+        for member in members:
+            member.wait_for_acquire(14)
+        for member in members:
+            eq_(member.state, PartitionState.ACQUIRED)
+
+        eq_(list(members[0]), [1])
+        eq_(list(members[1]), [])
+
+        for member in members:
+            member.finish()
+
+    def test_party_session_failure(self):
+        """``failed`` is true after _fail_out() followed by release_set()."""
+        solo = self.client.SetPartitioner(
+            self.path, set=(1, 2, 3), time_boundary=0.2)
+        solo.wait_for_acquire(14)
+        eq_(solo.state, PartitionState.ACQUIRED)
+        # simulate session failure
+        solo._fail_out()
+        solo.release_set()
+        self.assertTrue(solo.failed)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_party.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_party.py 
b/slider-agent/src/main/python/kazoo/tests/test_party.py
new file mode 100644
index 0000000..d44eec7
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_party.py
@@ -0,0 +1,84 @@
+import uuid
+
+from nose.tools import eq_
+
+from kazoo.testing import KazooTestCase
+
+
+class KazooPartyTests(KazooTestCase):
+    """Tests for parties built with ``self.client.Party``."""
+
+    def setUp(self):
+        """Generate a random party path for each test."""
+        super(KazooPartyTests, self).setUp()
+        self.path = "/%s" % uuid.uuid4().hex
+
+    def test_party(self):
+        """Membership grows with each join() and shrinks with each leave()."""
+        members = [self.client.Party(self.path, "p%d" % n)
+                   for n in range(5)]
+
+        # Nobody has joined yet, so any member sees an empty party.
+        eq_(list(members[0]), [])
+        eq_(len(members[0]), 0)
+
+        expected = set()
+        for member in members:
+            member.join()
+            expected.add(member.data.decode('utf-8'))
+
+            eq_(set(member), expected)
+            eq_(len(member), len(expected))
+
+        for member in members:
+            member.leave()
+            expected.remove(member.data.decode('utf-8'))
+
+            eq_(set(member), expected)
+            eq_(len(member), len(expected))
+
+    def test_party_reuse_node(self):
+        """join() works when the member's node was created beforehand."""
+        member = self.client.Party(self.path, "p1")
+        self.client.ensure_path(self.path)
+        self.client.create(member.create_path)
+        member.join()
+        self.assertTrue(member.participating)
+        member.leave()
+        self.assertFalse(member.participating)
+        self.assertEqual(len(member), 0)
+
+    def test_party_vanishing_node(self):
+        """leave() works when the member's node was deleted beforehand."""
+        member = self.client.Party(self.path, "p1")
+        member.join()
+        self.assertTrue(member.participating)
+        self.client.delete(member.create_path)
+        member.leave()
+        self.assertFalse(member.participating)
+        self.assertEqual(len(member), 0)
+
+
+class KazooShallowPartyTests(KazooTestCase):
+    """Tests for parties built with ``self.client.ShallowParty``."""
+
+    def setUp(self):
+        """Generate a random party path for each test."""
+        super(KazooShallowPartyTests, self).setUp()
+        self.path = "/%s" % uuid.uuid4().hex
+
+    def test_party(self):
+        """Membership grows with each join() and shrinks with each leave()."""
+        members = [self.client.ShallowParty(self.path, "p%d" % n)
+                   for n in range(5)]
+
+        # Nobody has joined yet, so any member sees an empty party.
+        eq_(list(members[0]), [])
+        eq_(len(members[0]), 0)
+
+        expected = set()
+        for member in members:
+            member.join()
+            expected.add(member.data.decode('utf-8'))
+
+            eq_(set(member), expected)
+            eq_(len(member), len(expected))
+
+        for member in members:
+            member.leave()
+            expected.remove(member.data.decode('utf-8'))
+
+            eq_(set(member), expected)
+            eq_(len(member), len(expected))

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_paths.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_paths.py 
b/slider-agent/src/main/python/kazoo/tests/test_paths.py
new file mode 100644
index 0000000..e092196
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_paths.py
@@ -0,0 +1,98 @@
+import sys
+from unittest import TestCase
+
+from kazoo.protocol import paths
+
+
+# ``u`` yields a text string on both major Python versions.
+if not sys.version_info > (3, ):  # pragma: nocover
+    def u(s):
+        """Decode ``s`` with the ``unicode_escape`` codec (Python 2)."""
+        return unicode(s, "unicode_escape")
+else:  # pragma: nocover
+    def u(s):
+        """Return ``s`` unchanged (Python 3)."""
+        return s
+
+
+class NormPathTestCase(TestCase):
+    """Checks for ``paths.normpath``."""
+
+    def _check(self, raw, expected):
+        """Assert that normalising ``raw`` yields ``expected``."""
+        self.assertEqual(paths.normpath(raw), expected)
+
+    def test_normpath(self):
+        self._check('/a/b', '/a/b')
+
+    def test_normpath_empty(self):
+        self._check('', '')
+
+    def test_normpath_unicode(self):
+        self._check(u('/\xe4/b'), u('/\xe4/b'))
+
+    def test_normpath_dots(self):
+        self._check('/a./b../c', '/a./b../c')
+
+    def test_normpath_slash(self):
+        self._check('/', '/')
+
+    def test_normpath_multiple_slashes(self):
+        cases = (
+            ('//', '/'),
+            ('//a/b', '/a/b'),
+            ('/a//b//', '/a/b'),
+            ('//a////b///c/', '/a/b/c'),
+        )
+        for raw, expected in cases:
+            self._check(raw, expected)
+
+    def test_normpath_relative(self):
+        # Both inputs are expected to be rejected with ValueError.
+        for rejected in ('./a/b', '/a/../b'):
+            self.assertRaises(ValueError, paths.normpath, rejected)
+
+
+class JoinTestCase(TestCase):
+    """Checks for ``paths.join``."""
+
+    def _check(self, parts, expected):
+        """Assert that joining the tuple ``parts`` yields ``expected``."""
+        self.assertEqual(paths.join(*parts), expected)
+
+    def test_join(self):
+        cases = (
+            (('/a',), '/a'),
+            (('/a', 'b/'), '/a/b/'),
+            (('/a', 'b', 'c'), '/a/b/c'),
+        )
+        for parts, expected in cases:
+            self._check(parts, expected)
+
+    def test_join_empty(self):
+        cases = (
+            (('',), ''),
+            (('', 'a', 'b'), 'a/b'),
+            (('/a', '', 'b/', 'c'), '/a/b/c'),
+        )
+        for parts, expected in cases:
+            self._check(parts, expected)
+
+    def test_join_absolute(self):
+        self._check(('/a/b', '/c'), '/c')
+
+
+class IsAbsTestCase(TestCase):
+    """Checks for ``paths.isabs``."""
+
+    def test_isabs(self):
+        for candidate in ('/', '/a', '/a//b/c', '//a/b'):
+            self.assertTrue(paths.isabs(candidate))
+
+    def test_isabs_false(self):
+        for candidate in ('', 'a/', 'a/../'):
+            self.assertFalse(paths.isabs(candidate))
+
+
+class BaseNameTestCase(TestCase):
+    """Checks for ``paths.basename``."""
+
+    def test_basename(self):
+        """basename() returns the text after the last slash."""
+        # assertEqual replaces the deprecated assertEquals alias.
+        self.assertEqual(paths.basename(''), '')
+        self.assertEqual(paths.basename('/'), '')
+        self.assertEqual(paths.basename('//a'), 'a')
+        self.assertEqual(paths.basename('//a/'), '')
+        self.assertEqual(paths.basename('/a/b.//c..'), 'c..')
+
+
+class PrefixRootTestCase(TestCase):
+    """Checks for the private helper ``paths._prefix_root``."""
+
+    def test_prefix_root(self):
+        """The root and the path are combined into one absolute path."""
+        # assertEqual replaces the deprecated assertEquals alias.
+        self.assertEqual(paths._prefix_root('/a/', 'b/c'), '/a/b/c')
+        self.assertEqual(paths._prefix_root('/a/b', 'c/d'), '/a/b/c/d')
+        self.assertEqual(paths._prefix_root('/a', '/b/c'), '/a/b/c')
+        self.assertEqual(paths._prefix_root('/a', '//b/c.'), '/a/b/c.')
+
+
+class NormRootTestCase(TestCase):
+    """Checks for the private helper ``paths._norm_root``."""
+
+    def test_norm_root(self):
+        """Roots come back absolute, with repeated slashes collapsed."""
+        # assertEqual replaces the deprecated assertEquals alias.
+        self.assertEqual(paths._norm_root(''), '/')
+        self.assertEqual(paths._norm_root('/'), '/')
+        self.assertEqual(paths._norm_root('//a'), '/a')
+        self.assertEqual(paths._norm_root('//a./b'), '/a./b')

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_queue.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_queue.py 
b/slider-agent/src/main/python/kazoo/tests/test_queue.py
new file mode 100644
index 0000000..6a9ec68
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_queue.py
@@ -0,0 +1,179 @@
+import uuid
+
+from nose import SkipTest
+from nose.tools import eq_, ok_
+
+from kazoo.testing import KazooTestCase
+from kazoo.tests.util import TRAVIS_ZK_VERSION
+
+
+class KazooQueueTests(KazooTestCase):
+    """Tests for the queue returned by ``self.client.Queue``."""
+
+    def _makeOne(self):
+        """Return a queue bound to a freshly generated random path."""
+        return self.client.Queue("/%s" % uuid.uuid4().hex)
+
+    def test_queue_validation(self):
+        """put() raises TypeError/ValueError for the arguments listed."""
+        q = self._makeOne()
+        rejected = (
+            (TypeError, ({},)),
+            (TypeError, (b"one", b"100")),
+            (TypeError, (b"one", 10.0)),
+            (ValueError, (b"one", -100)),
+            (ValueError, (b"one", 100000)),
+        )
+        for error, put_args in rejected:
+            self.assertRaises(error, q.put, *put_args)
+
+    def test_empty_queue(self):
+        """get() on an empty queue returns None and leaves it empty."""
+        q = self._makeOne()
+        eq_(len(q), 0)
+        fetched = q.get()
+        self.assertTrue(fetched is None)
+        eq_(len(q), 0)
+
+    def test_queue(self):
+        """Items come back in the order they were put."""
+        q = self._makeOne()
+        items = (b"one", b"two", b"three")
+        for item in items:
+            q.put(item)
+        eq_(len(q), 3)
+
+        for item in items:
+            eq_(q.get(), item)
+        eq_(len(q), 0)
+
+    def test_priority(self):
+        """Lower priority numbers are returned first; ties keep put order."""
+        q = self._makeOne()
+        submissions = (
+            (b"four", 101),
+            (b"one", 0),
+            (b"two", 0),
+            (b"three", 10),
+        )
+        for payload, prio in submissions:
+            q.put(payload, priority=prio)
+
+        for expected in (b"one", b"two", b"three", b"four"):
+            eq_(q.get(), expected)
+
+
+class KazooLockingQueueTests(KazooTestCase):
+    """Tests for the queue returned by ``self.client.LockingQueue``."""
+
+    def setUp(self):
+        """Skip the test unless the ZooKeeper version is at least 3.4.
+
+        ``TRAVIS_ZK_VERSION`` decides when it is set; otherwise the
+        version reported by ``self.client.server_version()`` is used.
+        """
+        KazooTestCase.setUp(self)
+        skip = False
+        if TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION < (3, 4):
+            skip = True
+        elif TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION >= (3, 4):
+            skip = False
+        else:
+            ver = self.client.server_version()
+            # Only the second component of the version is examined here.
+            if ver[1] < 4:
+                skip = True
+        if skip:
+            raise SkipTest("Must use Zookeeper 3.4 or above")
+
+    def _makeOne(self):
+        """Return a locking queue bound to a freshly generated path."""
+        path = "/" + uuid.uuid4().hex
+        return self.client.LockingQueue(path)
+
+    def test_queue_validation(self):
+        """put() and put_all() raise TypeError/ValueError as listed."""
+        queue = self._makeOne()
+        self.assertRaises(TypeError, queue.put, {})
+        self.assertRaises(TypeError, queue.put, b"one", b"100")
+        self.assertRaises(TypeError, queue.put, b"one", 10.0)
+        self.assertRaises(ValueError, queue.put, b"one", -100)
+        self.assertRaises(ValueError, queue.put, b"one", 100000)
+        self.assertRaises(TypeError, queue.put_all, {})
+        self.assertRaises(TypeError, queue.put_all, [{}])
+        self.assertRaises(TypeError, queue.put_all, [b"one"], b"100")
+        self.assertRaises(TypeError, queue.put_all, [b"one"], 10.0)
+        self.assertRaises(ValueError, queue.put_all, [b"one"], -100)
+        self.assertRaises(ValueError, queue.put_all, [b"one"], 100000)
+
+    def test_empty_queue(self):
+        """get(0) on an empty queue returns None and leaves it empty."""
+        queue = self._makeOne()
+        eq_(len(queue), 0)
+        self.assertTrue(queue.get(0) is None)
+        eq_(len(queue), 0)
+
+    def test_queue(self):
+        """get() holds an item until consume() removes it.
+
+        NOTE(review): the numeric argument to get() looks like a timeout
+        in seconds -- confirm against the LockingQueue API.
+        """
+        queue = self._makeOne()
+        queue.put(b"one")
+        queue.put_all([b"two", b"three"])
+        eq_(len(queue), 3)
+
+        # Nothing has been fetched yet, so there is nothing to consume.
+        ok_(not queue.consume())
+        ok_(not queue.holds_lock())
+        eq_(queue.get(1), b"one")
+        ok_(queue.holds_lock())
+        # Without consuming, should return the same element
+        eq_(queue.get(1), b"one")
+        ok_(queue.consume())
+        ok_(not queue.holds_lock())
+        eq_(queue.get(1), b"two")
+        ok_(queue.holds_lock())
+        ok_(queue.consume())
+        ok_(not queue.holds_lock())
+        eq_(queue.get(1), b"three")
+        ok_(queue.holds_lock())
+        ok_(queue.consume())
+        ok_(not queue.holds_lock())
+        ok_(not queue.consume())
+        eq_(len(queue), 0)
+
+    def test_consume(self):
+        """consume() is truthy only right after a get() fetched an item."""
+        queue = self._makeOne()
+
+        queue.put(b"one")
+        ok_(not queue.consume())
+        queue.get(.1)
+        ok_(queue.consume())
+        ok_(not queue.consume())
+
+    def test_holds_lock(self):
+        """holds_lock() is truthy between a get() and its consume()."""
+        queue = self._makeOne()
+
+        ok_(not queue.holds_lock())
+        queue.put(b"one")
+        queue.get(.1)
+        ok_(queue.holds_lock())
+        queue.consume()
+        ok_(not queue.holds_lock())
+
+    def test_priority(self):
+        """Lower priority numbers are returned first; ties keep put order."""
+        queue = self._makeOne()
+        queue.put(b"four", priority=101)
+        queue.put(b"one", priority=0)
+        queue.put(b"two", priority=0)
+        queue.put(b"three", priority=10)
+
+        eq_(queue.get(1), b"one")
+        ok_(queue.consume())
+        eq_(queue.get(1), b"two")
+        ok_(queue.consume())
+        eq_(queue.get(1), b"three")
+        ok_(queue.consume())
+        eq_(queue.get(1), b"four")
+        ok_(queue.consume())
+
+    def test_concurrent_execution(self):
+        """Of three concurrent getters, exactly one receives the item."""
+        queue = self._makeOne()
+        # One result list and one completion event per spawned getter.
+        value1 = []
+        value2 = []
+        value3 = []
+        event1 = self.client.handler.event_object()
+        event2 = self.client.handler.event_object()
+        event3 = self.client.handler.event_object()
+
+        def get_concurrently(value, event):
+            # Each getter uses its own LockingQueue on the shared path.
+            q = self.client.LockingQueue(queue.path)
+            value.append(q.get(.1))
+            event.set()
+
+        self.client.handler.spawn(get_concurrently, value1, event1)
+        self.client.handler.spawn(get_concurrently, value2, event2)
+        self.client.handler.spawn(get_concurrently, value3, event3)
+        queue.put(b"one")
+        event1.wait(.2)
+        event2.wait(.2)
+        event3.wait(.2)
+
+        result = value1 + value2 + value3
+        eq_(result.count(b"one"), 1)
+        eq_(result.count(None), 2)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_retry.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_retry.py 
b/slider-agent/src/main/python/kazoo/tests/test_retry.py
new file mode 100644
index 0000000..b85739b
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_retry.py
@@ -0,0 +1,77 @@
+import unittest
+
+from nose.tools import eq_
+
+
+class TestRetrySleeper(unittest.TestCase):
+    """Tests for KazooRetry's attempt counter, delay value and copy()."""
+
+    def _pass(self):
+        """Do nothing."""
+        return None
+
+    def _fail(self, times=1):
+        """Build a callable that raises ForceRetryError *times* times."""
+        from kazoo.retry import ForceRetryError
+        # A one-element list stands in for a rebindable closure variable.
+        failures = [0]
+
+        def inner():
+            if failures[0] < times:
+                failures[0] += 1
+                raise ForceRetryError('Failed!')
+        return inner
+
+    def _makeOne(self, *args, **kwargs):
+        """Create a KazooRetry, forwarding every argument."""
+        from kazoo.retry import KazooRetry
+        return KazooRetry(*args, **kwargs)
+
+    def test_reset(self):
+        """reset() returns the attempt counter to zero."""
+        retry = self._makeOne(delay=0, max_tries=2)
+        fails_once = self._fail()
+        retry(fails_once)
+        eq_(retry._attempts, 1)
+        retry.reset()
+        eq_(retry._attempts, 0)
+
+    def test_too_many_tries(self):
+        """A callable that keeps failing ends in RetryFailedError."""
+        from kazoo.retry import RetryFailedError
+        retry = self._makeOne(delay=0)
+        keeps_failing = self._fail(times=999)
+        self.assertRaises(RetryFailedError, retry, keeps_failing)
+        eq_(retry._attempts, 1)
+
+    def test_maximum_delay(self):
+        """After ten failures the current delay is a float below 4000."""
+        def sleep_func(_time):
+            # Skip real sleeping so the test finishes immediately.
+            return None
+
+        retry = self._makeOne(delay=10, max_tries=100, sleep_func=sleep_func)
+        fails_ten_times = self._fail(times=10)
+        retry(fails_ten_times)
+        current = retry._cur_delay
+        self.assertTrue(current < 4000, current)
+        # gevent's sleep function is picky about the type
+        eq_(type(current), float)
+
+    def test_copy(self):
+        """copy() hands the very same sleep function to the new object."""
+        def _sleep(t):
+            return None
+
+        retry = self._makeOne(sleep_func=_sleep)
+        self.assertTrue(retry.copy().sleep_func is _sleep)
+
+
+class TestKazooRetry(unittest.TestCase):
+    """Check that connection-level errors escape a KazooRetry call."""
+
+    def _makeOne(self, **kw):
+        """Create a KazooRetry from keyword arguments only."""
+        from kazoo.retry import KazooRetry
+        retry = KazooRetry(**kw)
+        return retry
+
+    def test_connection_closed(self):
+        """ConnectionClosedError from the callable reaches the caller."""
+        from kazoo.exceptions import ConnectionClosedError
+        retry = self._makeOne()
+
+        def closed():
+            raise ConnectionClosedError()
+
+        self.assertRaises(ConnectionClosedError, retry, closed)
+
+    def test_session_expired(self):
+        """With max_tries=1 a SessionExpiredError call raises something."""
+        from kazoo.exceptions import SessionExpiredError
+        retry = self._makeOne(max_tries=1)
+
+        def expired():
+            raise SessionExpiredError()
+
+        # Only the broad Exception base class is asserted here.
+        self.assertRaises(Exception, retry, expired)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_security.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_security.py 
b/slider-agent/src/main/python/kazoo/tests/test_security.py
new file mode 100644
index 0000000..4a7e670
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_security.py
@@ -0,0 +1,40 @@
+import unittest
+
+from nose.tools import eq_
+from kazoo.security import Permissions
+
+
+class TestACL(unittest.TestCase):
+    """Tests for ``make_acl`` and for ``ACL``'s ``acl_list`` and repr.
+
+    Membership checks use ``assertTrue`` instead of the deprecated
+    ``assert_`` alias, which recent unittest releases no longer provide.
+    """
+
+    def _makeOne(self, *args, **kwargs):
+        """Forward every argument to ``kazoo.security.make_acl``."""
+        from kazoo.security import make_acl
+        return make_acl(*args, **kwargs)
+
+    def test_read_acl(self):
+        """``read=True`` sets the READ bit in ``perms``."""
+        acl = self._makeOne("digest", ":", read=True)
+        eq_(acl.perms & Permissions.READ, Permissions.READ)
+
+    def test_all_perms(self):
+        """Enabling every flag sets every individual permission bit."""
+        acl = self._makeOne("digest", ":", read=True, write=True,
+                            create=True, delete=True, admin=True)
+        for perm in [Permissions.READ, Permissions.CREATE, Permissions.WRITE,
+                     Permissions.DELETE, Permissions.ADMIN]:
+            eq_(acl.perms & perm, perm)
+
+    def test_perm_listing(self):
+        """``acl_list`` names the permissions encoded in the bit mask."""
+        from kazoo.security import ACL
+        f = ACL(15, 'fred')
+        for name in ('READ', 'WRITE', 'CREATE', 'DELETE'):
+            self.assertTrue(name in f.acl_list)
+
+        f = ACL(16, 'fred')
+        self.assertTrue('ADMIN' in f.acl_list)
+
+        f = ACL(31, 'george')
+        self.assertTrue('ALL' in f.acl_list)
+
+    def test_perm_repr(self):
+        """repr() shows both the numeric perms and the decoded list."""
+        from kazoo.security import ACL
+        f = ACL(16, 'fred')
+        self.assertTrue("ACL(perms=16, acl_list=['ADMIN']" in repr(f))

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py 
b/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py
new file mode 100644
index 0000000..b4b93f1
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py
@@ -0,0 +1,326 @@
+import threading
+import unittest
+
+import mock
+from nose.tools import assert_raises
+from nose.tools import eq_
+from nose.tools import raises
+
+
+class TestThreadingHandler(unittest.TestCase):
+    """Basic checks on ``SequentialThreadingHandler``."""
+
+    def _makeOne(self, *args):
+        """Create a SequentialThreadingHandler with the given arguments."""
+        from kazoo.handlers.threading import SequentialThreadingHandler
+        return SequentialThreadingHandler(*args)
+
+    def _getAsync(self, *args):
+        """Return the AsyncResult class itself; ``args`` are ignored."""
+        from kazoo.handlers.threading import AsyncResult
+        return AsyncResult
+
+    def test_proper_threading(self):
+        """event_object() yields a threading event instance."""
+        h = self._makeOne()
+        h.start()
+        # Python 3.3 dropped threading._Event; before that threading.Event
+        # was a factory function and _Event the actual class.
+        event_class = getattr(threading, '_Event', threading.Event)
+        assert isinstance(h.event_object(), event_class)
+
+    def test_matching_async(self):
+        """async_result() yields an instance of the handler's AsyncResult."""
+        h = self._makeOne()
+        h.start()
+        # Not called ``async``: that name is a reserved keyword from
+        # Python 3.7 on and would make this module fail to compile.
+        async_class = self._getAsync()
+        assert isinstance(h.async_result(), async_class)
+
+    def test_exception_raising(self):
+        """The handler's timeout_exception can be raised and caught."""
+        h = self._makeOne()
+
+        @raises(h.timeout_exception)
+        def testit():
+            raise h.timeout_exception("This is a timeout")
+        testit()
+
+    def test_double_start_stop(self):
+        """Repeated start()/stop() calls are tolerated."""
+        h = self._makeOne()
+        h.start()
+        self.assertTrue(h._running)
+        h.start()
+        h.stop()
+        h.stop()
+        self.assertFalse(h._running)
+
+
+class TestThreadingAsync(unittest.TestCase):
+    """Tests for the threading handler's ``AsyncResult``.
+
+    Result objects are bound to ``async_result`` rather than ``async``
+    because ``async`` is a reserved keyword from Python 3.7 onwards and
+    using it as a name stops the module from compiling.
+    """
+
+    def _makeOne(self, *args):
+        """Create an AsyncResult with the given arguments."""
+        from kazoo.handlers.threading import AsyncResult
+        return AsyncResult(*args)
+
+    def _makeHandler(self):
+        """Create a real SequentialThreadingHandler."""
+        from kazoo.handlers.threading import SequentialThreadingHandler
+        return SequentialThreadingHandler()
+
+    def test_ready(self):
+        """set() flips ready()/successful() and leaves exception empty."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        eq_(async_result.ready(), False)
+        async_result.set('val')
+        eq_(async_result.ready(), True)
+        eq_(async_result.successful(), True)
+        eq_(async_result.exception, None)
+
+    def test_callback_queued(self):
+        """A linked callback is queued on the handler when a value is set."""
+        mock_handler = mock.Mock()
+        mock_handler.completion_queue = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        async_result.rawlink(lambda a: a)
+        async_result.set('val')
+
+        assert mock_handler.completion_queue.put.called
+
+    def test_set_exception(self):
+        """set_exception() stores the error and queues linked callbacks."""
+        mock_handler = mock.Mock()
+        mock_handler.completion_queue = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+        async_result.rawlink(lambda a: a)
+        async_result.set_exception(ImportError('Error occured'))
+
+        assert isinstance(async_result.exception, ImportError)
+        assert mock_handler.completion_queue.put.called
+
+    def test_get_wait_while_setting(self):
+        """A get() started in another thread returns the value set later."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+        bv = threading.Event()
+        cv = threading.Event()
+
+        def wait_for_val():
+            # bv signals that the thread is running; cv that it finished.
+            bv.set()
+            val = async_result.get()
+            lst.append(val)
+            cv.set()
+        th = threading.Thread(target=wait_for_val)
+        th.start()
+        bv.wait()
+
+        async_result.set('fred')
+        cv.wait()
+        eq_(lst, ['fred'])
+        th.join()
+
+    def test_get_with_nowait(self):
+        """Non-blocking gets on an unset result raise the timeout error."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+        timeout = self._makeHandler().timeout_exception
+
+        @raises(timeout)
+        def test_it():
+            async_result.get(block=False)
+        test_it()
+
+        @raises(timeout)
+        def test_nowait():
+            async_result.get_nowait()
+        test_nowait()
+
+    def test_get_with_exception(self):
+        """A get() in another thread re-raises the exception set later."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+        bv = threading.Event()
+        cv = threading.Event()
+
+        def wait_for_val():
+            bv.set()
+            try:
+                val = async_result.get()
+            except ImportError:
+                lst.append('oops')
+            else:
+                lst.append(val)
+            cv.set()
+        th = threading.Thread(target=wait_for_val)
+        th.start()
+        bv.wait()
+
+        async_result.set_exception(ImportError)
+        cv.wait()
+        eq_(lst, ['oops'])
+        th.join()
+
+    def test_wait(self):
+        """wait() in another thread returns True once a value is set."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+        bv = threading.Event()
+        cv = threading.Event()
+
+        def wait_for_val():
+            bv.set()
+            try:
+                val = async_result.wait(10)
+            except ImportError:
+                lst.append('oops')
+            else:
+                lst.append(val)
+            cv.set()
+        th = threading.Thread(target=wait_for_val)
+        th.start()
+        bv.wait(10)
+
+        async_result.set("fred")
+        cv.wait(15)
+        eq_(lst, [True])
+        th.join()
+
+    def test_set_before_wait(self):
+        """get() returns at once when the value was set beforehand."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+        cv = threading.Event()
+        async_result.set('fred')
+
+        def wait_for_val():
+            val = async_result.get()
+            lst.append(val)
+            cv.set()
+        th = threading.Thread(target=wait_for_val)
+        th.start()
+        cv.wait()
+        eq_(lst, ['fred'])
+        th.join()
+
+    def test_set_exc_before_wait(self):
+        """get() raises at once when an exception was set beforehand."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+        cv = threading.Event()
+        async_result.set_exception(ImportError)
+
+        def wait_for_val():
+            try:
+                val = async_result.get()
+            except ImportError:
+                lst.append('ooops')
+            else:
+                lst.append(val)
+            cv.set()
+        th = threading.Thread(target=wait_for_val)
+        th.start()
+        cv.wait()
+        eq_(lst, ['ooops'])
+        th.join()
+
+    def test_linkage(self):
+        """rawlink() before set() queues the callback; unlink() is allowed."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+        cv = threading.Event()
+
+        lst = []
+
+        def add_on():
+            lst.append(True)
+
+        def wait_for_val():
+            async_result.get()
+            cv.set()
+
+        th = threading.Thread(target=wait_for_val)
+        th.start()
+
+        async_result.rawlink(add_on)
+        async_result.set('fred')
+        assert mock_handler.completion_queue.put.called
+        async_result.unlink(add_on)
+        cv.wait()
+        eq_(async_result.value, 'fred')
+        th.join()
+
+    def test_linkage_not_ready(self):
+        """Linking to an already-set result queues the callback right away."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+
+        def add_on():
+            lst.append(True)
+
+        async_result.set('fred')
+        # Check ``put.called``: ``completion_queue.called`` is always False
+        # on a Mock that is never invoked itself, so it proved nothing.
+        assert not mock_handler.completion_queue.put.called
+        async_result.rawlink(add_on)
+        assert mock_handler.completion_queue.put.called
+
+    def test_link_and_unlink(self):
+        """A callback removed before set() is never queued."""
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+
+        def add_on():
+            lst.append(True)
+
+        async_result.rawlink(add_on)
+        assert not mock_handler.completion_queue.put.called
+        async_result.unlink(add_on)
+        async_result.set('fred')
+        assert not mock_handler.completion_queue.put.called
+
+    def test_captured_exception(self):
+        """capture_exceptions stores the raised error on the result."""
+        from kazoo.handlers.utils import capture_exceptions
+
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        @capture_exceptions(async_result)
+        def exceptional_function():
+            return 1/0
+
+        exceptional_function()
+
+        assert_raises(ZeroDivisionError, async_result.get)
+
+    def test_no_capture_exceptions(self):
+        """Without an error, capture_exceptions queues no callback."""
+        from kazoo.handlers.utils import capture_exceptions
+
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+
+        def add_on():
+            lst.append(True)
+
+        async_result.rawlink(add_on)
+
+        @capture_exceptions(async_result)
+        def regular_function():
+            return True
+
+        regular_function()
+
+        assert not mock_handler.completion_queue.put.called
+
+    def test_wraps(self):
+        """wrap() returns the value and also stores it on the result."""
+        from kazoo.handlers.utils import wrap
+
+        mock_handler = mock.Mock()
+        async_result = self._makeOne(mock_handler)
+
+        lst = []
+
+        def add_on(result):
+            lst.append(result.get())
+
+        async_result.rawlink(add_on)
+
+        @wrap(async_result)
+        def regular_function():
+            return 'hello'
+
+        assert regular_function() == 'hello'
+        assert mock_handler.completion_queue.put.called
+        assert async_result.get() == 'hello'

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_watchers.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/test_watchers.py 
b/slider-agent/src/main/python/kazoo/tests/test_watchers.py
new file mode 100644
index 0000000..1c51d60
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/test_watchers.py
@@ -0,0 +1,489 @@
+import time
+import threading
+import uuid
+
+from nose.tools import eq_
+from nose.tools import raises
+
+from kazoo.exceptions import KazooException
+from kazoo.protocol.states import EventType
+from kazoo.testing import KazooTestCase
+
+
+class KazooDataWatcherTests(KazooTestCase):
+    """Tests for ``client.DataWatch`` against a uniquely named node."""
+
+    def setUp(self):
+        """Create a fresh node under a random path for every test."""
+        super(KazooDataWatcherTests, self).setUp()
+        self.path = "/" + uuid.uuid4().hex
+        self.client.ensure_path(self.path)
+
+    def test_data_watcher(self):
+        """The callback sees None first, then the data once created."""
+        update = threading.Event()
+        data = [True]
+
+        # Make it a non-existent path
+        self.path += 'f'
+
+        @self.client.DataWatch(self.path)
+        def changed(d, stat):
+            data.pop()
+            data.append(d)
+            update.set()
+
+        update.wait(10)
+        eq_(data, [None])
+        update.clear()
+
+        self.client.create(self.path, b'fred')
+        update.wait(10)
+        eq_(data[0], b'fred')
+        update.clear()
+
+    def test_data_watcher_once(self):
+        """Decorating a second function with one watcher is an error."""
+        update = threading.Event()
+        data = [True]
+
+        # Make it a non-existent path
+        self.path += 'f'
+
+        dwatcher = self.client.DataWatch(self.path)
+
+        @dwatcher
+        def changed(d, stat):
+            data.pop()
+            data.append(d)
+            update.set()
+
+        update.wait(10)
+        eq_(data, [None])
+        update.clear()
+
+        @raises(KazooException)
+        def test_it():
+            @dwatcher
+            def func(d, stat):
+                data.pop()
+        test_it()
+
+    def test_data_watcher_with_event(self):
+        """A three-argument callback also receives the triggering event."""
+        # Test that the data watcher gets passed the event, if it
+        # accepts three arguments
+        update = threading.Event()
+        data = [True]
+
+        # Make it a non-existent path
+        self.path += 'f'
+
+        @self.client.DataWatch(self.path)
+        def changed(d, stat, event):
+            data.pop()
+            data.append(event)
+            update.set()
+
+        update.wait(10)
+        eq_(data, [None])
+        update.clear()
+
+        self.client.create(self.path, b'fred')
+        update.wait(10)
+        eq_(data[0].type, EventType.CREATED)
+        update.clear()
+
+    def test_func_style_data_watch(self):
+        """DataWatch also accepts the callback as a plain argument."""
+        update = threading.Event()
+        data = [True]
+
+        # Make it a non-existent path
+        path = self.path + 'f'
+
+        def changed(d, stat):
+            data.pop()
+            data.append(d)
+            update.set()
+        self.client.DataWatch(path, changed)
+
+        update.wait(10)
+        eq_(data, [None])
+        update.clear()
+
+        self.client.create(path, b'fred')
+        update.wait(10)
+        eq_(data[0], b'fred')
+        update.clear()
+
+    def test_datawatch_across_session_expire(self):
+        """The watch still reports changes after the session expired."""
+        update = threading.Event()
+        data = [True]
+
+        @self.client.DataWatch(self.path)
+        def changed(d, stat):
+            data.pop()
+            data.append(d)
+            update.set()
+
+        update.wait(10)
+        eq_(data, [b""])
+        update.clear()
+
+        self.expire_session()
+        self.client.retry(self.client.set, self.path, b'fred')
+        update.wait(25)
+        eq_(data[0], b'fred')
+
+    def test_func_stops(self):
+        """Once the callback returns False it gets no further updates."""
+        update = threading.Event()
+        data = [True]
+
+        self.path += "f"
+
+        # Becomes non-empty to make the callback return False.
+        fail_through = []
+
+        @self.client.DataWatch(self.path)
+        def changed(d, stat):
+            data.pop()
+            data.append(d)
+            update.set()
+            if fail_through:
+                return False
+
+        update.wait(10)
+        eq_(data, [None])
+        update.clear()
+
+        fail_through.append(True)
+        self.client.create(self.path, b'fred')
+        update.wait(10)
+        eq_(data[0], b'fred')
+        update.clear()
+
+        # The node changes, but the recorded value must stay at b'fred'.
+        self.client.set(self.path, b'asdfasdf')
+        update.wait(0.2)
+        eq_(data[0], b'fred')
+
+        d, stat = self.client.get(self.path)
+        eq_(d, b'asdfasdf')
+
+    def test_no_such_node(self):
+        """Watching a missing path calls back with (None, None)."""
+        args = []
+
+        @self.client.DataWatch("/some/path")
+        def changed(d, stat):
+            args.extend([d, stat])
+
+        eq_(args, [None, None])
+
+    def test_bad_watch_func2(self):
+        """A callback that starts raising must not break a later set()."""
+        counter = 0
+
+        @self.client.DataWatch(self.path)
+        def changed(d, stat):
+            if counter > 0:
+                raise Exception("oops")
+
+        # NOTE(review): the wrapper built by raises(...) is discarded
+        # without being called, so this line asserts nothing by itself.
+        raises(Exception)(changed)
+
+        counter += 1
+        self.client.set(self.path, b'asdfasdf')
+
+    def test_watcher_evaluating_to_false(self):
+        """A falsy callable (an empty list subclass) is still invoked."""
+        class WeirdWatcher(list):
+            def __call__(self, *args):
+                self.called = True
+        watcher = WeirdWatcher()
+        self.client.DataWatch(self.path, watcher)
+        self.client.set(self.path, b'mwahaha')
+        self.assertTrue(watcher.called)
+
+    def test_watcher_repeat_delete(self):
+        """The watch keeps firing through create/delete/create cycles."""
+        a = []
+        ev = threading.Event()
+
+        self.client.delete(self.path)
+
+        @self.client.DataWatch(self.path)
+        def changed(val, stat):
+            a.append(val)
+            ev.set()
+
+        eq_(a, [None])
+        ev.wait(10)
+        ev.clear()
+        self.client.create(self.path, b'blah')
+        ev.wait(10)
+        eq_(ev.is_set(), True)
+        ev.clear()
+        eq_(a, [None, b'blah'])
+        self.client.delete(self.path)
+        ev.wait(10)
+        eq_(ev.is_set(), True)
+        ev.clear()
+        eq_(a, [None, b'blah', None])
+        self.client.create(self.path, b'blah')
+        ev.wait(10)
+        eq_(ev.is_set(), True)
+        ev.clear()
+        eq_(a, [None, b'blah', None, b'blah'])
+
+    def test_watcher_with_closing(self):
+        """Stopping the client with an active watch raises nothing."""
+        a = []
+        ev = threading.Event()
+
+        self.client.delete(self.path)
+
+        @self.client.DataWatch(self.path)
+        def changed(val, stat):
+            a.append(val)
+            ev.set()
+        eq_(a, [None])
+
+        b = False
+        try:
+            self.client.stop()
+        except Exception:
+            # Narrower than a bare ``except`` so that KeyboardInterrupt and
+            # SystemExit are not misreported as a failing stop().
+            b = True
+        eq_(b, False)
+
+
+class KazooChildrenWatcherTests(KazooTestCase):
+    """Tests for ``client.ChildrenWatch`` on a uniquely named parent node."""
+
+    def setUp(self):
+        """Create a fresh parent node under a random path for every test."""
+        super(KazooChildrenWatcherTests, self).setUp()
+        self.path = "/" + uuid.uuid4().hex
+        self.client.ensure_path(self.path)
+
+    def test_child_watcher(self):
+        """The callback receives the child list as children are added."""
+        update = threading.Event()
+        all_children = ['fred']
+
+        @self.client.ChildrenWatch(self.path)
+        def changed(children):
+            # Replace the recorded children in place with the new list.
+            while all_children:
+                all_children.pop()
+            all_children.extend(children)
+            update.set()
+
+        update.wait(10)
+        eq_(all_children, [])
+        update.clear()
+
+        self.client.create(self.path + '/' + 'smith')
+        update.wait(10)
+        eq_(all_children, ['smith'])
+        update.clear()
+
+        self.client.create(self.path + '/' + 'george')
+        update.wait(10)
+        eq_(sorted(all_children), ['george', 'smith'])
+
+    def test_child_watcher_once(self):
+        """Decorating a second function with one watcher is an error."""
+        update = threading.Event()
+        all_children = ['fred']
+
+        cwatch = self.client.ChildrenWatch(self.path)
+
+        @cwatch
+        def changed(children):
+            while all_children:
+                all_children.pop()
+            all_children.extend(children)
+            update.set()
+
+        update.wait(10)
+        eq_(all_children, [])
+        update.clear()
+
+        @raises(KazooException)
+        def test_it():
+            @cwatch
+            def changed_again(children):
+                update.set()
+        test_it()
+
+    def test_child_watcher_with_event(self):
+        """With send_event=True the callback also gets the event."""
+        update = threading.Event()
+        events = [True]
+
+        @self.client.ChildrenWatch(self.path, send_event=True)
+        def changed(children, event):
+            events.pop()
+            events.append(event)
+            update.set()
+
+        # The first recorded event is expected to be None.
+        update.wait(10)
+        eq_(events, [None])
+        update.clear()
+
+        self.client.create(self.path + '/' + 'smith')
+        update.wait(10)
+        eq_(events[0].type, EventType.CHILD)
+        update.clear()
+
+    def test_func_style_child_watcher(self):
+        """ChildrenWatch also accepts the callback as a plain argument."""
+        update = threading.Event()
+        all_children = ['fred']
+
+        def changed(children):
+            while all_children:
+                all_children.pop()
+            all_children.extend(children)
+            update.set()
+
+        self.client.ChildrenWatch(self.path, changed)
+
+        update.wait(10)
+        eq_(all_children, [])
+        update.clear()
+
+        self.client.create(self.path + '/' + 'smith')
+        update.wait(10)
+        eq_(all_children, ['smith'])
+        update.clear()
+
+        self.client.create(self.path + '/' + 'george')
+        update.wait(10)
+        eq_(sorted(all_children), ['george', 'smith'])
+
+    def test_func_stops(self):
+        """Once the callback returns False it gets no further updates."""
+        update = threading.Event()
+        all_children = ['fred']
+
+        # Becomes non-empty to make the callback return False.
+        fail_through = []
+
+        @self.client.ChildrenWatch(self.path)
+        def changed(children):
+            while all_children:
+                all_children.pop()
+            all_children.extend(children)
+            update.set()
+            if fail_through:
+                return False
+
+        update.wait(10)
+        eq_(all_children, [])
+        update.clear()
+
+        fail_through.append(True)
+        self.client.create(self.path + '/' + 'smith')
+        update.wait(10)
+        eq_(all_children, ['smith'])
+        update.clear()
+
+        # A second child is created, but the recorded list must not change.
+        self.client.create(self.path + '/' + 'george')
+        update.wait(0.5)
+        eq_(all_children, ['smith'])
+
+    def test_child_watch_session_loss(self):
+        """By default the watch keeps reporting after a session expiry."""
+        update = threading.Event()
+        all_children = ['fred']
+
+        @self.client.ChildrenWatch(self.path)
+        def changed(children):
+            while all_children:
+                all_children.pop()
+            all_children.extend(children)
+            update.set()
+
+        update.wait(10)
+        eq_(all_children, [])
+        update.clear()
+
+        self.client.create(self.path + '/' + 'smith')
+        update.wait(10)
+        eq_(all_children, ['smith'])
+        update.clear()
+        self.expire_session()
+
+        self.client.retry(self.client.create,
+                          self.path + '/' + 'george')
+        update.wait(20)
+        eq_(sorted(all_children), ['george', 'smith'])
+
+    def test_child_stop_on_session_loss(self):
+        """With allow_session_lost=False the watch stops after expiry."""
+        update = threading.Event()
+        all_children = ['fred']
+
+        @self.client.ChildrenWatch(self.path, allow_session_lost=False)
+        def changed(children):
+            while all_children:
+                all_children.pop()
+            all_children.extend(children)
+            update.set()
+
+        update.wait(10)
+        eq_(all_children, [])
+        update.clear()
+
+        self.client.create(self.path + '/' + 'smith')
+        update.wait(10)
+        eq_(all_children, ['smith'])
+        update.clear()
+        self.expire_session()
+
+        self.client.retry(self.client.create,
+                          self.path + '/' + 'george')
+        # The callback must not fire again, so the event stays unset.
+        update.wait(4)
+        eq_(update.is_set(), False)
+        eq_(all_children, ['smith'])
+
+        # The new child exists on the server even though it was not reported.
+        children = self.client.get_children(self.path)
+        eq_(sorted(children), ['george', 'smith'])
+
+    def test_bad_children_watch_func(self):
+        """A callback that starts raising must not break a later create()."""
+        counter = 0
+
+        @self.client.ChildrenWatch(self.path)
+        def changed(children):
+            if counter > 0:
+                raise Exception("oops")
+
+        # NOTE(review): the wrapper built by raises(...) is discarded
+        # without being called, so this line asserts nothing by itself.
+        raises(Exception)(changed)
+        counter += 1
+        self.client.create(self.path + '/' + 'smith')
+
+
+class KazooPatientChildrenWatcherTests(KazooTestCase):
+    """Tests for the ``PatientChildrenWatch`` recipe."""
+
+    def setUp(self):
+        """Pick a random path; individual tests create it when needed."""
+        super(KazooPatientChildrenWatcherTests, self).setUp()
+        self.path = "/%s" % uuid.uuid4().hex
+
+    def _makeOne(self, *args, **kwargs):
+        """Create a PatientChildrenWatch, forwarding every argument."""
+        from kazoo.recipe.watchers import PatientChildrenWatch
+        return PatientChildrenWatch(*args, **kwargs)
+
+    def test_watch(self):
+        """start() yields the children plus a result set on later change."""
+        self.client.ensure_path(self.path)
+        pending = self._makeOne(self.client, self.path, 0.1).start()
+        children, asy = pending.get()
+        eq_(len(children), 0)
+        eq_(asy.ready(), False)
+
+        self.client.create('/'.join((self.path, 'fred')))
+        asy.get(timeout=1)
+        eq_(asy.ready(), True)
+
+    def test_exception(self):
+        """Watching a path that was never created raises NoNodeError."""
+        from kazoo.exceptions import NoNodeError
+        pending = self._makeOne(self.client, self.path, 0.1).start()
+
+        @raises(NoNodeError)
+        def fetch():
+            pending.get()
+        fetch()
+
+    def test_watch_iterations(self):
+        """The result stays unready while children keep being added."""
+        def add_child():
+            self.client.create('/'.join((self.path, uuid.uuid4().hex)))
+
+        pause = 0.08
+        self.client.ensure_path(self.path)
+        pending = self._makeOne(self.client, self.path, 0.5).start()
+        eq_(pending.ready(), False)
+
+        time.sleep(pause)
+        add_child()
+        eq_(pending.ready(), False)
+        time.sleep(pause)
+        eq_(pending.ready(), False)
+        add_child()
+        time.sleep(pause)
+        eq_(pending.ready(), False)
+
+        children, asy = pending.get()
+        eq_(len(children), 2)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/util.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/kazoo/tests/util.py 
b/slider-agent/src/main/python/kazoo/tests/util.py
new file mode 100644
index 0000000..298be71
--- /dev/null
+++ b/slider-agent/src/main/python/kazoo/tests/util.py
@@ -0,0 +1,126 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import logging
+import os
+import time
+
+# Raw value of the TRAVIS environment variable, or False when it is unset.
+TRAVIS = os.environ.get('TRAVIS', False)
+# ZOOKEEPER_VERSION is only consulted when TRAVIS is truthy; otherwise this
+# simply carries TRAVIS's falsy value.
+TRAVIS_ZK_VERSION = TRAVIS and os.environ.get('ZOOKEEPER_VERSION', None)
+if TRAVIS_ZK_VERSION:
+    # Convert the dotted version string into a tuple of ints.
+    TRAVIS_ZK_VERSION = tuple([int(n) for n in TRAVIS_ZK_VERSION.split('.')])
+
+
+class Handler(logging.Handler):
+    """Logging handler that collects the records of the named loggers."""
+
+    def __init__(self, *names, **kw):
+        """Remember the logger *names*; *kw* goes to setLoggerLevel()."""
+        logging.Handler.__init__(self)
+        self.names = names
+        self.records = []
+        self.setLoggerLevel(**kw)
+
+    def setLoggerLevel(self, level=1):
+        """Store the level to use and forget previously saved levels."""
+        self.oldlevels = {}
+        self.level = level
+
+    def emit(self, record):
+        """Keep the record for later inspection."""
+        self.records.append(record)
+
+    def clear(self):
+        """Drop all collected records, keeping the same list object."""
+        del self.records[:]
+
+    def install(self):
+        """Attach to every named logger, saving its current level first."""
+        for logger_name in self.names:
+            target = logging.getLogger(logger_name)
+            self.oldlevels[logger_name] = target.level
+            target.setLevel(self.level)
+            target.addHandler(self)
+
+    def uninstall(self):
+        """Restore each logger's saved level and detach from it."""
+        for logger_name in self.names:
+            target = logging.getLogger(logger_name)
+            target.setLevel(self.oldlevels[logger_name])
+            target.removeHandler(self)
+
+    def __str__(self):
+        """Render each record as "name LEVEL" plus its non-blank lines."""
+        chunks = []
+        for record in self.records:
+            body = '\n'.join(
+                line for line in record.getMessage().split('\n')
+                if line.strip())
+            chunks.append(
+                "%s %s\n  %s" % (record.name, record.levelname, body))
+        return '\n'.join(chunks)
+
+
+class InstalledHandler(Handler):
+    """A Handler that installs itself on its loggers when constructed."""
+
+    def __init__(self, *names, **kw):
+        Handler.__init__(self, *names, **kw)
+        # Attach to the named loggers straight away.
+        self.install()
+
+
+class Wait(object):
+    """Poll a condition function until it is true or a timeout elapses.
+
+    An instance is callable as ``wait(func)`` and, when called without a
+    function, returns a decorator that applies the same options.
+    """
+
+    class TimeOutWaitingFor(Exception):
+        "A test condition timed out"
+
+    # Default overall timeout and default pause between polls.
+    timeout = 9
+    wait = .01
+
+    def __init__(self, timeout=None, wait=None, exception=None,
+                 getnow=(lambda: time.time), getsleep=(lambda: time.sleep)):
+        """Override the class defaults for any argument that is not None.
+
+        ``getnow`` and ``getsleep`` are zero-argument factories returning
+        the clock and sleep functions; they are called on each wait.
+        """
+        if timeout is not None:
+            self.timeout = timeout
+
+        if wait is not None:
+            self.wait = wait
+
+        if exception is not None:
+            self.TimeOutWaitingFor = exception
+
+        self.getnow = getnow
+        self.getsleep = getsleep
+
+    def __call__(self, func=None, timeout=None, wait=None, message=None):
+        """Block until ``func()`` is true.
+
+        Returns None on success, or a decorator when ``func`` is None.
+        Raises ``TimeOutWaitingFor`` once the deadline has passed, using
+        ``message``, else the function's docstring, name or repr as text.
+        """
+        if func is None:
+            return lambda func: self(func, timeout, wait, message)
+
+        # Fast path: no sleeping when the condition already holds.
+        if func():
+            return
+
+        now = self.getnow()
+        sleep = self.getsleep()
+        if timeout is None:
+            timeout = self.timeout
+        if wait is None:
+            wait = self.wait
+        wait = float(wait)
+
+        deadline = now() + timeout
+        while True:
+            sleep(wait)
+            if func():
+                return
+            if now() > deadline:
+                # Callable objects may lack ``__name__`` (and a docstring);
+                # defaults keep the timeout from turning into AttributeError.
+                raise self.TimeOutWaitingFor(
+                    message or
+                    getattr(func, '__doc__', None) or
+                    getattr(func, '__name__', None) or
+                    repr(func)
+                    )
+
+# Shared instance using the default timeout and poll interval.
+wait = Wait()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/test/python/agent/TestMain.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestMain.py 
b/slider-agent/src/test/python/agent/TestMain.py
index 4ad7a8f..0d1e1de 100644
--- a/slider-agent/src/test/python/agent/TestMain.py
+++ b/slider-agent/src/test/python/agent/TestMain.py
@@ -260,13 +260,15 @@ class TestMain(unittest.TestCase):
     self.assertTrue(start_mock.called)
 
   class AgentOptions:
-      def __init__(self, label, host, port, secured_port, verbose, debug):
+      def __init__(self, label, host, port, secured_port, zk_quorum, 
zk_reg_path, verbose, debug):
           self.label = label
           self.host = host
           self.port = port
           self.secured_port = secured_port
           self.verbose = verbose
           self.debug = debug
+          self.zk_quorum = zk_quorum
+          self.zk_reg_path = zk_reg_path
 
   @patch.object(main, "setup_logging")
   @patch.object(main, "bind_signal_handlers")
@@ -292,7 +294,7 @@ class TestMain(unittest.TestCase):
       Controller_init_mock.return_value = None
       isAlive_mock.return_value = False
       parse_args_mock.return_value = (
-          TestMain.AgentOptions("agent", "host1", "8080", "8081", True, ""), 
[])
+          TestMain.AgentOptions("agent", "host1", "8080", "8081", 
"host1:2181", "/registry/org-apache-slider/cl1", True, ""), [])
       tmpdir = tempfile.gettempdir()
 
       #testing call without command-line arguments

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java 
b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
index c85ad91..38f55c2 100644
--- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java
@@ -167,4 +167,9 @@ public interface SliderKeys extends SliderXmlConfKeys {
   String PASSPHRASE = "DEV";
   String PASS_LEN = "50";
   String KEYSTORE_LOCATION = "ssl.server.keystore.location";
+
+  /**
+   * Python specific
+   */
+  String PYTHONPATH = "PYTHONPATH";
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
 
b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index 89b862e..a4a71dd 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.slider.api.ClusterDescription;
 import org.apache.slider.common.SliderKeys;
 import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.SliderFileSystem;
 import org.apache.slider.common.tools.SliderUtils;
 import org.apache.slider.core.conf.AggregateConf;
 import org.apache.slider.core.exceptions.BadCommandArgumentsException;
@@ -146,6 +147,15 @@ public abstract class AbstractProviderService
 
   /**
    * No-op implementation of this method.
+   */
+  @Override
+  public void initializeApplicationConfiguration(
+      AggregateConf instanceDefinition, SliderFileSystem fileSystem)
+      throws IOException, SliderException {
+    // no-op
+  }
+
+  /**
+   * No-op implementation of this method.
    *
    * {@inheritDoc}
    */
@@ -350,4 +360,12 @@ public abstract class AbstractProviderService
   public void addContainerRequest(AMRMClient.ContainerRequest req) {
     // no-op
   }
+
+  /**
+   * No-op implementation of this method.
+   *
+   * @param liveContainers ignored by this implementation
+   * @param applicationId ignored by this implementation
+   * @param providerRoles ignored by this implementation
+   */
+  @Override
+  public void rebuildContainerDetails(List<Container> liveContainers,
+      String applicationId, Map<Integer, ProviderRole> providerRoles) {
+    // no-op
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java 
b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index e717158..0f5b4fb 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -115,6 +115,17 @@ public interface ProviderService extends ProviderCore,
     throws BadCommandArgumentsException, IOException;
 
   /**
+   * The application configuration should be initialized here.
+   * <p>
+   * The application master invokes this after loading the provider
+   * configuration information and before
+   * {@code validateApplicationConfiguration}.
+   *
+   * @param instanceDefinition aggregate configuration of the instance
+   * @param fileSystem file system object handed to the provider
+   * @throws IOException
+   * @throws SliderException
+   */
+  void initializeApplicationConfiguration(AggregateConf instanceDefinition,
+      SliderFileSystem fileSystem) throws IOException, SliderException;
+
+  /**
    * This is a validation of the application configuration on the AM.
    * Here is where things like the existence of keytabs and other
    * not-seen-client-side properties can be tested, before
@@ -180,4 +191,15 @@ public interface ProviderService extends ProviderCore,
    * @return the selector to use for choosing containers.
    */
   ContainerReleaseSelector createContainerReleaseSelector();
+
+  /**
+   * On AM restart (for whatever reason) this API is required to rebuild the AM
+   * internal state with the containers which were already assigned and running.
+   *
+   * @param liveContainers containers which were already assigned and running
+   * @param applicationId identifier stored with the rebuilt state; the
+   *          application master passes the instance definition name
+   * @param providerRoles provider roles keyed by role priority
+   */
+  void rebuildContainerDetails(List<Container> liveContainers,
+      String applicationId, Map<Integer, ProviderRole> providerRoles);
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
index 31d09c4..419fa1a 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
@@ -68,6 +68,8 @@ public interface AgentKeys {
   String ARG_HOST = "--host";
   String ARG_PORT = "--port";
   String ARG_SECURED_PORT = "--secured_port";
+  String ARG_ZOOKEEPER_QUORUM = "--zk-quorum";
+  String ARG_ZOOKEEPER_REGISTRY_PATH = "--zk-reg-path";
   String ARG_DEBUG = "--debug";
   String AGENT_MAIN_SCRIPT_ROOT = "./infra/agent/slider-agent/";
   String AGENT_MAIN_SCRIPT = "agent/main.py";

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 419a454..59a6e29 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -19,6 +19,7 @@
 package org.apache.slider.providers.agent;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
@@ -27,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.slider.api.ClusterDescription;
 import org.apache.slider.api.ClusterDescriptionKeys;
 import org.apache.slider.api.ClusterNode;
@@ -39,6 +41,7 @@ import org.apache.slider.core.conf.AggregateConf;
 import org.apache.slider.core.conf.ConfTreeOperations;
 import org.apache.slider.core.conf.MapOperations;
 import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.BadConfigException;
 import org.apache.slider.core.exceptions.SliderException;
 import org.apache.slider.core.launch.CommandLineBuilder;
 import org.apache.slider.core.launch.ContainerLauncher;
@@ -59,6 +62,8 @@ import 
org.apache.slider.providers.agent.application.metadata.Metainfo;
 import org.apache.slider.providers.agent.application.metadata.OSPackage;
 import org.apache.slider.providers.agent.application.metadata.OSSpecific;
 import 
org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss;
+import org.apache.slider.server.appmaster.state.ContainerPriority;
+import org.apache.slider.server.appmaster.state.RoleStatus;
 import org.apache.slider.server.appmaster.state.StateAccessForProviders;
 import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType;
 import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
@@ -172,21 +177,11 @@ public class AgentProviderService extends 
AbstractProviderService implements
     clientProvider.validateInstanceDefinition(instanceDefinition);
   }
 
-  @Override
-  public void buildContainerLaunchContext(ContainerLauncher launcher,
-                                          AggregateConf instanceDefinition,
-                                          Container container,
-                                          String role,
-                                          SliderFileSystem fileSystem,
-                                          Path generatedConfPath,
-                                          MapOperations resourceComponent,
-                                          MapOperations appComponent,
-                                          Path containerTmpDirPath) throws
-      IOException,
-      SliderException {
-
-    String appDef = instanceDefinition.getAppConfOperations().
-        getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF);
+  // Reads the metainfo.xml in the application package and loads it
+  private void buildMetainfo(AggregateConf instanceDefinition,
+      SliderFileSystem fileSystem) throws IOException, SliderException {
+    String appDef = instanceDefinition.getAppConfOperations()
+        .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF);
 
     if (metainfo == null) {
       synchronized (syncLock) {
@@ -197,15 +192,42 @@ public class AgentProviderService extends 
AbstractProviderService implements
           metainfo = getApplicationMetainfo(fileSystem, appDef);
           if (metainfo == null || metainfo.getApplication() == null) {
             log.error("metainfo.xml is unavailable or malformed at {}.", 
appDef);
-            throw new SliderException("metainfo.xml is required in app 
package.");
+            throw new SliderException(
+                "metainfo.xml is required in app package.");
           }
-
-          commandOrder = new 
ComponentCommandOrder(metainfo.getApplication().getCommandOrder());
+          commandOrder = new ComponentCommandOrder(metainfo.getApplication()
+              .getCommandOrder());
           monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval());
           monitor.start();
         }
       }
     }
+  }
+
+  /**
+   * Initializes the application configuration by delegating to
+   * {@code buildMetainfo(instanceDefinition, fileSystem)}.
+   */
+  @Override
+  public void initializeApplicationConfiguration(
+      AggregateConf instanceDefinition, SliderFileSystem fileSystem)
+      throws IOException, SliderException {
+    buildMetainfo(instanceDefinition, fileSystem);
+  }
+
+  @Override
+  public void buildContainerLaunchContext(ContainerLauncher launcher,
+                                          AggregateConf instanceDefinition,
+                                          Container container,
+                                          String role,
+                                          SliderFileSystem fileSystem,
+                                          Path generatedConfPath,
+                                          MapOperations resourceComponent,
+                                          MapOperations appComponent,
+                                          Path containerTmpDirPath) throws
+      IOException,
+      SliderException {
+
+    String appDef = instanceDefinition.getAppConfOperations().
+        getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF);
+
+    initializeApplicationConfiguration(instanceDefinition, fileSystem);
 
     log.info("Build launch context for Agent");
     log.debug(instanceDefinition.toString());
@@ -235,6 +257,13 @@ public class AgentProviderService extends 
AbstractProviderService implements
       scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath();
     }
 
+    // set PYTHONPATH
+    List<String> pythonPaths = new ArrayList<String>();
+    pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT);
+    String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths);
+    launcher.setEnv(PYTHONPATH, pythonPath);
+    log.info("PYTHONPATH set to {}", pythonPath);
+
     String agentImage = instanceDefinition.getInternalOperations().
         get(OptionKeys.INTERNAL_APPLICATION_IMAGE_PATH);
     if (agentImage != null) {
@@ -273,12 +302,10 @@ public class AgentProviderService extends 
AbstractProviderService implements
 
     operation.add(scriptPath);
     operation.add(ARG_LABEL, label);
-    operation.add(ARG_HOST);
-    operation.add(getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME));
-    operation.add(ARG_PORT);
-    operation.add(getClusterInfoPropertyValue(StatusKeys.INFO_AM_AGENT_PORT));
-    operation.add(ARG_SECURED_PORT);
-    
operation.add(getClusterInfoPropertyValue(StatusKeys.INFO_AM_SECURED_AGENT_PORT));
+    operation.add(ARG_ZOOKEEPER_QUORUM);
+    operation.add(getClusterOptionPropertyValue(OptionKeys.ZOOKEEPER_QUORUM));
+    operation.add(ARG_ZOOKEEPER_REGISTRY_PATH);
+    operation.add(getZkRegistryPath());
 
     String debugCmd = agentLaunchParameter.getNextLaunchParameter(role);
     if (debugCmd != null && debugCmd.length() != 0) {
@@ -296,6 +323,36 @@ public class AgentProviderService extends 
AbstractProviderService implements
                                    
getClusterInfoPropertyValue(OptionKeys.APPLICATION_NAME)));
   }
 
+  /**
+   * Builds the zookeeper registry path: the configured registry root,
+   * followed by the application type, followed by the value of the
+   * internal {@code APPLICATION_NAME} option.
+   *
+   * @return the assembled registry path
+   */
+  private String getZkRegistryPath() {
+    String registryRoot =
+        getConfig().get(REGISTRY_PATH, DEFAULT_REGISTRY_PATH);
+    String typePath = ZKPaths.makePath(registryRoot, APP_TYPE);
+    String appName = getAmState().getInternalsSnapshot().get(
+        OptionKeys.APPLICATION_NAME);
+    return ZKPaths.makePath(typePath, appName);
+  }
+
+  /**
+   * Re-creates the in-memory component status entry for each live
+   * container, keyed by the container label.
+   * <p>
+   * A container whose priority has no entry in the role map is logged and
+   * skipped, so one unmatched container does not abort the whole rebuild.
+   *
+   * @param liveContainers containers to register again
+   * @param applicationId identifier stored in each rebuilt entry
+   * @param providerRoleMap provider roles keyed by container priority value
+   */
+  @Override
+  public void rebuildContainerDetails(List<Container> liveContainers,
+      String applicationId, Map<Integer, ProviderRole> providerRoleMap) {
+    for (Container container : liveContainers) {
+      // get the role name and label
+      Priority priority = container.getPriority();
+      ProviderRole providerRole = providerRoleMap.get(priority.getPriority());
+      if (providerRole == null) {
+        // NOTE(review): if priority values carry bits beyond the role key,
+        // the lookup key may need decoding first - confirm with the caller.
+        log.warn("No role found for priority {} of container {}; skipping",
+            priority.getPriority(), container.getId());
+        continue;
+      }
+      String roleName = providerRole.name;
+      String label = getContainerLabel(container, roleName);
+      log.info("Rebuilding in-memory: container {} in role {} in cluster {}",
+          container.getId().toString(), roleName, applicationId);
+      getComponentStatuses()
+          .put(
+              label,
+              new ComponentInstanceState(roleName, container.getId(),
+                  applicationId));
+    }
+  }
+
   /**
    * Run this service
    *
@@ -330,6 +387,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
    */
   @Override
   public RegistrationResponse handleRegistration(Register registration) {
+    log.info("Handling registration: " + registration);
     RegistrationResponse response = new RegistrationResponse();
     String label = registration.getHostname();
     if (getComponentStatuses().containsKey(label)) {
@@ -340,6 +398,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
       response.setLog("Label not recognized.");
       log.warn("Received registration request from unknown label {}", label);
     }
+    log.info("Registration response: " + response);
     return response;
   }
 
@@ -350,12 +409,14 @@ public class AgentProviderService extends 
AbstractProviderService implements
    */
   @Override
   public HeartBeatResponse handleHeartBeat(HeartBeat heartBeat) {
+    log.debug("Handling heartbeat: " + heartBeat);
     HeartBeatResponse response = new HeartBeatResponse();
     long id = heartBeat.getResponseId();
     response.setResponseId(id + 1L);
 
     String label = heartBeat.getHostname();
     String roleName = getRoleName(label);
+    State agentState = heartBeat.getAgentState();
 
     String containerId = getContainerId(label);
     StateAccessForProviders accessor = getAmState();
@@ -372,6 +433,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
 
     Boolean isMaster = isMaster(roleName);
     ComponentInstanceState componentStatus = getComponentStatuses().get(label);
+    updateComponentStatusWithAgentState(componentStatus, agentState);
     componentStatus.heartbeat(System.currentTimeMillis());
     // If no Master can explicitly publish then publish if its a master
     // Otherwise, wait till the master that can publish is ready
@@ -444,9 +506,17 @@ public class AgentProviderService extends 
AbstractProviderService implements
       log.warn("Component instance failed operation.", e);
     }
 
+    log.debug("Heartbeat response: " + response);
     return response;
   }
 
+  /**
+   * Copies the given agent state onto the component status; a null agent
+   * state leaves the status untouched.
+   */
+  private void updateComponentStatusWithAgentState(
+      ComponentInstanceState componentStatus, State agentState) {
+    if (agentState == null) {
+      return;
+    }
+    componentStatus.setState(agentState);
+  }
+
   @Override
   public Map<String, String> buildMonitorDetails(ClusterDescription 
clusterDesc) {
     Map<String, String> details = super.buildMonitorDetails(clusterDesc);
@@ -589,6 +659,14 @@ public class AgentProviderService extends 
AbstractProviderService implements
     return description.getInfo(name);
   }
 
+  /**
+   * Looks up a mandatory option in the current cluster status.
+   *
+   * @param name option name
+   * @return the value returned by the mandatory-option lookup
+   * @throws BadConfigException if the mandatory-option lookup fails
+   */
+  protected String getClusterOptionPropertyValue(String name)
+      throws BadConfigException {
+    StateAccessForProviders amState = getAmState();
+    assert amState.isApplicationLive();
+    ClusterDescription clusterStatus = amState.getClusterStatus();
+    return clusterStatus.getMandatoryOption(name);
+  }
+
   /**
    * Lost heartbeat from the container - release it and ask for a replacement
    * (async operation)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 52f7f8f..43cda93 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -108,6 +108,7 @@ import 
org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl;
 import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
 import org.apache.slider.server.appmaster.state.AppState;
 import org.apache.slider.server.appmaster.state.ContainerAssignment;
+import org.apache.slider.server.appmaster.state.ContainerPriority;
 import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
 import org.apache.slider.server.appmaster.state.ProviderAppState;
 import org.apache.slider.server.appmaster.operations.RMOperationHandler;
@@ -708,6 +709,9 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
       Configuration providerConf =
         providerService.loadProviderConfigurationInformation(confDir);
 
+      providerService
+          .initializeApplicationConfiguration(instanceDefinition, fs);
+
       providerService.validateApplicationConfiguration(instanceDefinition, 
                                                        confDir,
                                                        securityEnabled);
@@ -726,6 +730,9 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
           appInformation,
           new SimpleReleaseSelector());
 
+      providerService.rebuildContainerDetails(liveContainers,
+          instanceDefinition.getName(), appState.getRolePriorityMap());
+
       // add the AM to the list of nodes in the cluster
       
       appState.buildAppMasterNode(appMasterContainerID,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
index 667fe99..cade115 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java
@@ -152,6 +152,9 @@ public class AppState {
   private final Map<String, ProviderRole> roles =
     new ConcurrentHashMap<>();
 
+  private final Map<Integer, ProviderRole> rolePriorityMap = 
+    new ConcurrentHashMap<>();
+
   /**
    * The master node.
    */
@@ -321,6 +324,10 @@ public class AppState {
     return roles;
   }
 
+  /**
+   * Returns the map of provider roles keyed by priority. The internal map
+   * itself is returned, not a copy.
+   *
+   * @return the priority-to-role map
+   */
+  public Map<Integer, ProviderRole> getRolePriorityMap() {
+    return rolePriorityMap;
+  }
+
   private Map<ContainerId, RoleInstance> getStartingNodes() {
     return startingNodes;
   }
@@ -711,6 +718,7 @@ public class AppState {
     roleStatusMap.put(priority,
         new RoleStatus(providerRole));
     roles.put(providerRole.name, providerRole);
+    rolePriorityMap.put(priority, providerRole);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
index d3388f5..62df18d 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java
@@ -18,6 +18,7 @@
 
 package org.apache.slider.server.appmaster.web.rest.agent;
 
+import org.apache.slider.providers.agent.State;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -44,6 +45,7 @@ public class HeartBeat {
   HostStatus nodeStatus;
   private AgentEnv agentEnv = null;
   private String fqdn;
+  private State agentState;
 
   public long getResponseId() {
     return responseId;
@@ -123,6 +125,14 @@ public class HeartBeat {
     this.mounts = mounts;
   }
 
+  /**
+   * @return the agent state carried by this heartbeat; may be null if it
+   *         was never set
+   */
+  public State getAgentState() {
+    return agentState;
+  }
+
+  /**
+   * @param agentState the agent state to carry in this heartbeat
+   */
+  public void setAgentState(State agentState) {
+    this.agentState = agentState;
+  }
+
   @Override
   public String toString() {
     return "HeartBeat{" +
@@ -132,6 +142,7 @@ public class HeartBeat {
            ", reports=" + reports +
            ", componentStatus=" + componentStatus +
            ", nodeStatus=" + nodeStatus +
+           ", agentState=" + agentState +
            '}';
   }
 }

Reply via email to