[ https://issues.apache.org/jira/browse/BEAM-9767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104733#comment-17104733 ]
Sam Rohde commented on BEAM-9767: --------------------------------- Thanks for the update Brian, I have root caused it and have a PR out now. Seems to be an edge case around the StreamingCache. > test_streaming_wordcount flaky timeouts > --------------------------------------- > > Key: BEAM-9767 > URL: https://issues.apache.org/jira/browse/BEAM-9767 > Project: Beam > Issue Type: Bug > Components: sdk-py-core, test-failures > Reporter: Udi Meiri > Assignee: Sam Rohde > Priority: Critical > Time Spent: 2h 40m > Remaining Estimate: 0h > > Timed out after 600s, typically completes in 2.8s on my workstation. > https://builds.apache.org/job/beam_PreCommit_Python_Commit/12376/ > {code} > self = > <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest > testMethod=test_streaming_wordcount> > @unittest.skipIf( > sys.version_info < (3, 5, 3), > 'The tests require at least Python 3.6 to work.') > def test_streaming_wordcount(self): > class WordExtractingDoFn(beam.DoFn): > def process(self, element): > text_line = element.strip() > words = text_line.split() > return words > > # Add the TestStream so that it can be cached. > ib.options.capturable_sources.add(TestStream) > ib.options.capture_duration = timedelta(seconds=5) > > p = beam.Pipeline( > runner=interactive_runner.InteractiveRunner(), > options=StandardOptions(streaming=True)) > > data = ( > p > | TestStream() > .advance_watermark_to(0) > .advance_processing_time(1) > .add_elements(['to', 'be', 'or', 'not', 'to', 'be']) > .advance_watermark_to(20) > .advance_processing_time(1) > .add_elements(['that', 'is', 'the', 'question']) > | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable > > counts = ( > data > | 'split' >> beam.ParDo(WordExtractingDoFn()) > | 'pair_with_one' >> beam.Map(lambda x: (x, 1)) > | 'group' >> beam.GroupByKey() > | 'count' >> beam.Map(lambda wordones: (wordones[0], > sum(wordones[1])))) > > # Watch the local scope for Interactive Beam so that referenced > PCollections > # will be cached. > ib.watch(locals()) > > # This is normally done in the interactive_utils when a transform is > # applied but needs an IPython environment. So we manually run this > here. > ie.current_env().track_user_pipelines() > > # Create a fake limiter that cancels the BCJ once the main job receives > the > # expected amount of results. > class FakeLimiter: > def __init__(self, p, pcoll): > self.p = p > self.pcoll = pcoll > > def is_triggered(self): > result = ie.current_env().pipeline_result(self.p) > if result: > try: > results = result.get(self.pcoll) > except ValueError: > return False > return len(results) >= 10 > return False > > # This sets the limiters to stop reading when the test receives 10 > elements > # or after 5 seconds have elapsed (to eliminate the possibility of > hanging). > ie.current_env().options.capture_control.set_limiters_for_test( > [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))]) > > # This tests that the data was correctly cached. > pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) > expected_data_df = pd.DataFrame([ > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('or', 0, [IntervalWindow(0, 10)], pane_info), > ('not', 0, [IntervalWindow(0, 10)], pane_info), > ('to', 0, [IntervalWindow(0, 10)], pane_info), > ('be', 0, [IntervalWindow(0, 10)], pane_info), > ('that', 20000000, [IntervalWindow(20, 30)], pane_info), > ('is', 20000000, [IntervalWindow(20, 30)], pane_info), > ('the', 20000000, [IntervalWindow(20, 30)], pane_info), > ('question', 20000000, [IntervalWindow(20, 30)], pane_info) > ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable > > > data_df = ib.collect(data, include_window_info=True) > apache_beam/runners/interactive/interactive_runner_test.py:237: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > apache_beam/runners/interactive/interactive_beam.py:451: in collect > return head(pcoll, n=-1, include_window_info=include_window_info) > apache_beam/runners/interactive/utils.py:204: in run_within_progress_indicator > return func(*args, **kwargs) > apache_beam/runners/interactive/interactive_beam.py:515: in head > result.wait_until_finish() > apache_beam/runners/interactive/interactive_runner.py:250: in > wait_until_finish > self._underlying_result.wait_until_finish() > apache_beam/runners/direct/direct_runner.py:455: in wait_until_finish > self._executor.await_completion() > apache_beam/runners/direct/executor.py:439: in await_completion > self._executor.await_completion() > apache_beam/runners/direct/executor.py:484: in await_completion > update = self.visible_updates.take() > apache_beam/runners/direct/executor.py:557: in take > item = self._queue.get(timeout=1) > /usr/lib/python3.6/queue.py:173: in get > self.not_empty.wait(remaining) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > self = <Condition(<unlocked _thread.lock object at 0x7f2f85244198>, 0)> > timeout = 0.9999979329295456 > def wait(self, timeout=None): > """Wait until notified or until a timeout occurs. > > If the calling thread has not acquired the lock when this method is > called, a RuntimeError is raised. > > This method releases the underlying lock, and then blocks until it is > awakened by a notify() or notify_all() call for the same condition > variable in another thread, or until the optional timeout occurs. Once > awakened or timed out, it re-acquires the lock and returns. > > When the timeout argument is present and not None, it should be a > floating point number specifying a timeout for the operation in > seconds > (or fractions thereof). > > When the underlying lock is an RLock, it is not released using its > release() method, since this may not actually unlock the lock when it > was acquired multiple times recursively. Instead, an internal > interface > of the RLock class is used, which really unlocks it even when it has > been recursively acquired several times. Another internal interface is > then used to restore the recursion level when the lock is reacquired. > > """ > if not self._is_owned(): > raise RuntimeError("cannot wait on un-acquired lock") > waiter = _allocate_lock() > waiter.acquire() > self._waiters.append(waiter) > saved_state = self._release_save() > gotit = False > try: # restore state no matter what (e.g., KeyboardInterrupt) > if timeout is None: > waiter.acquire() > gotit = True > else: > if timeout > 0: > > gotit = waiter.acquire(True, timeout) > E Failed: Timeout >600.0s > /usr/lib/python3.6/threading.py:299: Failed > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)