BjornPrime commented on code in PR #23830:
URL: https://github.com/apache/beam/pull/23830#discussion_r1014127251
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -171,6 +171,38 @@ def test_unexpected_inference_args_passed(self):
           FakeModelHandlerFailsOnInferenceArgs(),
           inference_args=inference_args)
+  def test_increment_failed_batches_counter(self):
+    with self.assertRaises(ValueError):
+      with TestPipeline() as pipeline:
+        examples = [1, 5, 3, 10]
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        inference_args = {'key': True}
+        _ = pcoll | base.RunInference(
+            FakeModelHandlerFailsOnInferenceArgs(),
+            inference_args=inference_args)
+        run_result = pipeline.run()
+        run_result.wait_until_finish()
+
+        metric_results = (
+            run_result.metrics().query(
+                MetricsFilter().with_name('failed_batches_counter')))
+        num_failed_batches_counter = metric_results['counters'][0]
+        self.assertEqual(num_failed_batches_counter.committed, 1)
Review Comment:
I've reduced the examples array to a length of one, so hopefully only one
batch will run.
Are the retries handled within run_inference()? If so, I expect the error
only propagates up to process() once the retries have been exhausted, so even
if the batch is retried a few times, we'd still only increment the counter
once: the increment happens when the error is handled in process(), which
stops any further execution of the method. Let me know if I'm wrong on that.
Also, if that is true and we're only counting each failure once, I'm not sure
there's much use in testing a transient failure, since its error would never
reach process() to trigger the increment.
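
For concreteness, here's a minimal sketch of the pattern I'm describing. The
names (SketchRunInferenceDoFn, the retry placement) are hypothetical and the
real _RunInferenceDoFn differs; the model argument is passed as None purely
for illustration:

import apache_beam as beam
from apache_beam.metrics import Metrics


class SketchRunInferenceDoFn(beam.DoFn):
  def __init__(self, model_handler):
    self._model_handler = model_handler
    # Same counter name the test above queries via MetricsFilter.
    self._failed_batches_counter = Metrics.counter(
        'RunInference', 'failed_batches_counter')

  def _run_inference(self, batch, inference_args):
    # Any retry logic would live here, so only the final,
    # unrecoverable error propagates up to process().
    return self._model_handler.run_inference(batch, None, inference_args)

  def process(self, batch, inference_args=None):
    try:
      results = self._run_inference(batch, inference_args)
    except Exception:
      # Reached at most once per failed batch: increment the counter,
      # then re-raise, which halts further execution of process(). A
      # transient failure that is retried and recovers never lands here.
      self._failed_batches_counter.inc()
      raise
    yield from results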
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]