[ https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83283&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83283 ]
ASF GitHub Bot logged work on BEAM-3861:
----------------------------------------

Author: ASF GitHub Bot
Created on: 22/Mar/18 18:37
Start Date: 22/Mar/18 18:37
Worklog Time Spent: 10m
Work Description: markflyhigh commented on a change in pull request #4930: [BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176530410

##########
File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
##########
@@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))

     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)

     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))

+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()

Review comment:
I see. A custom matcher is probably one approach for this. With a matcher like `PubSubMessageMatcher`, verification blocks until the expected messages are pulled (or a timeout expires), so the wait logic inside `on_success_matcher` gives the pipeline time to process the data. However, if no `on_success_matcher` is provided, `cancel()` will be called immediately after setup. Do we want a general waiting step here as part of every streaming test?
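The waiting strategy discussed in the review comment can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the Beam implementation: `wait_for_messages`, `verify_then_cancel`, `wait_secs`, and `FakeResult` are hypothetical names, and the `pull` callable stands in for a real Pub/Sub pull. It shows a verifier that blocks until the expected messages arrive (or a timeout expires), followed by a cancel that runs even if verification fails, plus an optional fixed wait for the case where no matcher is supplied.

```python
import time


def wait_for_messages(pull, expected_count, timeout_secs=60.0,
                      poll_interval_secs=0.5):
    """Poll `pull` until `expected_count` messages arrive or time runs out.

    Hypothetical helper: `pull` is any zero-argument callable returning a
    (possibly empty) list of new messages, standing in for a Pub/Sub pull.
    Returns whatever was collected, which may be fewer than expected if
    the timeout expires first.
    """
    collected = []
    deadline = time.monotonic() + timeout_secs
    while len(collected) < expected_count and time.monotonic() < deadline:
        collected.extend(pull())
        if len(collected) < expected_count:
            time.sleep(poll_interval_secs)
    return collected


def verify_then_cancel(result, verify=None, wait_secs=0.0):
    """Run a (possibly blocking) verifier, then always cancel the job.

    Hypothetical helper: `result` only needs a `cancel()` method. When no
    verifier is given, sleep `wait_secs` so the pipeline has some time to
    process data before it is cancelled (the general waiting step asked
    about in the review comment).
    """
    try:
        if verify is not None:
            verify(result)
        elif wait_secs > 0:
            time.sleep(wait_secs)
    finally:
        # Cancel even if verification raised, so a failed check does not
        # leave the streaming job running.
        result.cancel()


class FakeResult:
    """Hypothetical stand-in for a pipeline result; records cancel() calls."""

    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True


# Usage: messages trickle in over three pulls.
batches = [[], ['a'], ['b', 'c']]
messages = wait_for_messages(lambda: batches.pop(0) if batches else [],
                             expected_count=3, timeout_secs=5.0,
                             poll_interval_secs=0.01)
result = FakeResult()
verify_then_cancel(result, verify=lambda r: None)
```

Putting `cancel()` in a `finally` clause is one way to guarantee cleanup of streaming jobs regardless of whether the matcher passes, fails, or times out.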
Issue Time Tracking
-------------------

Worklog Id: (was: 83283)
Time Spent: 6h (was: 5h 50m)

> Build test infra for end-to-end streaming test in Python SDK
> ------------------------------------------------------------
>
> Key: BEAM-3861
> URL: https://issues.apache.org/jira/browse/BEAM-3861
> Project: Beam
> Issue Type: Task
> Components: testing
> Reporter: Mark Liu
> Assignee: Mark Liu
> Priority: Major
> Time Spent: 6h
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)