This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f80a96e  [BEAM-4093] Support Python ValidatesRunner test in streaming (#5147)
f80a96e is described below

commit f80a96e5f94c6227226305e28d56912d2c92289d
Author: Mark Liu <markflyh...@users.noreply.github.com>
AuthorDate: Thu Apr 19 18:10:36 2018 -0700

    [BEAM-4093] Support Python ValidatesRunner test in streaming (#5147)
    
    * [BEAM-4093] Support Python ValidatesRunner test in streaming
    
    * fixit! Remove unnecessary option reset
---
 .../apache_beam/examples/streaming_wordcount_it_test.py     |  2 ++
 sdks/python/apache_beam/options/pipeline_options.py         |  7 +++++++
 .../apache_beam/runners/dataflow/test_dataflow_runner.py    | 13 ++++++++-----
 sdks/python/apache_beam/testing/test_pipeline.py            |  3 ++-
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index d0b53f5..5db1878 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -37,6 +37,7 @@ INPUT_SUB = 'wc_subscription_input'
 OUTPUT_SUB = 'wc_subscription_output'
 
 DEFAULT_INPUT_NUMBERS = 500
+WAIT_UNTIL_FINISH_DURATION = 3 * 60 * 1000   # in milliseconds
 
 
 class StreamingWordCountIT(unittest.TestCase):
@@ -87,6 +88,7 @@ class StreamingWordCountIT(unittest.TestCase):
                                                timeout=400)
     extra_opts = {'input_subscription': self.input_sub.full_name,
                   'output_topic': self.output_topic.full_name,
+                  'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}
 
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 7a2cd4b..b5f9d77 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -649,6 +649,13 @@ class TestOptions(PipelineOptions):
         default=False,
         help=('Used in unit testing runners without submitting the '
               'actual job.'))
+    parser.add_argument(
+        '--wait_until_finish_duration',
+        default=None,
+        type=int,
+        help='The time to wait (in milliseconds) for test pipeline to finish. '
+             'If it is set to None, it will wait indefinitely until the job '
+             'is finished.')
 
   def validate(self, validator):
     errors = []
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 765ed24..eedfa60 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,6 +18,7 @@
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
 from __future__ import print_function
 
+import logging
 import time
 
 from apache_beam.internal import pickler
@@ -37,6 +38,8 @@ class TestDataflowRunner(DataflowRunner):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming
 
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
@@ -49,10 +52,11 @@ class TestDataflowRunner(DataflowRunner):
       print('Found: %s.' % self.build_console_url(pipeline.options))
 
     try:
-      if not options.view_as(StandardOptions).streaming:
-        self.result.wait_until_finish()
-      else:
-        self.wait_until_in_state(PipelineState.RUNNING)
+      self.wait_until_in_state(PipelineState.RUNNING)
+
+      if is_streaming and not wait_duration:
+        logging.warning('Waiting indefinitely for streaming job.')
+      self.result.wait_until_finish(duration=wait_duration)
 
       if on_success_matcher:
         from hamcrest import assert_that as hc_assert_that
@@ -60,7 +64,6 @@ class TestDataflowRunner(DataflowRunner):
     finally:
       if not self.result.is_in_terminal_state():
         self.result.cancel()
-      if options.view_as(StandardOptions).streaming:
         self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)
 
     return self.result
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 155190c..0525945 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -102,7 +102,8 @@ class TestPipeline(Pipeline):
     result = super(TestPipeline, self).run(test_runner_api)
     if self.blocking:
       state = result.wait_until_finish()
-      assert state == PipelineState.DONE, "Pipeline execution failed."
+      assert state in (PipelineState.DONE, PipelineState.CANCELLED), \
+          "Pipeline execution failed."
 
     return result
 

-- 
To stop receiving notification emails like this one, please contact
al...@apache.org.

Reply via email to