damccorm commented on code in PR #23830:
URL: https://github.com/apache/beam/pull/23830#discussion_r1004757497
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -337,6 +337,7 @@ def __init__(self, namespace: str):
# Metrics
self._inference_counter = beam.metrics.Metrics.counter(
namespace, 'num_inferences')
+ self.failed_batches_counter = 0
Review Comment:
This should probably be initialized with
`beam.metrics.Metrics.counter(namespace, 'failed_batches_counter')` so that the
counter is reported under the correct namespace.
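A minimal sketch of that suggestion, mirroring the existing `num_inferences` counter from the diff above so both metrics share the same `namespace`:

```python
# Sketch of the suggested initialization: use a Beam counter instead of a
# plain int so the metric is reported under the handler's namespace.
self.failed_batches_counter = beam.metrics.Metrics.counter(
    namespace, 'failed_batches_counter')
```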
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -426,17 +427,22 @@ def setup(self):
def process(self, batch, inference_args):
start_time = _to_microseconds(self._clock.time_ns())
- result_generator = self._model_handler.run_inference(
+ try:
+ result_generator = self._model_handler.run_inference(
batch, self._model, inference_args)
- predictions = list(result_generator)
+ except BaseException:
+ self._metrics_collector.failed_batches_counter.inc()
+ raise
+ else:
Review Comment:
We don't need to nest this clause in an `else` if we're re-raising the exception.
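For example, the success path can stay at the original indentation level; a sketch using the names from the diff above:

```python
try:
  result_generator = self._model_handler.run_inference(
      batch, self._model, inference_args)
except BaseException:
  self._metrics_collector.failed_batches_counter.inc()
  raise
# The exception is re-raised, so control never reaches this point on
# failure and no `else:` block is needed before consuming the generator.
predictions = list(result_generator)
```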
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -171,6 +171,38 @@ def test_unexpected_inference_args_passed(self):
FakeModelHandlerFailsOnInferenceArgs(),
inference_args=inference_args)
+ def test_increment_failed_batches_counter(self):
+ with self.assertRaises(ValueError, FakeModelHandlerFailsOnInferenceArgs):
+ with TestPipeline() as pipeline:
+ examples = [1, 5, 3, 10]
+ pcoll = pipeline | 'start' >> beam.Create(examples)
+ inference_args = {'key': True}
+ _ = pcoll | base.RunInference(FakeModelHandlerFailsOnInferenceArgs(),
+ inference_args=inference_args)
+ run_result = pipeline.run()
+ run_result.wait_until_finish()
+
+ metric_results = (
+ run_result.metrics().query(MetricsFilter().with_name('failed_batches_counter')))
+ num_failed_batches_counter = metric_results['counters'][0]
+ self.assertEqual(num_failed_batches_counter.committed, 1)
Review Comment:
Is this guaranteed to be 1, or could it be larger if the examples got batched
differently (e.g. if the batch size is 1)? Could we pass through
`min_batch_size` and `max_batch_size` to force the batching to a reasonable
size?
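One possible way to pin the batching down in the test, assuming the fake handler overrides `ModelHandler.batch_elements_kwargs()` (a sketch, not necessarily the mechanism intended here):

```python
class FakeModelHandlerFailsOnInferenceArgs(FakeModelHandler):
  # ... existing run_inference that raises when inference_args are passed ...

  def batch_elements_kwargs(self):
    # Force all four test examples into a single batch so exactly one
    # batch fails and failed_batches_counter is deterministically 1.
    return {'min_batch_size': 4, 'max_batch_size': 4}
```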
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -337,6 +337,7 @@ def __init__(self, namespace: str):
# Metrics
self._inference_counter = beam.metrics.Metrics.counter(
namespace, 'num_inferences')
+ self.failed_batches_counter = 0
Review Comment:
FWIW, it looks like this is what's causing the tests to fail to run:
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/25468/consoleFull
##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -171,6 +171,38 @@ def test_unexpected_inference_args_passed(self):
FakeModelHandlerFailsOnInferenceArgs(),
inference_args=inference_args)
+ def test_increment_failed_batches_counter(self):
+ with self.assertRaises(ValueError, FakeModelHandlerFailsOnInferenceArgs):
+ with TestPipeline() as pipeline:
+ examples = [1, 5, 3, 10]
+ pcoll = pipeline | 'start' >> beam.Create(examples)
+ inference_args = {'key': True}
+ _ = pcoll | base.RunInference(FakeModelHandlerFailsOnInferenceArgs(),
+ inference_args=inference_args)
+ run_result = pipeline.run()
+ run_result.wait_until_finish()
+
+ metric_results = (
+ run_result.metrics().query(MetricsFilter().with_name('failed_batches_counter')))
+ num_failed_batches_counter = metric_results['counters'][0]
+ self.assertEqual(num_failed_batches_counter.committed, 1)
Review Comment:
FWIW, this whole setup may just cause the job to fail and retry a few times
(in which case the failed counter would probably be 3, since we'd do that many
retries). One testing alternative would be to track in
`FakeModelHandlerFailsOnInferenceArgs` whether an inference has already failed,
and succeed on retry. That would simulate a transient failure.
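A rough sketch of that kind of fake (hypothetical name; it fails the first batch it sees and succeeds afterwards, so the counter ends at 1 even if the runner retries the bundle):

```python
class FakeModelHandlerFailsOnce(FakeModelHandler):
  """Hypothetical fake simulating a transient failure: the first
  run_inference call raises, later calls defer to the real fake."""
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._already_failed = False

  def run_inference(self, batch, model, inference_args=None):
    if not self._already_failed:
      self._already_failed = True
      raise ValueError('simulated transient inference failure')
    return super().run_inference(batch, model, inference_args)
```

If the handler gets reconstructed on retry, the flag may need to live at module scope instead of on the instance.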
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]