[ 
https://issues.apache.org/jira/browse/BEAM-3861?focusedWorklogId=83283&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83283
 ]

ASF GitHub Bot logged work on BEAM-3861:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Mar/18 18:37
            Start Date: 22/Mar/18 18:37
    Worklog Time Spent: 10m 
      Work Description: markflyhigh commented on a change in pull request #4930:
[BEAM-3861] Complete streaming wordcount test in Python SDK
URL: https://github.com/apache/beam/pull/4930#discussion_r176530410
 
 

 ##########
 File path: sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
 ##########
 @@ -44,34 +44,41 @@ def run_pipeline(self, pipeline):
 
     self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
-      project = pipeline._options.view_as(GoogleCloudOptions).project
-      region_id = pipeline._options.view_as(GoogleCloudOptions).region
-      job_id = self.result.job_id()
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
-      print (
-          'Found: https://console.cloud.google.com/dataflow/jobsDetail'
-          '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
+      print('Found: %s.' % self.build_console_url(pipeline.options))
 
     if not options.view_as(StandardOptions).streaming:
       self.result.wait_until_finish()
     else:
-      # TODO: Ideally, we want to wait until workers start successfully.
-      self.wait_until_running()
+      self.wait_until_in_state(PipelineState.RUNNING)
 
     if on_success_matcher:
       from hamcrest import assert_that as hc_assert_that
       hc_assert_that(self.result, pickler.loads(on_success_matcher))
 
+    if options.view_as(StandardOptions).streaming:
+      self.result.cancel()
 
 Review comment:
   I see. A customized matcher is probably one approach for this. As with 
`PubSubMessageMatcher`, the verification blocks until messages are pulled (or 
a timeout is reached), so the wait logic inside `on_success_matcher` can give 
the pipeline time to process the data. However, if no `on_success_matcher` is 
provided, cancel will be called immediately after setup.
   
   Do we want a waiting step here as a general step of the test?
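
   The blocking behaviour described above could look roughly like the sketch 
below. This is only an illustration under assumptions, not the actual 
`PubSubMessageMatcher`: the class name, the `pull_fn` callable and the 
parameters are all hypothetical, and the class is a plain object so it runs 
without dependencies. A real matcher would presumably subclass hamcrest's 
`BaseMatcher` and put this loop in `_matches`.

```python
import time


class BlockingCountMatcher(object):
  """Hypothetical matcher-style check: poll until enough messages arrive."""

  def __init__(self, pull_fn, expected_count, timeout=5.0, poll_interval=0.1):
    # pull_fn is an assumed callable that returns a list of newly
    # available messages each time it is called (possibly empty).
    self.pull_fn = pull_fn
    self.expected_count = expected_count
    self.timeout = timeout
    self.poll_interval = poll_interval
    self.messages = []

  def matches(self, unused_pipeline_result):
    # Block here so the streaming pipeline has time to process data
    # before the caller moves on (for example, to cancel the job).
    deadline = time.time() + self.timeout
    while time.time() < deadline:
      self.messages.extend(self.pull_fn())
      if len(self.messages) >= self.expected_count:
        return True
      time.sleep(self.poll_interval)
    # Timed out without seeing the expected number of messages.
    return False
```

   With something like this as the `on_success_matcher`, the cancel call 
would only run after the matcher returns. Without a matcher there is no 
equivalent pause, which is the case the question above is about.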

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 83283)
    Time Spent: 6h  (was: 5h 50m)

> Build test infra for end-to-end streaming test in Python SDK
> ------------------------------------------------------------
>
>                 Key: BEAM-3861
>                 URL: https://issues.apache.org/jira/browse/BEAM-3861
>             Project: Beam
>          Issue Type: Task
>          Components: testing
>            Reporter: Mark Liu
>            Assignee: Mark Liu
>            Priority: Major
>          Time Spent: 6h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
