http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_lock.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_lock.py b/slider-agent/src/main/python/kazoo/tests/test_lock.py new file mode 100644 index 0000000..36b6d42 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_lock.py @@ -0,0 +1,517 @@ +import uuid +import threading + +from nose.tools import eq_, ok_ + +from kazoo.exceptions import CancelledError +from kazoo.exceptions import LockTimeout +from kazoo.testing import KazooTestCase +from kazoo.tests.util import wait + + +class KazooLockTests(KazooTestCase): + def setUp(self): + super(KazooLockTests, self).setUp() + self.lockpath = "/" + uuid.uuid4().hex + + self.condition = threading.Condition() + self.released = threading.Event() + self.active_thread = None + self.cancelled_threads = [] + + def _thread_lock_acquire_til_event(self, name, lock, event): + try: + with lock: + with self.condition: + eq_(self.active_thread, None) + self.active_thread = name + self.condition.notify_all() + + event.wait() + + with self.condition: + eq_(self.active_thread, name) + self.active_thread = None + self.condition.notify_all() + self.released.set() + except CancelledError: + with self.condition: + self.cancelled_threads.append(name) + self.condition.notify_all() + + def test_lock_one(self): + lock_name = uuid.uuid4().hex + lock = self.client.Lock(self.lockpath, lock_name) + event = threading.Event() + + thread = threading.Thread(target=self._thread_lock_acquire_til_event, + args=(lock_name, lock, event)) + thread.start() + + lock2_name = uuid.uuid4().hex + anotherlock = self.client.Lock(self.lockpath, lock2_name) + + # wait for any contender to show up on the lock + wait(anotherlock.contenders) + eq_(anotherlock.contenders(), [lock_name]) + + with self.condition: + while self.active_thread != lock_name: + self.condition.wait() + + # release the lock + event.set() + + with self.condition: + while self.active_thread: + self.condition.wait() + self.released.wait() + thread.join() + + def test_lock(self): + threads = [] + names = ["contender" + str(i) for i in range(5)] + + contender_bits = {} + + for name in names: + e = threading.Event() + + l = self.client.Lock(self.lockpath, name) + t = threading.Thread(target=self._thread_lock_acquire_til_event, + args=(name, l, e)) + contender_bits[name] = (t, e) + threads.append(t) + + # acquire the lock ourselves first to make the others line up + lock = self.client.Lock(self.lockpath, "test") + lock.acquire() + + for t in threads: + t.start() + + # wait for everyone to line up on the lock + wait(lambda: len(lock.contenders()) == 6) + contenders = lock.contenders() + + eq_(contenders[0], "test") + contenders = contenders[1:] + remaining = list(contenders) + + # release the lock and contenders should claim it in order + lock.release() + + for contender in contenders: + thread, event = contender_bits[contender] + + with self.condition: + while not self.active_thread: + self.condition.wait() + eq_(self.active_thread, contender) + + eq_(lock.contenders(), remaining) + remaining = remaining[1:] + + event.set() + + with self.condition: + while self.active_thread: + self.condition.wait() + for thread in threads: + thread.join() + + def test_lock_reconnect(self): + event = threading.Event() + other_lock = self.client.Lock(self.lockpath, 'contender') + thread = threading.Thread(target=self._thread_lock_acquire_til_event, + args=('contender', other_lock, event)) + + # acquire the lock ourselves first to make the contender line up + lock = self.client.Lock(self.lockpath, "test") + lock.acquire() + + thread.start() + # wait for the contender to line up on the lock + wait(lambda: len(lock.contenders()) == 2) + eq_(lock.contenders(), ['test', 'contender']) + + self.expire_session() + + lock.release() + + with self.condition: + while not self.active_thread: + self.condition.wait() + eq_(self.active_thread, 'contender') + + event.set() + thread.join() + + def test_lock_non_blocking(self): + lock_name = uuid.uuid4().hex + lock = self.client.Lock(self.lockpath, lock_name) + event = threading.Event() + + thread = threading.Thread(target=self._thread_lock_acquire_til_event, + args=(lock_name, lock, event)) + thread.start() + + lock1 = self.client.Lock(self.lockpath, lock_name) + + # wait for the thread to acquire the lock + with self.condition: + if not self.active_thread: + self.condition.wait(5) + + ok_(not lock1.acquire(blocking=False)) + eq_(lock.contenders(), [lock_name]) # just one - itself + + event.set() + thread.join() + + def test_lock_fail_first_call(self): + event1 = threading.Event() + lock1 = self.client.Lock(self.lockpath, "one") + thread1 = threading.Thread(target=self._thread_lock_acquire_til_event, + args=("one", lock1, event1)) + thread1.start() + + # wait for this thread to acquire the lock + with self.condition: + if not self.active_thread: + self.condition.wait(5) + eq_(self.active_thread, "one") + eq_(lock1.contenders(), ["one"]) + event1.set() + thread1.join() + + def test_lock_cancel(self): + event1 = threading.Event() + lock1 = self.client.Lock(self.lockpath, "one") + thread1 = threading.Thread(target=self._thread_lock_acquire_til_event, + args=("one", lock1, event1)) + thread1.start() + + # wait for this thread to acquire the lock + with self.condition: + if not self.active_thread: + self.condition.wait(5) + eq_(self.active_thread, "one") + + client2 = self._get_client() + client2.start() + event2 = threading.Event() + lock2 = client2.Lock(self.lockpath, "two") + thread2 = threading.Thread(target=self._thread_lock_acquire_til_event, + args=("two", lock2, event2)) + thread2.start() + + # this one should block in acquire. check that it is a contender + wait(lambda: len(lock2.contenders()) > 1) + eq_(lock2.contenders(), ["one", "two"]) + + lock2.cancel() + with self.condition: + if not "two" in self.cancelled_threads: + self.condition.wait() + assert "two" in self.cancelled_threads + + eq_(lock2.contenders(), ["one"]) + + thread2.join() + event1.set() + thread1.join() + client2.stop() + + def test_lock_double_calls(self): + lock1 = self.client.Lock(self.lockpath, "one") + lock1.acquire() + lock1.acquire() + lock1.release() + lock1.release() + + def test_lock_reacquire(self): + lock = self.client.Lock(self.lockpath, "one") + lock.acquire() + lock.release() + lock.acquire() + lock.release() + + def test_lock_timeout(self): + timeout = 3 + e = threading.Event() + started = threading.Event() + + # In the background thread, acquire the lock and wait thrice the time + # that the main thread is going to wait to acquire the lock. + lock1 = self.client.Lock(self.lockpath, "one") + + def _thread(lock, event, timeout): + with lock: + started.set() + event.wait(timeout) + if not event.isSet(): + # Eventually fail to avoid hanging the tests + self.fail("lock2 never timed out") + + t = threading.Thread(target=_thread, args=(lock1, e, timeout * 3)) + t.start() + + # Start the main thread's kazoo client and try to acquire the lock + # but give up after `timeout` seconds + client2 = self._get_client() + client2.start() + started.wait(5) + self.assertTrue(started.isSet()) + lock2 = client2.Lock(self.lockpath, "two") + try: + lock2.acquire(timeout=timeout) + except LockTimeout: + # A timeout is the behavior we're expecting, since the background + # thread should still be holding onto the lock + pass + else: + self.fail("Main thread unexpectedly acquired the lock") + finally: + # Cleanup + e.set() + t.join() + client2.stop() + + +class TestSemaphore(KazooTestCase): + def setUp(self): + super(TestSemaphore, self).setUp() + self.lockpath = "/" + uuid.uuid4().hex + + self.condition = threading.Condition() + self.released = threading.Event() + self.active_thread = None + self.cancelled_threads = [] + + def test_basic(self): + sem1 = self.client.Semaphore(self.lockpath) + sem1.acquire() + sem1.release() + + def test_lock_one(self): + sem1 = self.client.Semaphore(self.lockpath, max_leases=1) + sem2 = self.client.Semaphore(self.lockpath, max_leases=1) + started = threading.Event() + event = threading.Event() + + sem1.acquire() + + def sema_one(): + started.set() + with sem2: + event.set() + + thread = threading.Thread(target=sema_one, args=()) + thread.start() + started.wait(10) + + self.assertFalse(event.is_set()) + + sem1.release() + event.wait(10) + self.assert_(event.is_set()) + thread.join() + + def test_non_blocking(self): + sem1 = self.client.Semaphore( + self.lockpath, identifier='sem1', max_leases=2) + sem2 = self.client.Semaphore( + self.lockpath, identifier='sem2', max_leases=2) + sem3 = self.client.Semaphore( + self.lockpath, identifier='sem3', max_leases=2) + + sem1.acquire() + sem2.acquire() + ok_(not sem3.acquire(blocking=False)) + eq_(set(sem1.lease_holders()), set(['sem1', 'sem2'])) + sem2.release() + # the next line isn't required, but avoids timing issues in tests + sem3.acquire() + eq_(set(sem1.lease_holders()), set(['sem1', 'sem3'])) + sem1.release() + sem3.release() + + def test_non_blocking_release(self): + sem1 = self.client.Semaphore( + self.lockpath, identifier='sem1', max_leases=1) + sem2 = self.client.Semaphore( + self.lockpath, identifier='sem2', max_leases=1) + sem1.acquire() + sem2.acquire(blocking=False) + + # make sure there's no shutdown / cleanup error + sem1.release() + sem2.release() + + def test_holders(self): + started = threading.Event() + event = threading.Event() + + def sema_one(): + with self.client.Semaphore(self.lockpath, 'fred', max_leases=1): + started.set() + event.wait() + + thread = threading.Thread(target=sema_one, args=()) + thread.start() + started.wait() + sem1 = self.client.Semaphore(self.lockpath) + holders = sem1.lease_holders() + eq_(holders, ['fred']) + event.set() + thread.join() + + def test_semaphore_cancel(self): + sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) + sem2 = self.client.Semaphore(self.lockpath, 'george', max_leases=1) + sem1.acquire() + started = threading.Event() + event = threading.Event() + + def sema_one(): + started.set() + try: + with sem2: + started.set() + except CancelledError: + event.set() + + thread = threading.Thread(target=sema_one, args=()) + thread.start() + started.wait() + eq_(sem1.lease_holders(), ['fred']) + eq_(event.is_set(), False) + sem2.cancel() + event.wait() + eq_(event.is_set(), True) + thread.join() + + def test_multiple_acquire_and_release(self): + sem1 = self.client.Semaphore(self.lockpath, 'fred', max_leases=1) + sem1.acquire() + sem1.acquire() + + eq_(True, sem1.release()) + eq_(False, sem1.release()) + + def test_handle_session_loss(self): + expire_semaphore = self.client.Semaphore(self.lockpath, 'fred', + max_leases=1) + + client = self._get_client() + client.start() + lh_semaphore = client.Semaphore(self.lockpath, 'george', max_leases=1) + lh_semaphore.acquire() + + started = threading.Event() + event = threading.Event() + event2 = threading.Event() + + def sema_one(): + started.set() + with expire_semaphore: + event.set() + event2.wait() + + thread = threading.Thread(target=sema_one, args=()) + thread.start() + + started.wait() + eq_(lh_semaphore.lease_holders(), ['george']) + + # Fired in a separate thread to make sure we can see the effect + expired = threading.Event() + + def expire(): + self.expire_session() + expired.set() + + thread = threading.Thread(target=expire, args=()) + thread.start() + expire_semaphore.wake_event.wait() + expired.wait() + + lh_semaphore.release() + client.stop() + + event.wait(5) + eq_(expire_semaphore.lease_holders(), ['fred']) + event2.set() + thread.join() + + def test_inconsistent_max_leases(self): + sem1 = self.client.Semaphore(self.lockpath, max_leases=1) + sem2 = self.client.Semaphore(self.lockpath, max_leases=2) + + sem1.acquire() + self.assertRaises(ValueError, sem2.acquire) + + def test_inconsistent_max_leases_other_data(self): + sem1 = self.client.Semaphore(self.lockpath, max_leases=1) + sem2 = self.client.Semaphore(self.lockpath, max_leases=2) + + self.client.ensure_path(self.lockpath) + self.client.set(self.lockpath, b'a$') + + sem1.acquire() + # sem2 thinks it's ok to have two lease holders + ok_(sem2.acquire(blocking=False)) + + def test_reacquire(self): + lock = self.client.Semaphore(self.lockpath) + lock.acquire() + lock.release() + lock.acquire() + lock.release() + + def test_acquire_after_cancelled(self): + lock = self.client.Semaphore(self.lockpath) + self.assertTrue(lock.acquire()) + self.assertTrue(lock.release()) + lock.cancel() + self.assertTrue(lock.cancelled) + self.assertTrue(lock.acquire()) + + def test_timeout(self): + timeout = 3 + e = threading.Event() + started = threading.Event() + + # In the background thread, acquire the lock and wait thrice the time + # that the main thread is going to wait to acquire the lock. + sem1 = self.client.Semaphore(self.lockpath, "one") + + def _thread(sem, event, timeout): + with sem: + started.set() + event.wait(timeout) + if not event.isSet(): + # Eventually fail to avoid hanging the tests + self.fail("sem2 never timed out") + + t = threading.Thread(target=_thread, args=(sem1, e, timeout * 3)) + t.start() + + # Start the main thread's kazoo client and try to acquire the lock + # but give up after `timeout` seconds + client2 = self._get_client() + client2.start() + started.wait(5) + self.assertTrue(started.isSet()) + sem2 = client2.Semaphore(self.lockpath, "two") + try: + sem2.acquire(timeout=timeout) + except LockTimeout: + # A timeout is the behavior we're expecting, since the background + # thread will still be holding onto the lock + e.set() + finally: + # Cleanup + t.join() + client2.stop()
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_partitioner.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_partitioner.py b/slider-agent/src/main/python/kazoo/tests/test_partitioner.py new file mode 100644 index 0000000..a323d07 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_partitioner.py @@ -0,0 +1,92 @@ +import uuid +import time + +from nose.tools import eq_ + +from kazoo.testing import KazooTestCase +from kazoo.recipe.partitioner import PartitionState + + +class KazooPartitionerTests(KazooTestCase): + def setUp(self): + super(KazooPartitionerTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + + def test_party_of_one(self): + partitioner = self.client.SetPartitioner( + self.path, set=(1, 2, 3), time_boundary=0.2) + partitioner.wait_for_acquire(14) + eq_(partitioner.state, PartitionState.ACQUIRED) + eq_(list(partitioner), [1, 2, 3]) + partitioner.finish() + + def test_party_of_two(self): + partitioners = [self.client.SetPartitioner(self.path, (1, 2), + identifier="p%s" % i, time_boundary=0.2) + for i in range(2)] + + partitioners[0].wait_for_acquire(14) + partitioners[1].wait_for_acquire(14) + eq_(list(partitioners[0]), [1]) + eq_(list(partitioners[1]), [2]) + partitioners[0].finish() + time.sleep(0.1) + eq_(partitioners[1].release, True) + partitioners[1].finish() + + def test_party_expansion(self): + partitioners = [self.client.SetPartitioner(self.path, (1, 2, 3), + identifier="p%s" % i, time_boundary=0.2) + for i in range(2)] + + partitioners[0].wait_for_acquire(14) + partitioners[1].wait_for_acquire(14) + eq_(partitioners[0].state, PartitionState.ACQUIRED) + eq_(partitioners[1].state, PartitionState.ACQUIRED) + + eq_(list(partitioners[0]), [1, 3]) + eq_(list(partitioners[1]), [2]) + + # Add another partition, wait till they settle + partitioners.append(self.client.SetPartitioner(self.path, (1, 2, 3), + identifier="p2", time_boundary=0.2)) + time.sleep(0.1) + eq_(partitioners[0].release, True) + for p in partitioners[:-1]: + p.release_set() + + for p in partitioners: + p.wait_for_acquire(14) + + eq_(list(partitioners[0]), [1]) + eq_(list(partitioners[1]), [2]) + eq_(list(partitioners[2]), [3]) + + for p in partitioners: + p.finish() + + def test_more_members_than_set_items(self): + partitioners = [self.client.SetPartitioner(self.path, (1,), + identifier="p%s" % i, time_boundary=0.2) + for i in range(2)] + + partitioners[0].wait_for_acquire(14) + partitioners[1].wait_for_acquire(14) + eq_(partitioners[0].state, PartitionState.ACQUIRED) + eq_(partitioners[1].state, PartitionState.ACQUIRED) + + eq_(list(partitioners[0]), [1]) + eq_(list(partitioners[1]), []) + + for p in partitioners: + p.finish() + + def test_party_session_failure(self): + partitioner = self.client.SetPartitioner( + self.path, set=(1, 2, 3), time_boundary=0.2) + partitioner.wait_for_acquire(14) + eq_(partitioner.state, PartitionState.ACQUIRED) + # simulate session failure + partitioner._fail_out() + partitioner.release_set() + self.assertTrue(partitioner.failed) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_party.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_party.py b/slider-agent/src/main/python/kazoo/tests/test_party.py new file mode 100644 index 0000000..d44eec7 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_party.py @@ -0,0 +1,84 @@ +import uuid + +from nose.tools import eq_ + +from kazoo.testing import KazooTestCase + + +class KazooPartyTests(KazooTestCase): + def setUp(self): + super(KazooPartyTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + + def test_party(self): + parties = [self.client.Party(self.path, "p%s" % i) + for i in range(5)] + + one_party = parties[0] + + eq_(list(one_party), []) + eq_(len(one_party), 0) + + participants = set() + for party in parties: + party.join() + participants.add(party.data.decode('utf-8')) + + eq_(set(party), participants) + eq_(len(party), len(participants)) + + for party in parties: + party.leave() + participants.remove(party.data.decode('utf-8')) + + eq_(set(party), participants) + eq_(len(party), len(participants)) + + def test_party_reuse_node(self): + party = self.client.Party(self.path, "p1") + self.client.ensure_path(self.path) + self.client.create(party.create_path) + party.join() + self.assertTrue(party.participating) + party.leave() + self.assertFalse(party.participating) + self.assertEqual(len(party), 0) + + def test_party_vanishing_node(self): + party = self.client.Party(self.path, "p1") + party.join() + self.assertTrue(party.participating) + self.client.delete(party.create_path) + party.leave() + self.assertFalse(party.participating) + self.assertEqual(len(party), 0) + + +class KazooShallowPartyTests(KazooTestCase): + def setUp(self): + super(KazooShallowPartyTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + + def test_party(self): + parties = [self.client.ShallowParty(self.path, "p%s" % i) + for i in range(5)] + + one_party = parties[0] + + eq_(list(one_party), []) + eq_(len(one_party), 0) + + participants = set() + for party in parties: + party.join() + participants.add(party.data.decode('utf-8')) + + eq_(set(party), participants) + eq_(len(party), len(participants)) + + for party in parties: + party.leave() + participants.remove(party.data.decode('utf-8')) + + eq_(set(party), participants) + eq_(len(party), len(participants)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_paths.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_paths.py b/slider-agent/src/main/python/kazoo/tests/test_paths.py new file mode 100644 index 0000000..e092196 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_paths.py @@ -0,0 +1,98 @@ +import sys +from unittest import TestCase + +from kazoo.protocol import paths + + +if sys.version_info > (3, ): # pragma: nocover + def u(s): + return s +else: # pragma: nocover + def u(s): + return unicode(s, "unicode_escape") + + +class NormPathTestCase(TestCase): + + def test_normpath(self): + self.assertEqual(paths.normpath('/a/b'), '/a/b') + + def test_normpath_empty(self): + self.assertEqual(paths.normpath(''), '') + + def test_normpath_unicode(self): + self.assertEqual(paths.normpath(u('/\xe4/b')), u('/\xe4/b')) + + def test_normpath_dots(self): + self.assertEqual(paths.normpath('/a./b../c'), '/a./b../c') + + def test_normpath_slash(self): + self.assertEqual(paths.normpath('/'), '/') + + def test_normpath_multiple_slashes(self): + self.assertEqual(paths.normpath('//'), '/') + self.assertEqual(paths.normpath('//a/b'), '/a/b') + self.assertEqual(paths.normpath('/a//b//'), '/a/b') + self.assertEqual(paths.normpath('//a////b///c/'), '/a/b/c') + + def test_normpath_relative(self): + self.assertRaises(ValueError, paths.normpath, './a/b') + self.assertRaises(ValueError, paths.normpath, '/a/../b') + + +class JoinTestCase(TestCase): + + def test_join(self): + self.assertEqual(paths.join('/a'), '/a') + self.assertEqual(paths.join('/a', 'b/'), '/a/b/') + self.assertEqual(paths.join('/a', 'b', 'c'), '/a/b/c') + + def test_join_empty(self): + self.assertEqual(paths.join(''), '') + self.assertEqual(paths.join('', 'a', 'b'), 'a/b') + self.assertEqual(paths.join('/a', '', 'b/', 'c'), '/a/b/c') + + def test_join_absolute(self): + self.assertEqual(paths.join('/a/b', '/c'), '/c') + + +class IsAbsTestCase(TestCase): + + def test_isabs(self): + self.assertTrue(paths.isabs('/')) + self.assertTrue(paths.isabs('/a')) + self.assertTrue(paths.isabs('/a//b/c')) + self.assertTrue(paths.isabs('//a/b')) + + def test_isabs_false(self): + self.assertFalse(paths.isabs('')) + self.assertFalse(paths.isabs('a/')) + self.assertFalse(paths.isabs('a/../')) + + +class BaseNameTestCase(TestCase): + + def test_basename(self): + self.assertEquals(paths.basename(''), '') + self.assertEquals(paths.basename('/'), '') + self.assertEquals(paths.basename('//a'), 'a') + self.assertEquals(paths.basename('//a/'), '') + self.assertEquals(paths.basename('/a/b.//c..'), 'c..') + + +class PrefixRootTestCase(TestCase): + + def test_prefix_root(self): + self.assertEquals(paths._prefix_root('/a/', 'b/c'), '/a/b/c') + self.assertEquals(paths._prefix_root('/a/b', 'c/d'), '/a/b/c/d') + self.assertEquals(paths._prefix_root('/a', '/b/c'), '/a/b/c') + self.assertEquals(paths._prefix_root('/a', '//b/c.'), '/a/b/c.') + + +class NormRootTestCase(TestCase): + + def test_norm_root(self): + self.assertEquals(paths._norm_root(''), '/') + self.assertEquals(paths._norm_root('/'), '/') + self.assertEquals(paths._norm_root('//a'), '/a') + self.assertEquals(paths._norm_root('//a./b'), '/a./b') http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_queue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_queue.py b/slider-agent/src/main/python/kazoo/tests/test_queue.py new file mode 100644 index 0000000..6a9ec68 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_queue.py @@ -0,0 +1,179 @@ +import uuid + +from nose import SkipTest +from nose.tools import eq_, ok_ + +from kazoo.testing import KazooTestCase +from kazoo.tests.util import TRAVIS_ZK_VERSION + + +class KazooQueueTests(KazooTestCase): + + def _makeOne(self): + path = "/" + uuid.uuid4().hex + return self.client.Queue(path) + + def test_queue_validation(self): + queue = self._makeOne() + self.assertRaises(TypeError, queue.put, {}) + self.assertRaises(TypeError, queue.put, b"one", b"100") + self.assertRaises(TypeError, queue.put, b"one", 10.0) + self.assertRaises(ValueError, queue.put, b"one", -100) + self.assertRaises(ValueError, queue.put, b"one", 100000) + + def test_empty_queue(self): + queue = self._makeOne() + eq_(len(queue), 0) + self.assertTrue(queue.get() is None) + eq_(len(queue), 0) + + def test_queue(self): + queue = self._makeOne() + queue.put(b"one") + queue.put(b"two") + queue.put(b"three") + eq_(len(queue), 3) + + eq_(queue.get(), b"one") + eq_(queue.get(), b"two") + eq_(queue.get(), b"three") + eq_(len(queue), 0) + + def test_priority(self): + queue = self._makeOne() + queue.put(b"four", priority=101) + queue.put(b"one", priority=0) + queue.put(b"two", priority=0) + queue.put(b"three", priority=10) + + eq_(queue.get(), b"one") + eq_(queue.get(), b"two") + eq_(queue.get(), b"three") + eq_(queue.get(), b"four") + + +class KazooLockingQueueTests(KazooTestCase): + + def setUp(self): + KazooTestCase.setUp(self) + skip = False + if TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION < (3, 4): + skip = True + elif TRAVIS_ZK_VERSION and TRAVIS_ZK_VERSION >= (3, 4): + skip = False + else: + ver = self.client.server_version() + if ver[1] < 4: + skip = True + if skip: + raise SkipTest("Must use Zookeeper 3.4 or above") + + def _makeOne(self): + path = "/" + uuid.uuid4().hex + return self.client.LockingQueue(path) + + def test_queue_validation(self): + queue = self._makeOne() + self.assertRaises(TypeError, queue.put, {}) + self.assertRaises(TypeError, queue.put, b"one", b"100") + self.assertRaises(TypeError, queue.put, b"one", 10.0) + self.assertRaises(ValueError, queue.put, b"one", -100) + self.assertRaises(ValueError, queue.put, b"one", 100000) + self.assertRaises(TypeError, queue.put_all, {}) + self.assertRaises(TypeError, queue.put_all, [{}]) + self.assertRaises(TypeError, queue.put_all, [b"one"], b"100") + self.assertRaises(TypeError, queue.put_all, [b"one"], 10.0) + self.assertRaises(ValueError, queue.put_all, [b"one"], -100) + self.assertRaises(ValueError, queue.put_all, [b"one"], 100000) + + def test_empty_queue(self): + queue = self._makeOne() + eq_(len(queue), 0) + self.assertTrue(queue.get(0) is None) + eq_(len(queue), 0) + + def test_queue(self): + queue = self._makeOne() + queue.put(b"one") + queue.put_all([b"two", b"three"]) + eq_(len(queue), 3) + + ok_(not queue.consume()) + ok_(not queue.holds_lock()) + eq_(queue.get(1), b"one") + ok_(queue.holds_lock()) + # Without consuming, should return the same element + eq_(queue.get(1), b"one") + ok_(queue.consume()) + ok_(not queue.holds_lock()) + eq_(queue.get(1), b"two") + ok_(queue.holds_lock()) + ok_(queue.consume()) + ok_(not queue.holds_lock()) + eq_(queue.get(1), b"three") + ok_(queue.holds_lock()) + ok_(queue.consume()) + ok_(not queue.holds_lock()) + ok_(not queue.consume()) + eq_(len(queue), 0) + + def test_consume(self): + queue = self._makeOne() + + queue.put(b"one") + ok_(not queue.consume()) + queue.get(.1) + ok_(queue.consume()) + ok_(not queue.consume()) + + def test_holds_lock(self): + queue = self._makeOne() + + ok_(not queue.holds_lock()) + queue.put(b"one") + queue.get(.1) + ok_(queue.holds_lock()) + queue.consume() + ok_(not queue.holds_lock()) + + def test_priority(self): + queue = self._makeOne() + queue.put(b"four", priority=101) + queue.put(b"one", priority=0) + queue.put(b"two", priority=0) + queue.put(b"three", priority=10) + + eq_(queue.get(1), b"one") + ok_(queue.consume()) + eq_(queue.get(1), b"two") + ok_(queue.consume()) + eq_(queue.get(1), b"three") + ok_(queue.consume()) + eq_(queue.get(1), b"four") + ok_(queue.consume()) + + def test_concurrent_execution(self): + queue = self._makeOne() + value1 = [] + value2 = [] + value3 = [] + event1 = self.client.handler.event_object() + event2 = self.client.handler.event_object() + event3 = self.client.handler.event_object() + + def get_concurrently(value, event): + q = self.client.LockingQueue(queue.path) + value.append(q.get(.1)) + event.set() + + self.client.handler.spawn(get_concurrently, value1, event1) + self.client.handler.spawn(get_concurrently, value2, event2) + self.client.handler.spawn(get_concurrently, value3, event3) + queue.put(b"one") + event1.wait(.2) + event2.wait(.2) + event3.wait(.2) + + result = value1 + value2 + value3 + eq_(result.count(b"one"), 1) + eq_(result.count(None), 2) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_retry.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_retry.py b/slider-agent/src/main/python/kazoo/tests/test_retry.py new file mode 100644 index 0000000..b85739b --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_retry.py @@ -0,0 +1,77 @@ +import unittest + +from nose.tools import eq_ + + +class TestRetrySleeper(unittest.TestCase): + + def _pass(self): + pass + + def _fail(self, times=1): + from kazoo.retry import ForceRetryError + scope = dict(times=0) + + def inner(): + if scope['times'] >= times: + pass + else: + scope['times'] += 1 + raise ForceRetryError('Failed!') + return inner + + def _makeOne(self, *args, **kwargs): + from kazoo.retry import KazooRetry + return KazooRetry(*args, **kwargs) + + def test_reset(self): + retry = self._makeOne(delay=0, max_tries=2) + retry(self._fail()) + eq_(retry._attempts, 1) + retry.reset() + eq_(retry._attempts, 0) + + def test_too_many_tries(self): + from kazoo.retry import RetryFailedError + retry = self._makeOne(delay=0) + self.assertRaises(RetryFailedError, retry, self._fail(times=999)) + eq_(retry._attempts, 1) + + def test_maximum_delay(self): + def sleep_func(_time): + pass + + retry = self._makeOne(delay=10, max_tries=100, sleep_func=sleep_func) + retry(self._fail(times=10)) + self.assertTrue(retry._cur_delay < 4000, retry._cur_delay) + # gevent's sleep function is picky about the type + eq_(type(retry._cur_delay), float) + + def test_copy(self): + _sleep = lambda t: None + retry = self._makeOne(sleep_func=_sleep) + rcopy = retry.copy() + self.assertTrue(rcopy.sleep_func is _sleep) + + +class TestKazooRetry(unittest.TestCase): + + def _makeOne(self, **kw): + from kazoo.retry import KazooRetry + return KazooRetry(**kw) + + def test_connection_closed(self): + from kazoo.exceptions import ConnectionClosedError + retry = self._makeOne() + + def testit(): + raise ConnectionClosedError() + self.assertRaises(ConnectionClosedError, retry, testit) + + def test_session_expired(self): + from kazoo.exceptions import SessionExpiredError + retry = self._makeOne(max_tries=1) + + def testit(): + raise SessionExpiredError() + self.assertRaises(Exception, retry, testit) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_security.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_security.py b/slider-agent/src/main/python/kazoo/tests/test_security.py new file mode 100644 index 0000000..4a7e670 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_security.py @@ -0,0 +1,40 @@ +import unittest + +from nose.tools import eq_ +from kazoo.security import Permissions + + +class TestACL(unittest.TestCase): + def _makeOne(self, *args, **kwargs): + from kazoo.security import make_acl + return make_acl(*args, **kwargs) + + def test_read_acl(self): + acl = self._makeOne("digest", ":", read=True) + eq_(acl.perms & Permissions.READ, Permissions.READ) + + def test_all_perms(self): + acl = self._makeOne("digest", ":", read=True, write=True, + create=True, delete=True, admin=True) + for perm in [Permissions.READ, Permissions.CREATE, Permissions.WRITE, + Permissions.DELETE, Permissions.ADMIN]: + eq_(acl.perms & perm, perm) + + def test_perm_listing(self): + from kazoo.security import ACL + f = ACL(15, 'fred') + self.assert_('READ' in f.acl_list) + self.assert_('WRITE' in f.acl_list) + self.assert_('CREATE' in f.acl_list) + self.assert_('DELETE' in f.acl_list) + + f = ACL(16, 'fred') + self.assert_('ADMIN' in f.acl_list) + + f = ACL(31, 'george') + self.assert_('ALL' in f.acl_list) + + def test_perm_repr(self): + from kazoo.security import ACL + f = ACL(16, 'fred') + self.assert_("ACL(perms=16, acl_list=['ADMIN']" in repr(f)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py b/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py new file mode 100644 index 0000000..b4b93f1 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_threading_handler.py @@ -0,0 +1,326 @@ +import threading +import unittest + +import mock +from nose.tools import assert_raises +from nose.tools import eq_ +from nose.tools import raises + + +class TestThreadingHandler(unittest.TestCase): + def _makeOne(self, *args): + from kazoo.handlers.threading import SequentialThreadingHandler + return SequentialThreadingHandler(*args) + + def _getAsync(self, *args): + from kazoo.handlers.threading import AsyncResult + return AsyncResult + + def test_proper_threading(self): + h = self._makeOne() + h.start() + # In Python 3.3 _Event is gone, before Event is function + event_class = getattr(threading, '_Event', threading.Event) + assert isinstance(h.event_object(), event_class) + + def test_matching_async(self): + h = self._makeOne() + h.start() + async = self._getAsync() + assert isinstance(h.async_result(), async) + + def test_exception_raising(self): + h = self._makeOne() + + @raises(h.timeout_exception) + def testit(): + raise h.timeout_exception("This is a timeout") + testit() + + def test_double_start_stop(self): + h = self._makeOne() + h.start() + self.assertTrue(h._running) + h.start() + h.stop() + h.stop() + self.assertFalse(h._running) + + +class TestThreadingAsync(unittest.TestCase): + def _makeOne(self, *args): + from kazoo.handlers.threading import AsyncResult + return AsyncResult(*args) + + def _makeHandler(self): + from kazoo.handlers.threading import SequentialThreadingHandler + return SequentialThreadingHandler() + + def test_ready(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + eq_(async.ready(), False) + async.set('val') + eq_(async.ready(), True) + eq_(async.successful(), True) + eq_(async.exception, None) + + def test_callback_queued(self): + mock_handler = mock.Mock() + mock_handler.completion_queue = mock.Mock() + async = self._makeOne(mock_handler) + + async.rawlink(lambda a: a) + async.set('val') + + assert mock_handler.completion_queue.put.called + + def test_set_exception(self): + mock_handler = mock.Mock() + mock_handler.completion_queue = mock.Mock() + async = self._makeOne(mock_handler) + async.rawlink(lambda a: a) + async.set_exception(ImportError('Error occured')) + + assert isinstance(async.exception, ImportError) + assert mock_handler.completion_queue.put.called + + def test_get_wait_while_setting(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + bv = threading.Event() + cv = threading.Event() + + def wait_for_val(): + bv.set() + val = async.get() + lst.append(val) + cv.set() + th = threading.Thread(target=wait_for_val) + th.start() + bv.wait() + + async.set('fred') + cv.wait() + eq_(lst, ['fred']) + th.join() + + def test_get_with_nowait(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + timeout = self._makeHandler().timeout_exception + + @raises(timeout) + def test_it(): + async.get(block=False) + test_it() + + @raises(timeout) + def test_nowait(): + async.get_nowait() + test_nowait() + + def test_get_with_exception(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + bv = threading.Event() + cv = threading.Event() + + def wait_for_val(): + bv.set() + try: + val = async.get() + except ImportError: + lst.append('oops') + else: + lst.append(val) + cv.set() + th = threading.Thread(target=wait_for_val) + th.start() + bv.wait() + + async.set_exception(ImportError) + cv.wait() + eq_(lst, ['oops']) + th.join() + + def test_wait(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + bv = threading.Event() + cv = threading.Event() + + def wait_for_val(): + bv.set() + try: + val = async.wait(10) + except ImportError: + lst.append('oops') + else: + lst.append(val) + cv.set() + th = threading.Thread(target=wait_for_val) + th.start() + bv.wait(10) + + async.set("fred") + cv.wait(15) + eq_(lst, [True]) + th.join() + + def test_set_before_wait(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + cv = threading.Event() + async.set('fred') + + def wait_for_val(): + val = async.get() + lst.append(val) + cv.set() + th = threading.Thread(target=wait_for_val) + th.start() + cv.wait() + eq_(lst, ['fred']) + th.join() + + def test_set_exc_before_wait(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + cv = threading.Event() + async.set_exception(ImportError) + + def wait_for_val(): + try: + val = async.get() + except ImportError: + lst.append('ooops') + else: + lst.append(val) + cv.set() + th = threading.Thread(target=wait_for_val) + th.start() + cv.wait() + eq_(lst, ['ooops']) + th.join() + + def test_linkage(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + cv = threading.Event() + + lst = [] + + def add_on(): + lst.append(True) + + def wait_for_val(): + async.get() + cv.set() + + th = threading.Thread(target=wait_for_val) + th.start() + + async.rawlink(add_on) + async.set('fred') + assert mock_handler.completion_queue.put.called + async.unlink(add_on) + cv.wait() + eq_(async.value, 'fred') + th.join() + + def test_linkage_not_ready(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + + def add_on(): + lst.append(True) + + async.set('fred') + assert not mock_handler.completion_queue.called + async.rawlink(add_on) + assert mock_handler.completion_queue.put.called + + def test_link_and_unlink(self): + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + + def add_on(): + lst.append(True) + + async.rawlink(add_on) + assert not mock_handler.completion_queue.put.called + async.unlink(add_on) + async.set('fred') + assert not mock_handler.completion_queue.put.called + + def test_captured_exception(self): + from kazoo.handlers.utils import capture_exceptions + + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + @capture_exceptions(async) + def exceptional_function(): + return 1/0 + + exceptional_function() + + assert_raises(ZeroDivisionError, async.get) + + def test_no_capture_exceptions(self): + from kazoo.handlers.utils import capture_exceptions + + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + + def add_on(): + lst.append(True) + + async.rawlink(add_on) + + @capture_exceptions(async) + def regular_function(): + return True + + regular_function() + + assert not mock_handler.completion_queue.put.called + + def test_wraps(self): + from kazoo.handlers.utils import wrap + + mock_handler = mock.Mock() + async = self._makeOne(mock_handler) + + lst = [] + + def add_on(result): + lst.append(result.get()) + + async.rawlink(add_on) + + @wrap(async) + def regular_function(): + return 'hello' + + assert regular_function() == 'hello' + assert mock_handler.completion_queue.put.called + assert async.get() == 'hello' http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/test_watchers.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/test_watchers.py b/slider-agent/src/main/python/kazoo/tests/test_watchers.py new file mode 100644 index 0000000..1c51d60 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/test_watchers.py @@ -0,0 +1,489 @@ +import time +import threading +import uuid + +from nose.tools import eq_ +from nose.tools import raises + +from kazoo.exceptions import KazooException +from kazoo.protocol.states import EventType +from kazoo.testing import KazooTestCase + + +class KazooDataWatcherTests(KazooTestCase): + def setUp(self): + super(KazooDataWatcherTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + self.client.ensure_path(self.path) + + def test_data_watcher(self): + update = threading.Event() + data = [True] + + # Make it a non-existent path + self.path += 'f' + + @self.client.DataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + eq_(data, [None]) + update.clear() + + self.client.create(self.path, b'fred') + update.wait(10) + eq_(data[0], b'fred') + update.clear() + + def test_data_watcher_once(self): + update = threading.Event() + data = [True] + + # Make it a non-existent path + self.path += 'f' + + dwatcher = self.client.DataWatch(self.path) + + @dwatcher + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + eq_(data, [None]) + update.clear() + + @raises(KazooException) + def test_it(): + @dwatcher + def func(d, stat): + data.pop() + test_it() + + def test_data_watcher_with_event(self): + # Test that the data watcher gets passed the event, if it + # accepts three arguments + update = threading.Event() + data = [True] + + # Make it a non-existent path + self.path += 'f' + + @self.client.DataWatch(self.path) + def changed(d, stat, event): + data.pop() + data.append(event) + update.set() + + update.wait(10) + eq_(data, [None]) + update.clear() + + self.client.create(self.path, b'fred') + update.wait(10) + eq_(data[0].type, EventType.CREATED) + update.clear() + + def test_func_style_data_watch(self): + update = threading.Event() + data = [True] + + # Make it a non-existent path + path = self.path + 'f' + + def changed(d, stat): + data.pop() + data.append(d) + update.set() + self.client.DataWatch(path, changed) + + update.wait(10) + eq_(data, [None]) + update.clear() + + self.client.create(path, b'fred') + update.wait(10) + eq_(data[0], b'fred') + update.clear() + + def test_datawatch_across_session_expire(self): + update = threading.Event() + data = [True] + + @self.client.DataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + eq_(data, [b""]) + update.clear() + + self.expire_session() + self.client.retry(self.client.set, self.path, b'fred') + update.wait(25) + eq_(data[0], b'fred') + + def test_func_stops(self): + update = threading.Event() + data = [True] + + self.path += "f" + + fail_through = [] + + @self.client.DataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + if fail_through: + return False + + update.wait(10) + eq_(data, [None]) + update.clear() + + fail_through.append(True) + self.client.create(self.path, b'fred') + update.wait(10) + eq_(data[0], b'fred') + update.clear() + + self.client.set(self.path, b'asdfasdf') + update.wait(0.2) + eq_(data[0], b'fred') + + d, stat = self.client.get(self.path) + eq_(d, b'asdfasdf') + + def test_no_such_node(self): + args = [] + + @self.client.DataWatch("/some/path") + def changed(d, stat): + args.extend([d, stat]) + + eq_(args, [None, None]) + + def test_bad_watch_func2(self): + counter = 0 + + @self.client.DataWatch(self.path) + def changed(d, stat): + if counter > 0: + raise Exception("oops") + + raises(Exception)(changed) + + counter += 1 + self.client.set(self.path, b'asdfasdf') + + def test_watcher_evaluating_to_false(self): + class WeirdWatcher(list): + def __call__(self, *args): + self.called = True + watcher = WeirdWatcher() + self.client.DataWatch(self.path, watcher) + self.client.set(self.path, b'mwahaha') + self.assertTrue(watcher.called) + + def test_watcher_repeat_delete(self): + a = [] + ev = threading.Event() + + self.client.delete(self.path) + + @self.client.DataWatch(self.path) + def changed(val, stat): + a.append(val) + ev.set() + + eq_(a, [None]) + ev.wait(10) + ev.clear() + self.client.create(self.path, b'blah') + ev.wait(10) + eq_(ev.is_set(), True) + ev.clear() + eq_(a, [None, b'blah']) + self.client.delete(self.path) + ev.wait(10) + eq_(ev.is_set(), True) + ev.clear() + eq_(a, [None, b'blah', None]) + self.client.create(self.path, b'blah') + ev.wait(10) + eq_(ev.is_set(), True) + ev.clear() + eq_(a, [None, b'blah', None, b'blah']) + + def test_watcher_with_closing(self): + a = [] + ev = threading.Event() + + self.client.delete(self.path) + + @self.client.DataWatch(self.path) + def changed(val, stat): + a.append(val) + ev.set() + eq_(a, [None]) + + b = False + try: + self.client.stop() + except: + b = True + eq_(b, False) + + +class KazooChildrenWatcherTests(KazooTestCase): + def setUp(self): + super(KazooChildrenWatcherTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + self.client.ensure_path(self.path) + + def test_child_watcher(self): + update = threading.Event() + all_children = ['fred'] + + @self.client.ChildrenWatch(self.path) + def changed(children): + while all_children: + all_children.pop() + all_children.extend(children) + update.set() + + update.wait(10) + eq_(all_children, []) + update.clear() + + self.client.create(self.path + '/' + 'smith') + update.wait(10) + eq_(all_children, ['smith']) + update.clear() + + self.client.create(self.path + '/' + 'george') + update.wait(10) + eq_(sorted(all_children), ['george', 'smith']) + + def test_child_watcher_once(self): + update = threading.Event() + all_children = ['fred'] + + cwatch = self.client.ChildrenWatch(self.path) + + @cwatch + def changed(children): + while all_children: + all_children.pop() + all_children.extend(children) + update.set() + + update.wait(10) + eq_(all_children, []) + update.clear() + + @raises(KazooException) + def test_it(): + @cwatch + def changed_again(children): + update.set() + test_it() + + def test_child_watcher_with_event(self): + update = threading.Event() + events = [True] + + @self.client.ChildrenWatch(self.path, send_event=True) + def changed(children, event): + events.pop() + events.append(event) + update.set() + + update.wait(10) + eq_(events, [None]) + update.clear() + + self.client.create(self.path + '/' + 'smith') + update.wait(10) + eq_(events[0].type, EventType.CHILD) + update.clear() + + def test_func_style_child_watcher(self): + update = threading.Event() + all_children = ['fred'] + + def changed(children): + while all_children: + all_children.pop() + all_children.extend(children) + update.set() + + self.client.ChildrenWatch(self.path, changed) + + update.wait(10) + eq_(all_children, []) + update.clear() + + self.client.create(self.path + '/' + 'smith') + update.wait(10) + eq_(all_children, ['smith']) + update.clear() + + self.client.create(self.path + '/' + 'george') + update.wait(10) + eq_(sorted(all_children), ['george', 'smith']) + + def test_func_stops(self): + update = threading.Event() + all_children = ['fred'] + + fail_through = [] + + @self.client.ChildrenWatch(self.path) + def changed(children): + while all_children: + all_children.pop() + all_children.extend(children) + update.set() + if fail_through: + return False + + update.wait(10) + eq_(all_children, []) + update.clear() + + fail_through.append(True) + self.client.create(self.path + '/' + 'smith') + update.wait(10) + eq_(all_children, ['smith']) + update.clear() + + self.client.create(self.path + '/' + 'george') + update.wait(0.5) + eq_(all_children, ['smith']) + + def test_child_watch_session_loss(self): + update = threading.Event() + all_children = ['fred'] + + @self.client.ChildrenWatch(self.path) + def changed(children): + while all_children: + all_children.pop() + all_children.extend(children) + update.set() + + update.wait(10) + eq_(all_children, []) + update.clear() + + self.client.create(self.path + '/' + 'smith') + update.wait(10) + eq_(all_children, ['smith']) + update.clear() + self.expire_session() + + self.client.retry(self.client.create, + self.path + '/' + 'george') + update.wait(20) + eq_(sorted(all_children), ['george', 'smith']) + + def test_child_stop_on_session_loss(self): + update = threading.Event() + all_children = ['fred'] + + @self.client.ChildrenWatch(self.path, allow_session_lost=False) + def changed(children): + while all_children: + all_children.pop() + all_children.extend(children) + update.set() + + update.wait(10) + eq_(all_children, []) + update.clear() + + self.client.create(self.path + '/' + 'smith') + update.wait(10) + eq_(all_children, ['smith']) + update.clear() + self.expire_session() + + self.client.retry(self.client.create, + self.path + '/' + 'george') + update.wait(4) + eq_(update.is_set(), False) + eq_(all_children, ['smith']) + + children = self.client.get_children(self.path) + eq_(sorted(children), ['george', 'smith']) + + def test_bad_children_watch_func(self): + counter = 0 + + @self.client.ChildrenWatch(self.path) + def changed(children): + if counter > 0: + raise Exception("oops") + + raises(Exception)(changed) + counter += 1 + self.client.create(self.path + '/' + 'smith') + + +class KazooPatientChildrenWatcherTests(KazooTestCase): + def setUp(self): + super(KazooPatientChildrenWatcherTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + + def _makeOne(self, *args, **kwargs): + from kazoo.recipe.watchers import PatientChildrenWatch + return PatientChildrenWatch(*args, **kwargs) + + def test_watch(self): + self.client.ensure_path(self.path) + watcher = self._makeOne(self.client, self.path, 0.1) + result = watcher.start() + children, asy = result.get() + eq_(len(children), 0) + eq_(asy.ready(), False) + + self.client.create(self.path + '/' + 'fred') + asy.get(timeout=1) + eq_(asy.ready(), True) + + def test_exception(self): + from kazoo.exceptions import NoNodeError + watcher = self._makeOne(self.client, self.path, 0.1) + result = watcher.start() + + @raises(NoNodeError) + def testit(): + result.get() + testit() + + def test_watch_iterations(self): + self.client.ensure_path(self.path) + watcher = self._makeOne(self.client, self.path, 0.5) + result = watcher.start() + eq_(result.ready(), False) + + time.sleep(0.08) + self.client.create(self.path + '/' + uuid.uuid4().hex) + eq_(result.ready(), False) + time.sleep(0.08) + eq_(result.ready(), False) + self.client.create(self.path + '/' + uuid.uuid4().hex) + time.sleep(0.08) + eq_(result.ready(), False) + + children, asy = result.get() + eq_(len(children), 2) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/main/python/kazoo/tests/util.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/kazoo/tests/util.py b/slider-agent/src/main/python/kazoo/tests/util.py new file mode 100644 index 0000000..298be71 --- /dev/null +++ b/slider-agent/src/main/python/kazoo/tests/util.py @@ -0,0 +1,126 @@ +############################################################################## +# +# Copyright Zope Foundation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## + +import logging +import os +import time + +TRAVIS = os.environ.get('TRAVIS', False) +TRAVIS_ZK_VERSION = TRAVIS and os.environ.get('ZOOKEEPER_VERSION', None) +if TRAVIS_ZK_VERSION: + TRAVIS_ZK_VERSION = tuple([int(n) for n in TRAVIS_ZK_VERSION.split('.')]) + + +class Handler(logging.Handler): + + def __init__(self, *names, **kw): + logging.Handler.__init__(self) + self.names = names + self.records = [] + self.setLoggerLevel(**kw) + + def setLoggerLevel(self, level=1): + self.level = level + self.oldlevels = {} + + def emit(self, record): + self.records.append(record) + + def clear(self): + del self.records[:] + + def install(self): + for name in self.names: + logger = logging.getLogger(name) + self.oldlevels[name] = logger.level + logger.setLevel(self.level) + logger.addHandler(self) + + def uninstall(self): + for name in self.names: + logger = logging.getLogger(name) + logger.setLevel(self.oldlevels[name]) + logger.removeHandler(self) + + def __str__(self): + return '\n'.join( + [("%s %s\n %s" % + (record.name, record.levelname, + '\n'.join([line + for line in record.getMessage().split('\n') + if line.strip()]) + ) + ) + for record in self.records] + ) + + +class InstalledHandler(Handler): + + def __init__(self, *names, **kw): + Handler.__init__(self, *names, **kw) + self.install() + + +class Wait(object): + + class TimeOutWaitingFor(Exception): + "A test condition timed out" + + timeout = 9 + wait = .01 + + def __init__(self, timeout=None, wait=None, exception=None, + getnow=(lambda: time.time), getsleep=(lambda: time.sleep)): + + if timeout is not None: + self.timeout = timeout + + if wait is not None: + self.wait = wait + + if exception is not None: + self.TimeOutWaitingFor = exception + + self.getnow = getnow + self.getsleep = getsleep + + def __call__(self, func=None, timeout=None, wait=None, message=None): + if func is None: + return lambda func: self(func, timeout, wait, message) + + if func(): + return + + now = self.getnow() + sleep = self.getsleep() + if timeout is None: + timeout = self.timeout + if wait is None: + wait = self.wait + wait = float(wait) + + deadline = now() + timeout + while 1: + sleep(wait) + if func(): + return + if now() > deadline: + raise self.TimeOutWaitingFor( + message or + getattr(func, '__doc__') or + getattr(func, '__name__') + ) + +wait = Wait() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-agent/src/test/python/agent/TestMain.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestMain.py b/slider-agent/src/test/python/agent/TestMain.py index 4ad7a8f..0d1e1de 100644 --- a/slider-agent/src/test/python/agent/TestMain.py +++ b/slider-agent/src/test/python/agent/TestMain.py @@ -260,13 +260,15 @@ class TestMain(unittest.TestCase): self.assertTrue(start_mock.called) class AgentOptions: - def __init__(self, label, host, port, secured_port, verbose, debug): + def __init__(self, label, host, port, secured_port, zk_quorum, zk_reg_path, verbose, debug): self.label = label self.host = host self.port = port self.secured_port = secured_port self.verbose = verbose self.debug = debug + self.zk_quorum = zk_quorum + self.zk_reg_path = zk_reg_path @patch.object(main, "setup_logging") @patch.object(main, "bind_signal_handlers") @@ -292,7 +294,7 @@ class TestMain(unittest.TestCase): Controller_init_mock.return_value = None isAlive_mock.return_value = False parse_args_mock.return_value = ( - TestMain.AgentOptions("agent", "host1", "8080", "8081", True, ""), []) + TestMain.AgentOptions("agent", "host1", "8080", "8081", "host1:2181", "/registry/org-apache-slider/cl1", True, ""), []) tmpdir = tempfile.gettempdir() #testing call without command-line arguments http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java index c85ad91..38f55c2 100644 --- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java +++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java @@ -167,4 +167,9 @@ public interface SliderKeys extends SliderXmlConfKeys { String PASSPHRASE = "DEV"; String PASS_LEN = "50"; String KEYSTORE_LOCATION = "ssl.server.keystore.location"; + + /** + * Python specific + */ + String PYTHONPATH = "PYTHONPATH"; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java index 89b862e..a4a71dd 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.slider.api.ClusterDescription; import org.apache.slider.common.SliderKeys; import org.apache.slider.common.tools.ConfigHelper; +import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.conf.AggregateConf; import org.apache.slider.core.exceptions.BadCommandArgumentsException; @@ -146,6 +147,15 @@ public abstract class AbstractProviderService /** * No-op implementation of this method. + */ + @Override + public void initializeApplicationConfiguration( + AggregateConf instanceDefinition, SliderFileSystem fileSystem) + throws IOException, SliderException { + } + + /** + * No-op implementation of this method. * * {@inheritDoc} */ @@ -350,4 +360,12 @@ public abstract class AbstractProviderService public void addContainerRequest(AMRMClient.ContainerRequest req) { // no-op } + + /** + * No-op implementation of this method. + */ + @Override + public void rebuildContainerDetails(List<Container> liveContainers, + String applicationId, Map<Integer, ProviderRole> providerRoles) { + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java index e717158..0f5b4fb 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java @@ -115,6 +115,17 @@ public interface ProviderService extends ProviderCore, throws BadCommandArgumentsException, IOException; /** + * The application configuration should be initialized here + * + * @param instanceDefinition + * @param fileSystem + * @throws IOException + * @throws SliderException + */ + void initializeApplicationConfiguration(AggregateConf instanceDefinition, + SliderFileSystem fileSystem) throws IOException, SliderException; + + /** * This is a validation of the application configuration on the AM. * Here is where things like the existence of keytabs and other * not-seen-client-side properties can be tested, before @@ -180,4 +191,15 @@ public interface ProviderService extends ProviderCore, * @return the selector to use for choosing containers. */ ContainerReleaseSelector createContainerReleaseSelector(); + + /** + * On AM restart (for whatever reason) this API is required to rebuild the AM + * internal state with the containers which were already assigned and running + * + * @param liveContainers + * @param applicationId + * @param providerRoles + */ + void rebuildContainerDetails(List<Container> liveContainers, + String applicationId, Map<Integer, ProviderRole> providerRoles); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java index 31d09c4..419fa1a 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java @@ -68,6 +68,8 @@ public interface AgentKeys { String ARG_HOST = "--host"; String ARG_PORT = "--port"; String ARG_SECURED_PORT = "--secured_port"; + String ARG_ZOOKEEPER_QUORUM = "--zk-quorum"; + String ARG_ZOOKEEPER_REGISTRY_PATH = "--zk-reg-path"; String ARG_DEBUG = "--debug"; String AGENT_MAIN_SCRIPT_ROOT = "./infra/agent/slider-agent/"; String AGENT_MAIN_SCRIPT = "agent/main.py"; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 419a454..59a6e29 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -19,6 +19,7 @@ package org.apache.slider.providers.agent; import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.utils.ZKPaths; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringUtils; @@ -27,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.slider.api.ClusterDescription; import org.apache.slider.api.ClusterDescriptionKeys; import org.apache.slider.api.ClusterNode; @@ -39,6 +41,7 @@ import org.apache.slider.core.conf.AggregateConf; import org.apache.slider.core.conf.ConfTreeOperations; import org.apache.slider.core.conf.MapOperations; import org.apache.slider.core.exceptions.BadCommandArgumentsException; +import org.apache.slider.core.exceptions.BadConfigException; import org.apache.slider.core.exceptions.SliderException; import org.apache.slider.core.launch.CommandLineBuilder; import org.apache.slider.core.launch.ContainerLauncher; @@ -59,6 +62,8 @@ import org.apache.slider.providers.agent.application.metadata.Metainfo; import org.apache.slider.providers.agent.application.metadata.OSPackage; import org.apache.slider.providers.agent.application.metadata.OSSpecific; import org.apache.slider.server.appmaster.actions.ProviderReportedContainerLoss; +import org.apache.slider.server.appmaster.state.ContainerPriority; +import org.apache.slider.server.appmaster.state.RoleStatus; import org.apache.slider.server.appmaster.state.StateAccessForProviders; import org.apache.slider.server.appmaster.web.rest.agent.AgentCommandType; import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations; @@ -172,21 +177,11 @@ public class AgentProviderService extends AbstractProviderService implements clientProvider.validateInstanceDefinition(instanceDefinition); } - @Override - public void buildContainerLaunchContext(ContainerLauncher launcher, - AggregateConf instanceDefinition, - Container container, - String role, - SliderFileSystem fileSystem, - Path generatedConfPath, - MapOperations resourceComponent, - MapOperations appComponent, - Path containerTmpDirPath) throws - IOException, - SliderException { - - String appDef = instanceDefinition.getAppConfOperations(). - getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF); + // Reads the metainfo.xml in the application package and loads it + private void buildMetainfo(AggregateConf instanceDefinition, + SliderFileSystem fileSystem) throws IOException, SliderException { + String appDef = instanceDefinition.getAppConfOperations() + .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF); if (metainfo == null) { synchronized (syncLock) { @@ -197,15 +192,42 @@ public class AgentProviderService extends AbstractProviderService implements metainfo = getApplicationMetainfo(fileSystem, appDef); if (metainfo == null || metainfo.getApplication() == null) { log.error("metainfo.xml is unavailable or malformed at {}.", appDef); - throw new SliderException("metainfo.xml is required in app package."); + throw new SliderException( + "metainfo.xml is required in app package."); } - - commandOrder = new ComponentCommandOrder(metainfo.getApplication().getCommandOrder()); + commandOrder = new ComponentCommandOrder(metainfo.getApplication() + .getCommandOrder()); monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval()); monitor.start(); } } } + } + + @Override + public void initializeApplicationConfiguration( + AggregateConf instanceDefinition, SliderFileSystem fileSystem) + throws IOException, SliderException { + buildMetainfo(instanceDefinition, fileSystem); + } + + @Override + public void buildContainerLaunchContext(ContainerLauncher launcher, + AggregateConf instanceDefinition, + Container container, + String role, + SliderFileSystem fileSystem, + Path generatedConfPath, + MapOperations resourceComponent, + MapOperations appComponent, + Path containerTmpDirPath) throws + IOException, + SliderException { + + String appDef = instanceDefinition.getAppConfOperations(). + getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF); + + initializeApplicationConfiguration(instanceDefinition, fileSystem); log.info("Build launch context for Agent"); log.debug(instanceDefinition.toString()); @@ -235,6 +257,13 @@ public class AgentProviderService extends AbstractProviderService implements scriptPath = new File(appHome, AgentKeys.AGENT_MAIN_SCRIPT).getPath(); } + // set PYTHONPATH + List<String> pythonPaths = new ArrayList<String>(); + pythonPaths.add(AgentKeys.AGENT_MAIN_SCRIPT_ROOT); + String pythonPath = StringUtils.join(File.pathSeparator, pythonPaths); + launcher.setEnv(PYTHONPATH, pythonPath); + log.info("PYTHONPATH set to {}", pythonPath); + String agentImage = instanceDefinition.getInternalOperations(). get(OptionKeys.INTERNAL_APPLICATION_IMAGE_PATH); if (agentImage != null) { @@ -273,12 +302,10 @@ public class AgentProviderService extends AbstractProviderService implements operation.add(scriptPath); operation.add(ARG_LABEL, label); - operation.add(ARG_HOST); - operation.add(getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME)); - operation.add(ARG_PORT); - operation.add(getClusterInfoPropertyValue(StatusKeys.INFO_AM_AGENT_PORT)); - operation.add(ARG_SECURED_PORT); - operation.add(getClusterInfoPropertyValue(StatusKeys.INFO_AM_SECURED_AGENT_PORT)); + operation.add(ARG_ZOOKEEPER_QUORUM); + operation.add(getClusterOptionPropertyValue(OptionKeys.ZOOKEEPER_QUORUM)); + operation.add(ARG_ZOOKEEPER_REGISTRY_PATH); + operation.add(getZkRegistryPath()); String debugCmd = agentLaunchParameter.getNextLaunchParameter(role); if (debugCmd != null && debugCmd.length() != 0) { @@ -296,6 +323,36 @@ public class AgentProviderService extends AbstractProviderService implements getClusterInfoPropertyValue(OptionKeys.APPLICATION_NAME))); } + // build the zookeeper registry path + private String getZkRegistryPath() { + String zkRegistryRoot = getConfig().get(REGISTRY_PATH, + DEFAULT_REGISTRY_PATH); + String appType = APP_TYPE; + String zkRegistryPath = ZKPaths.makePath(zkRegistryRoot, appType); + String clusterName = getAmState().getInternalsSnapshot().get( + OptionKeys.APPLICATION_NAME); + zkRegistryPath = ZKPaths.makePath(zkRegistryPath, clusterName); + return zkRegistryPath; + } + + @Override + public void rebuildContainerDetails(List<Container> liveContainers, + String applicationId, Map<Integer, ProviderRole> providerRoleMap) { + for (Container container : liveContainers) { + // get the role name and label + Priority priority = container.getPriority(); + String roleName = providerRoleMap.get(priority.getPriority()).name; + String label = getContainerLabel(container, roleName); + log.info("Rebuilding in-memory: container {} in role {} in cluster {}", + container.getId().toString(), roleName, applicationId); + getComponentStatuses() + .put( + label, + new ComponentInstanceState(roleName, container.getId(), + applicationId)); + } + } + /** * Run this service * @@ -330,6 +387,7 @@ public class AgentProviderService extends AbstractProviderService implements */ @Override public RegistrationResponse handleRegistration(Register registration) { + log.info("Handling registration: " + registration); RegistrationResponse response = new RegistrationResponse(); String label = registration.getHostname(); if (getComponentStatuses().containsKey(label)) { @@ -340,6 +398,7 @@ public class AgentProviderService extends AbstractProviderService implements response.setLog("Label not recognized."); log.warn("Received registration request from unknown label {}", label); } + log.info("Registration response: " + response); return response; } @@ -350,12 +409,14 @@ public class AgentProviderService extends AbstractProviderService implements */ @Override public HeartBeatResponse handleHeartBeat(HeartBeat heartBeat) { + log.debug("Handling heartbeat: " + heartBeat); HeartBeatResponse response = new HeartBeatResponse(); long id = heartBeat.getResponseId(); response.setResponseId(id + 1L); String label = heartBeat.getHostname(); String roleName = getRoleName(label); + State agentState = heartBeat.getAgentState(); String containerId = getContainerId(label); StateAccessForProviders accessor = getAmState(); @@ -372,6 +433,7 @@ public class AgentProviderService extends AbstractProviderService implements Boolean isMaster = isMaster(roleName); ComponentInstanceState componentStatus = getComponentStatuses().get(label); + updateComponentStatusWithAgentState(componentStatus, agentState); componentStatus.heartbeat(System.currentTimeMillis()); // If no Master can explicitly publish then publish if its a master // Otherwise, wait till the master that can publish is ready @@ -444,9 +506,17 @@ public class AgentProviderService extends AbstractProviderService implements log.warn("Component instance failed operation.", e); } + log.debug("Heartbeat response: " + response); return response; } + private void updateComponentStatusWithAgentState( + ComponentInstanceState componentStatus, State agentState) { + if (agentState != null) { + componentStatus.setState(agentState); + } + } + @Override public Map<String, String> buildMonitorDetails(ClusterDescription clusterDesc) { Map<String, String> details = super.buildMonitorDetails(clusterDesc); @@ -589,6 +659,14 @@ public class AgentProviderService extends AbstractProviderService implements return description.getInfo(name); } + protected String getClusterOptionPropertyValue(String name) + throws BadConfigException { + StateAccessForProviders accessor = getAmState(); + assert accessor.isApplicationLive(); + ClusterDescription description = accessor.getClusterStatus(); + return description.getMandatoryOption(name); + } + /** * Lost heartbeat from the container - release it and ask for a replacement * (async operation) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 52f7f8f..43cda93 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -108,6 +108,7 @@ import org.apache.slider.server.appmaster.rpc.SliderClusterProtocolPBImpl; import org.apache.slider.server.appmaster.operations.AbstractRMOperation; import org.apache.slider.server.appmaster.state.AppState; import org.apache.slider.server.appmaster.state.ContainerAssignment; +import org.apache.slider.server.appmaster.state.ContainerPriority; import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation; import org.apache.slider.server.appmaster.state.ProviderAppState; import org.apache.slider.server.appmaster.operations.RMOperationHandler; @@ -708,6 +709,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService Configuration providerConf = providerService.loadProviderConfigurationInformation(confDir); + providerService + .initializeApplicationConfiguration(instanceDefinition, fs); + providerService.validateApplicationConfiguration(instanceDefinition, confDir, securityEnabled); @@ -726,6 +730,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService appInformation, new SimpleReleaseSelector()); + providerService.rebuildContainerDetails(liveContainers, + instanceDefinition.getName(), appState.getRolePriorityMap()); + // add the AM to the list of nodes in the cluster appState.buildAppMasterNode(appMasterContainerID, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 667fe99..cade115 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -152,6 +152,9 @@ public class AppState { private final Map<String, ProviderRole> roles = new ConcurrentHashMap<>(); + private final Map<Integer, ProviderRole> rolePriorityMap = + new ConcurrentHashMap<>(); + /** * The master node. */ @@ -321,6 +324,10 @@ public class AppState { return roles; } + public Map<Integer, ProviderRole> getRolePriorityMap() { + return rolePriorityMap; + } + private Map<ContainerId, RoleInstance> getStartingNodes() { return startingNodes; } @@ -711,6 +718,7 @@ public class AppState { roleStatusMap.put(priority, new RoleStatus(providerRole)); roles.put(providerRole.name, providerRole); + rolePriorityMap.put(priority, providerRole); } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/58014d80/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java index d3388f5..62df18d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeat.java @@ -18,6 +18,7 @@ package org.apache.slider.server.appmaster.web.rest.agent; +import org.apache.slider.providers.agent.State; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -44,6 +45,7 @@ public class HeartBeat { HostStatus nodeStatus; private AgentEnv agentEnv = null; private String fqdn; + private State agentState; public long getResponseId() { return responseId; @@ -123,6 +125,14 @@ public class HeartBeat { this.mounts = mounts; } + public State getAgentState() { + return agentState; + } + + public void setAgentState(State agentState) { + this.agentState = agentState; + } + @Override public String toString() { return "HeartBeat{" + @@ -132,6 +142,7 @@ public class HeartBeat { ", reports=" + reports + ", componentStatus=" + componentStatus + ", nodeStatus=" + nodeStatus + + ", agentState=" + agentState + '}'; } }