[ 
https://issues.apache.org/jira/browse/BEAM-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089871#comment-17089871
 ] 

Ning Kang commented on BEAM-9803:
---------------------------------

We'll probably need to remove such integration test from being executed on 
Jenkins.

The Jenkins machines could be executing many different things at the same time. 
It's possible that the machine does nothing for the test at all in the given 
5-second timeout.

> test_streaming_wordcount flaky
> ------------------------------
>
>                 Key: BEAM-9803
>                 URL: https://issues.apache.org/jira/browse/BEAM-9803
>             Project: Beam
>          Issue Type: Test
>          Components: sdk-py-core, test-failures
>            Reporter: Ning Kang
>            Assignee: Sam Rohde
>            Priority: Major
>
> {code:java}
> Regressionapache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest.test_streaming_wordcount
>  (from py37-cython)Failing for the past 1 build (Since #12462 )Took 7.7 
> sec.Error MessageAssertionError: DataFrame are different  DataFrame shape 
> mismatch [left]:  (10, 4) [right]: (6, 4)Stacktraceself = 
> <apache_beam.runners.interactive.interactive_runner_test.InteractiveRunnerTest
>  testMethod=test_streaming_wordcount>
>     @unittest.skipIf(
>         sys.version_info < (3, 5, 3),
>         'The tests require at least Python 3.6 to work.')
>     def test_streaming_wordcount(self):
>       class WordExtractingDoFn(beam.DoFn):
>         def process(self, element):
>           text_line = element.strip()
>           words = text_line.split()
>           return words
>     
>       # Add the TestStream so that it can be cached.
>       ib.options.capturable_sources.add(TestStream)
>       ib.options.capture_duration = timedelta(seconds=5)
>     
>       p = beam.Pipeline(
>           runner=interactive_runner.InteractiveRunner(),
>           options=StandardOptions(streaming=True))
>     
>       data = (
>           p
>           | TestStream()
>               .advance_watermark_to(0)
>               .advance_processing_time(1)
>               .add_elements(['to', 'be', 'or', 'not', 'to', 'be'])
>               .advance_watermark_to(20)
>               .advance_processing_time(1)
>               .add_elements(['that', 'is', 'the', 'question'])
>           | beam.WindowInto(beam.window.FixedWindows(10))) # yapf: disable
>     
>       counts = (
>           data
>           | 'split' >> beam.ParDo(WordExtractingDoFn())
>           | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
>           | 'group' >> beam.GroupByKey()
>           | 'count' >> beam.Map(lambda wordones: (wordones[0], 
> sum(wordones[1]))))
>     
>       # Watch the local scope for Interactive Beam so that referenced 
> PCollections
>       # will be cached.
>       ib.watch(locals())
>     
>       # This is normally done in the interactive_utils when a transform is
>       # applied but needs an IPython environment. So we manually run this 
> here.
>       ie.current_env().track_user_pipelines()
>     
>       # Create a fake limiter that cancels the BCJ once the main job receives 
> the
>       # expected amount of results.
>       class FakeLimiter:
>         def __init__(self, p, pcoll):
>           self.p = p
>           self.pcoll = pcoll
>     
>         def is_triggered(self):
>           result = ie.current_env().pipeline_result(self.p)
>           if result:
>             try:
>               results = result.get(self.pcoll)
>             except ValueError:
>               return False
>             return len(results) >= 10
>           return False
>     
>       # This sets the limiters to stop reading when the test receives 10 
> elements
>       # or after 5 seconds have elapsed (to eliminate the possibility of 
> hanging).
>       ie.current_env().options.capture_control.set_limiters_for_test(
>           [FakeLimiter(p, data), DurationLimiter(timedelta(seconds=5))])
>     
>       # This tests that the data was correctly cached.
>       pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0)
>       expected_data_df = pd.DataFrame([
>           ('to', 0, [IntervalWindow(0, 10)], pane_info),
>           ('be', 0, [IntervalWindow(0, 10)], pane_info),
>           ('or', 0, [IntervalWindow(0, 10)], pane_info),
>           ('not', 0, [IntervalWindow(0, 10)], pane_info),
>           ('to', 0, [IntervalWindow(0, 10)], pane_info),
>           ('be', 0, [IntervalWindow(0, 10)], pane_info),
>           ('that', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('is', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('the', 20000000, [IntervalWindow(20, 30)], pane_info),
>           ('question', 20000000, [IntervalWindow(20, 30)], pane_info)
>       ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable
>     
>       data_df = ib.collect(data, include_window_info=True)
> >     pd.testing.assert_frame_equal(expected_data_df, data_df)
> E     AssertionError: DataFrame are different
> E     
> E     DataFrame shape mismatch
> E     [left]:  (10, 4)
> E     [right]: (6, 4)
> apache_beam/runners/interactive/interactive_runner_test.py:238: AssertionError
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to