Repository: spark
Updated Branches:
  refs/heads/master 7b5d873ae -> 376d78216


[SPARK-19986][TESTS] Make pyspark.streaming.tests.CheckpointTests more stable

## What changes were proposed in this pull request?

Sometimes CheckpointTests hangs on a busy machine because the streaming jobs 
are too slow and cannot catch up. Locally, I observed the scheduling delay 
increasing steadily for dozens of seconds.

This PR increases the batch interval from 0.5 seconds to 2 seconds to generate 
fewer Spark jobs, which should make `pyspark.streaming.tests.CheckpointTests` 
more stable. It also replaces `sleep` with `awaitTerminationOrTimeout` so that 
if the streaming job fails, the test fails as well.
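The fix swaps blind `time.sleep` polling for `awaitTerminationOrTimeout`, which blocks up to the timeout and returns `True` once the context has stopped, letting the test surface a job failure instead of hanging. A minimal sketch of that pattern, using a hypothetical stand-in for `StreamingContext` (the `FakeStreamingContext`, `stop_after`, `wait_for_output`, and `has_output` names are illustrative, not Spark API):

```python
import time


class FakeStreamingContext:
    """Hypothetical stand-in mimicking StreamingContext's
    awaitTerminationOrTimeout: blocks up to `timeout` seconds and
    returns True once the context has terminated, False otherwise."""

    def __init__(self, stop_after):
        # Pretend the streaming job dies `stop_after` seconds from now.
        self._deadline = time.time() + stop_after

    def awaitTerminationOrTimeout(self, timeout):
        remaining = self._deadline - time.time()
        if remaining <= 0:
            return True  # context already terminated
        time.sleep(min(timeout, remaining))
        return time.time() >= self._deadline


def wait_for_output(ssc, has_output, max_polls=100):
    """Poll for test output, but fail fast if the context stops,
    mirroring the loop structure in the patched CheckpointTests."""
    for _ in range(max_polls):
        if has_output():
            return True
        # Unlike time.sleep, this notices a dead streaming job.
        if ssc.awaitTerminationOrTimeout(0.05):
            raise Exception("ssc stopped")
    return False
```

With a plain `time.sleep` in the loop, a failed job would leave the test spinning until the suite timeout; here the failure propagates as an exception on the next poll.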

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #17323 from zsxwing/SPARK-19986.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/376d7821
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/376d7821
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/376d7821

Branch: refs/heads/master
Commit: 376d782164437573880f0ad58cecae1cb5f212f2
Parents: 7b5d873
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Mar 17 11:12:23 2017 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Mar 17 11:12:23 2017 -0700

----------------------------------------------------------------------
 python/pyspark/streaming/tests.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/376d7821/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 2e8ed69..1bec335 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -903,11 +903,11 @@ class CheckpointTests(unittest.TestCase):
         def setup():
             conf = SparkConf().set("spark.default.parallelism", 1)
             sc = SparkContext(conf=conf)
-            ssc = StreamingContext(sc, 0.5)
+            ssc = StreamingContext(sc, 2)
             dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1))
             wc = dstream.updateStateByKey(updater)
             wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
-            wc.checkpoint(.5)
+            wc.checkpoint(2)
             self.setupCalled = True
             return ssc
 
@@ -921,21 +921,22 @@ class CheckpointTests(unittest.TestCase):
 
         def check_output(n):
             while not os.listdir(outputd):
-                time.sleep(0.01)
+                if self.ssc.awaitTerminationOrTimeout(0.5):
+                    raise Exception("ssc stopped")
             time.sleep(1)  # make sure mtime is larger than the previous one
             with open(os.path.join(inputd, str(n)), 'w') as f:
                 f.writelines(["%d\n" % i for i in range(10)])
 
             while True:
+                if self.ssc.awaitTerminationOrTimeout(0.5):
+                    raise Exception("ssc stopped")
                 p = os.path.join(outputd, max(os.listdir(outputd)))
                 if '_SUCCESS' not in os.listdir(p):
                     # not finished
-                    time.sleep(0.01)
                     continue
                 ordd = self.ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
                 d = ordd.values().map(int).collect()
                 if not d:
-                    time.sleep(0.01)
                     continue
                 self.assertEqual(10, len(d))
                 s = set(d)

