Index: python3/test_futures.py
===================================================================
--- python3/test_futures.py	(revision 55)
+++ python3/test_futures.py	(working copy)
@@ -180,6 +180,12 @@
             p.join()
 
 class WaitTests(unittest.TestCase):
+    def setUp(self):
+        self.executor = self.executor_constructor(1)
+
+    def tearDown(self):
+        self.executor.shutdown(wait=True)
+
     def test_first_completed(self):
         def wait_test():
             while not future1._waiters:
@@ -406,21 +412,53 @@
 
 
 class ThreadPoolWaitTests(WaitTests):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_threads=1)
+    executor_constructor = futures.ThreadPoolExecutor
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+    # In a 1-worker pool, inside a task running on the pool, queueing
+    # a task on the same pool and waiting on it should not deadlock.
+    # The naive implementation does.
+    #
+    # This problem doesn't exist on a ProcessPoolExecutor because
+    # users can't create cross-process cycles.
+    def test_self_deadlock(self):
+        def do_nothing():
+            return 42
+        def submit_and_wait(executor):
+            nested_future = executor.submit(do_nothing)
+            return nested_future.result(timeout=1)
+        outer_future = self.executor.submit(submit_and_wait, self.executor)
+        self.assertEquals(outer_future.result(timeout=1), 42)
 
+    # The self-deadlock is also possible in any cycle of executors
+    # waiting on each other.
+    def test_cycle_deadlock(self):
+        def do_nothing():
+            return 42
+        def submit1_and_wait(executor1, executor2):
+            nested_future = executor1.submit(do_nothing)
+            return nested_future.result(timeout=1)
+        def submit2_and_wait(executor1, executor2):
+            nested_future = executor2.submit(submit1_and_wait,
+                                             executor1, executor2)
+            return nested_future.result(timeout=1)
+
+        executor1 = self.executor_constructor(1)
+        executor2 = self.executor_constructor(1)
+        outer_future = executor1.submit(submit2_and_wait,
+                                        executor1, executor2)
+        self.assertEquals(outer_future.result(timeout=1), 42)
+
 class ProcessPoolWaitTests(WaitTests):
+    executor_constructor = futures.ProcessPoolExecutor
+
+class AsCompletedTests(unittest.TestCase):
+    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
     def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_processes=1)
+        self.executor = self.executor_constructor(1)
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
 
-class AsCompletedTests(unittest.TestCase):
-    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
     def test_no_timeout(self):
         def wait_test():
             while not future1._waiters:
@@ -476,22 +514,20 @@
             call1.set_can()
 
 class ThreadPoolAsCompletedTests(AsCompletedTests):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_threads=1)
+    executor_constructor = futures.ThreadPoolExecutor
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
+class ProcessPoolAsCompletedTests(AsCompletedTests):
+    executor_constructor = futures.ProcessPoolExecutor
 
-class ProcessPoolAsCompletedTests(AsCompletedTests):
+class ExecutorTest(unittest.TestCase):
+    # Executor.shutdown() and context manager usage is tested by
+    # ExecutorShutdownTest.
     def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_processes=1)
+        self.executor = self.executor_constructor(1)
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
 
-class ExecutorTest(unittest.TestCase):
-    # Executor.shutdown() and context manager usage is tested by
-    # ExecutorShutdownTest.
     def test_submit(self):
         future = self.executor.submit(pow, 2, 8)
         self.assertEquals(256, future.result())
@@ -523,19 +559,11 @@
         self.assertEquals([None, None], results)
 
 class ThreadPoolExecutorTest(ExecutorTest):
-    def setUp(self):
-        self.executor = futures.ThreadPoolExecutor(max_threads=1)
+    executor_constructor = futures.ThreadPoolExecutor
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
-
 class ProcessPoolExecutorTest(ExecutorTest):
-    def setUp(self):
-        self.executor = futures.ProcessPoolExecutor(max_processes=1)
+    executor_constructor = futures.ProcessPoolExecutor
 
-    def tearDown(self):
-        self.executor.shutdown(wait=True)
-
 class FutureTests(unittest.TestCase):
     def test_repr(self):
         self.assertRegexpMatches(repr(PENDING_FUTURE),
