This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new f80a96e [BEAM-4093] Support Python ValidatesRunner test in streaming (#5147)
f80a96e is described below

commit f80a96e5f94c6227226305e28d56912d2c92289d
Author: Mark Liu <markflyh...@users.noreply.github.com>
AuthorDate: Thu Apr 19 18:10:36 2018 -0700

    [BEAM-4093] Support Python ValidatesRunner test in streaming (#5147)

    * [BEAM-4093] Support Python ValidatesRunner test in streaming

    * fixit! Remove unnecessary option reset
---
 .../apache_beam/examples/streaming_wordcount_it_test.py  |  2 ++
 sdks/python/apache_beam/options/pipeline_options.py      |  7 +++++++
 .../apache_beam/runners/dataflow/test_dataflow_runner.py | 13 ++++++++-----
 sdks/python/apache_beam/testing/test_pipeline.py         |  3 ++-
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index d0b53f5..5db1878 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -37,6 +37,7 @@ INPUT_SUB = 'wc_subscription_input'
 OUTPUT_SUB = 'wc_subscription_output'

 DEFAULT_INPUT_NUMBERS = 500
+WAIT_UNTIL_FINISH_DURATION = 3 * 60 * 1000  # in milliseconds


 class StreamingWordCountIT(unittest.TestCase):
@@ -87,6 +88,7 @@ class StreamingWordCountIT(unittest.TestCase):
                                                timeout=400)
     extra_opts = {'input_subscription': self.input_sub.full_name,
                   'output_topic': self.output_topic.full_name,
+                  'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
                   'on_success_matcher': all_of(state_verifier,
                                                pubsub_msg_verifier)}

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 7a2cd4b..b5f9d77 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -649,6 +649,13 @@ class TestOptions(PipelineOptions):
         default=False,
         help=('Used in unit testing runners without submitting the '
               'actual job.'))
+    parser.add_argument(
+        '--wait_until_finish_duration',
+        default=None,
+        type=int,
+        help='The time to wait (in milliseconds) for test pipeline to finish. '
+             'If it is set to None, it will wait indefinitely until the job '
+             'is finished.')

   def validate(self, validator):
     errors = []
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 765ed24..eedfa60 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -18,6 +18,7 @@
 """Wrapper of Beam runners that's built for running and verifying e2e tests."""
 from __future__ import print_function

+import logging
 import time

 from apache_beam.internal import pickler
@@ -37,6 +38,8 @@ class TestDataflowRunner(DataflowRunner):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
+    wait_duration = options.wait_until_finish_duration
+    is_streaming = options.view_as(StandardOptions).streaming

     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
@@ -49,10 +52,11 @@ class TestDataflowRunner(DataflowRunner):
       print('Found: %s.' % self.build_console_url(pipeline.options))

     try:
-      if not options.view_as(StandardOptions).streaming:
-        self.result.wait_until_finish()
-      else:
-        self.wait_until_in_state(PipelineState.RUNNING)
+      self.wait_until_in_state(PipelineState.RUNNING)
+
+      if is_streaming and not wait_duration:
+        logging.warning('Waiting indefinitely for streaming job.')
+      self.result.wait_until_finish(duration=wait_duration)

       if on_success_matcher:
         from hamcrest import assert_that as hc_assert_that
@@ -60,7 +64,6 @@ class TestDataflowRunner(DataflowRunner):
     finally:
       if not self.result.is_in_terminal_state():
         self.result.cancel()
-      if options.view_as(StandardOptions).streaming:
         self.wait_until_in_state(PipelineState.CANCELLED, timeout=300)

     return self.result
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 155190c..0525945 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -102,7 +102,8 @@ class TestPipeline(Pipeline):
     result = super(TestPipeline, self).run(test_runner_api)
     if self.blocking:
       state = result.wait_until_finish()
-      assert state == PipelineState.DONE, "Pipeline execution failed."
+      assert state in (PipelineState.DONE, PipelineState.CANCELLED), \
+          "Pipeline execution failed."

     return result

--
To stop receiving notification emails like this one, please contact
al...@apache.org.