tvalentyn opened a new issue, #37779: URL: https://github.com/apache/beam/issues/37779
### What happened? Saw the below error on https://github.com/apache/beam/actions/runs/22721571966/job/65884874499?pr=37725: ``` =================================== FAILURES =================================== ______ TestAnomalyDetection.test_multiple_detectors_without_aggregation_0 ______ [gw2] linux -- Python 3.13.3 /runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/bin/python a = (<apache_beam.ml.anomaly.transforms_test.TestAnomalyDetection testMethod=test_multiple_detectors_without_aggregation_0>,) kw = {} @wraps(func) def standalone_func(*a, **kw): > return func(*(a + p.args), **p.kwargs, **kw) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/parameterized/parameterized.py:620: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/ml/anomaly/transforms_test.py:254: in test_multiple_detectors_without_aggregation with beam.Pipeline() as p: ^^^^^^^^^^^^^^^ target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/pipeline.py:655: in __exit__ self.result.wait_until_finish() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7cf663edb380> duration = None def wait_until_finish(self, duration=None): """ :param duration: The maximum time in milliseconds to wait for the result of the execution. If None or zero, will wait until the pipeline finishes. :return: The result of the pipeline, i.e. PipelineResult. """ last_error_text = None def read_messages() -> None: nonlocal last_error_text previous_state = -1 for message in self._message_stream: if message.HasField('message_response'): mr = message.message_response logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text) if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR: last_error_text = mr.message_text else: current_state = message.state_response.state if current_state != previous_state: _LOGGER.info( "Job state changed to %s", self.runner_api_state_to_pipeline_state(current_state)) previous_state = current_state self._messages.append(message) message_thread = threading.Thread( target=read_messages, name='wait_until_finish_read') message_thread.daemon = True message_thread.start() if duration: state_thread = threading.Thread( target=functools.partial(self._observe_state, message_thread), name='wait_until_finish_state_observer') E File "apache_beam/runners/common.py", line 1588, in apache_beam.runners.common.DoFnRunner._reraise_augmented E raise exn E File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process E return self.do_fn_invoker.invoke_process(windowed_value) E File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process E self.output_handler.handle_process_outputs( E File "apache_beam/runners/common.py", line 1683, in apache_beam.runners.common._OutputHandler.handle_process_outputs E self._write_value_to_tag(tag, windowed_value, watermark_estimator) E File "apache_beam/runners/common.py", line 1796, in apache_beam.runners.common._OutputHandler._write_value_to_tag E self.main_receivers.receive(windowed_value) E File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive E self.consumer.process(windowed_value) E File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process E with self.scoped_process_state: E File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process E delayed_applications = self.dofn_runner.process(o) E File "apache_beam/runners/common.py", line 1500, in apache_beam.runners.common.DoFnRunner.process E self._reraise_augmented(exn, windowed_value) E File "apache_beam/runners/common.py", line 1609, in apache_beam.runners.common.DoFnRunner._reraise_augmented E raise new_exn E File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process E return self.do_fn_invoker.invoke_process(windowed_value) E File "apache_beam/runners/common.py", line 912, in apache_beam.runners.common.PerWindowInvoker.invoke_process E self._invoke_process_per_window( E File "apache_beam/runners/common.py", line 1057, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window E self.process_method(*args_for_process, **kwargs_for_process), E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/transforms/core.py", line 2116, in <lambda> E wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)] E ~~^^^^^^^^^^^^^^^^^^^^ E File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py313/build/srcs/sdks/python/target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/testing/util.py", line 203, in _equal E raise BeamAssertException(msg) E apache_beam.testing.util.BeamAssertException: Failed assert: [(1, AnomalyResult(example=Row(x1=1, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=2.1213203435596424, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=10, x2=4), predictions=[AnomalyPrediction(model_id ='zscore_x1', score=8.0, label=1, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5773502691896252, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.16452254913212455, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)])), (2, AnomalyResult(example=Row(x1=100, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_ id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)]))] == [(1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=1, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=10, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=8.0, label=1, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.57735 02691896252, label=0, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label=-2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)])), (2, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=100, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=nan, label= -2, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=5), predictions=[AnomalyPrediction(model_id='zscore_x1', score=2.1213203435596424, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.0, label=0, threshold=2, info='', source_predictions=None)]))], unexpected elements [(1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=BeamSchema_541f8cd6_174e_4098_9cd8_ce13d8d5a388(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=nan, label=-2, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754 921931594, label=0, threshold=2, info='', source_predictions=None)]))], missing elements [(1, AnomalyResult(example=Row(x1=2, x2=10), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.4898979485566356, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=11.5, label=1, threshold=2, info='', source_predictions=None)])), (1, AnomalyResult(example=Row(x1=3, x2=4), predictions=[AnomalyPrediction(model_id='zscore_x1', score=0.16452254913212455, label=0, threshold=3, info='', source_predictions=None), AnomalyPrediction(model_id='zscore_x2', score=0.5368754921931594, label=0, threshold=2, info='', source_predictions=None)]))] [while running 'assert_that/Match'] ``` ### Issue Failure Failure: Test is flaky ### Issue Priority Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be sure the product is healthy) ### Issue Components - [ ] Component: Python SDK - [ ] Component: Java SDK - [ ] Component: Go SDK - [ ] Component: Typescript SDK - [ ] Component: IO connector - [ ] Component: Beam YAML - [ ] Component: Beam examples - [ ] Component: Beam playground - [ ] Component: Beam katas - [ ] Component: Website - [ ] Component: Infrastructure - [ ] Component: Spark Runner - [ ] Component: Flink Runner - [ ] Component: Samza Runner - [ ] Component: Twister2 Runner - [ ] Component: Hazelcast Jet Runner - [ ] Component: Google Cloud Dataflow Runner -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
