[ 
https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121726#comment-17121726
 ] 

Kenneth Knowles commented on BEAM-9118:
---------------------------------------

This issue is assigned but has not received an update in 30 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
>  is flaky
> ------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9118
>                 URL: https://issues.apache.org/jira/browse/BEAM-9118
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Robert Bradshaw
>            Priority: P2
>              Labels: stale-assigned
>
> Sample errors:
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1373
> {noformat}
> 14:30:12  self = 
> <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
>  testMethod=test_pardo_unfusable_side_inputs>
> 14:30:12  
> 14:30:12      def test_pardo_unfusable_side_inputs(self):
> 14:30:12        def cross_product(elem, sides):
> 14:30:12          for side in sides:
> 14:30:12            yield elem, side
> 14:30:12        with self.create_pipeline() as p:
> 14:30:12          pcoll = p | beam.Create(['a', 'b'])
> 14:30:12          assert_that(
> 14:30:12              pcoll | beam.FlatMap(cross_product, 
> beam.pvalue.AsList(pcoll)),
> 14:30:12              equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 
> 'b')]))
> 14:30:12      
> 14:30:12        with self.create_pipeline() as p:
> 14:30:12          pcoll = p | beam.Create(['a', 'b'])
> 14:30:12          derived = ((pcoll,) | beam.Flatten()
> 14:30:12                     | beam.Map(lambda x: (x, x))
> 14:30:12                     | beam.GroupByKey()
> 14:30:12                     | 'Unkey' >> beam.Map(lambda kv: kv[0]))
> 14:30:12          assert_that(
> 14:30:12              pcoll | beam.FlatMap(cross_product, 
> beam.pvalue.AsList(derived)),
> 14:30:12  >           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 
> 'b')]))
> 14:30:12  
> 14:30:12  apache_beam/runners/portability/fn_api_runner_test.py:258: 
> 14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ 
> 14:30:12  apache_beam/pipeline.py:481: in __exit__
> 14:30:12      self.run().wait_until_finish()
> 14:30:12  apache_beam/runners/portability/portable_runner.py:445: in 
> wait_until_finish
> 14:30:12      for state_response in self._state_stream:
> 14:30:12  
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:416:
>  in __next__
> 14:30:12      return self._next()
> 14:30:12  
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:694:
>  in _next
> 14:30:12      _common.wait(self._state.condition.wait, _response_ready)
> 14:30:12  
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:140:
>  in wait
> 14:30:12      _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
> 14:30:12  
> target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:105:
>  in _wait_once
> 14:30:12      wait_fn(timeout=timeout)
> 14:30:12  /usr/lib/python3.6/threading.py:299: in wait
> 14:30:12      gotit = waiter.acquire(True, timeout)
> 14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ 
> 14:30:12  
> 14:30:12  signum = 14, frame = <frame object at 0x31daf68>
> 14:30:12  
> 14:30:12      def handler(signum, frame):
> 14:30:12        msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
> 14:30:12        print('=' * 20, msg, '=' * 20)
> 14:30:12        traceback.print_stack(frame)
> 14:30:12        threads_by_id = {th.ident: th for th in threading.enumerate()}
> 14:30:12        for thread_id, stack in sys._current_frames().items():
> 14:30:12          th = threads_by_id.get(thread_id)
> 14:30:12          print()
> 14:30:12          print('# Thread:', th or thread_id)
> 14:30:12          traceback.print_stack(stack)
> 14:30:12  >     raise BaseException(msg)
> 14:30:12  E     BaseException: Timed out after 60 seconds.
> 14:30:12  
> 14:30:12  apache_beam/runners/portability/portable_runner_test.py:77: 
> BaseException
> {noformat}
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1366/
> {noformat}
> 09:06:01  self = 
> <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocessesAndMultiWorkers
>  testMethod=test_assert_that>
> 09:06:01  
> 09:06:01      def test_assert_that(self):
> 09:06:01        # TODO: figure out a way for fn_api_runner to parse and raise 
> the
> 09:06:01        # underlying exception.
> 09:06:01        with self.assertRaisesRegex(Exception, 'Failed assert'):
> 09:06:01          with self.create_pipeline() as p:
> 09:06:01  >         assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
> 09:06:01  E         AssertionError: "Failed assert" does not match "Pipeline 
> timed out waiting for job service subprocess."
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to