tvalentyn commented on issue #21121:
URL: https://github.com/apache/beam/issues/21121#issuecomment-1319154978
Encountered again:
```
_____ StreamingWordCountIT.test_streaming_wordcount_it _______________
[gw0] linux -- Python 3.7.12 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7
self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>
    @pytest.mark.it_postcommit
    def test_streaming_wordcount_it(self):
      # Build expected dataset.
      expected_msg = [('%d: 1' % num).encode('utf-8')
                      for num in range(DEFAULT_INPUT_NUMBERS)]
      # Set extra options to the pipeline for test purpose
      state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
      pubsub_msg_verifier = PubSubMessageMatcher(
          self.project, self.output_sub.name, expected_msg, timeout=400)
      extra_opts = {
          'input_subscription': self.input_sub.name,
          'output_topic': self.output_topic.name,
          'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
          'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
      }
      # Generate input data and inject to PubSub.
      self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
      # Get pipeline options from command argument: --test-pipeline-options,
      # and start pipeline job by calling pipeline main function.
      streaming_wordcount.run(
          self.test_pipeline.get_full_options_as_args(**extra_opts),
>         save_main_session=False)
apache_beam/examples/streaming_wordcount_it_test.py:120:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/streaming_wordcount.py:103: in run
    output | beam.io.WriteToPubSub(known_args.output_topic)
apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7fb48857f0d0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fb488584490>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fb4885841d0>
    def run_pipeline(self, pipeline, options):
      """Execute test pipeline and verify test matcher"""
      test_options = options.view_as(TestOptions)
      on_success_matcher = test_options.on_success_matcher
      wait_duration = test_options.wait_until_finish_duration
      is_streaming = options.view_as(StandardOptions).streaming
      # [BEAM-1889] Do not send this to remote workers also, there is no need to
      # send this option to remote executors.
      test_options.on_success_matcher = None
      self.result = super().run_pipeline(pipeline, options)
      if self.result.has_job:
        # TODO(markflyhigh)(https://github.com/apache/beam/issues/18254): Use
        # print since Nose dosen't show logs in some cases.
        print('Worker logs: %s' % self.build_console_url(options))
        _LOGGER.info('Console log: ')
        _LOGGER.info(self.build_console_url(options))
      try:
        self.wait_until_in_state(PipelineState.RUNNING)
        if is_streaming and not wait_duration:
          _LOGGER.warning('Waiting indefinitely for streaming job.')
        self.result.wait_until_finish(duration=wait_duration)
        if on_success_matcher:
          from hamcrest import assert_that as hc_assert_that
>         hc_assert_that(self.result, pickler.loads(on_success_matcher))
E       AssertionError:
E       Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
E            but: Expected 500 messages. Got 486 messages. Diffs (item, count):
E         Expected but not in actual: dict_items([(b'11: 1', 1), (b'159: 1', 1), (b'161: 1', 1), (b'176: 1', 1), (b'195: 1', 1), (b'202: 1', 1), (b'203: 1', 1), (b'217: 1', 1), (b'219: 1', 1), (b'277: 1', 1), (b'320: 1', 1), (b'446: 1', 1), (b'466: 1', 1), (b'485: 1', 1)])
E         Unexpected: dict_items([])
E         Unexpected (with all details): []
apache_beam/runners/dataflow/test_dataflow_runner.py:70: AssertionError
```
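
For context, the flake is in the final verification step: the `PubSubMessageMatcher` gave up after its 400-second timeout with 486 of the 500 expected messages. Below is a minimal, self-contained sketch of that verification, reconstructed only from the matcher constructors visible in the traceback; the project id, subscription path, and the larger timeout value are placeholders/assumptions, not a confirmed fix.

```python
# Minimal sketch of the verification that fails above, reconstructed from the
# traceback. The project id, subscription path, and the bumped timeout are
# placeholders/assumptions, not values from the real test (which uses 400s).
from hamcrest import all_of, assert_that as hc_assert_that

from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher

DEFAULT_INPUT_NUMBERS = 500

# Same expected dataset the test builds: b'0: 1' ... b'499: 1'.
expected_msg = [('%d: 1' % num).encode('utf-8')
                for num in range(DEFAULT_INPUT_NUMBERS)]

# The job only has to reach RUNNING; the message-count check is what flakes.
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)

# A longer timeout (seconds) than the current 400 would give the streaming job
# more headroom to deliver the last few messages before the matcher gives up.
pubsub_msg_verifier = PubSubMessageMatcher(
    'placeholder-project',                                    # placeholder
    'projects/placeholder-project/subscriptions/output-sub',  # placeholder
    expected_msg,
    timeout=600)


def verify(pipeline_result):
  """Applies the same combined assertion TestDataflowRunner runs above."""
  hc_assert_that(pipeline_result, all_of(state_verifier, pubsub_msg_verifier))
```

In the real test the combined matcher is passed through the `on_success_matcher` test option, and `TestDataflowRunner.run_pipeline` applies it with hamcrest's `assert_that` after `wait_until_finish(duration=...)` returns, which is the line that raises above.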