This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/riMpCP
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 567f7a082ba81ee0532c7d2637fd5a7baba1f490
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Thu Apr 25 11:32:46 2024 -0400

    Add ability to load multiple copies of a model across processes (#31052)
    
    * Add ability to load multiple copies of a model across processes
    
    * push changes I had locally not remotely
    
    * Lint
    
    * naming + lint
    
    * Changes from feedback
---
 sdks/python/apache_beam/ml/inference/base.py       | 110 +++++++++++++++++++--
 sdks/python/apache_beam/ml/inference/base_test.py  |  71 +++++++++++++
 .../ml/inference/huggingface_inference.py          |  36 +++++--
 .../apache_beam/ml/inference/onnx_inference.py     |  12 ++-
 .../apache_beam/ml/inference/pytorch_inference.py  |  24 ++++-
 .../apache_beam/ml/inference/sklearn_inference.py  |  24 ++++-
 .../ml/inference/tensorflow_inference.py           |  24 ++++-
 .../apache_beam/ml/inference/tensorrt_inference.py |  12 ++-
 8 files changed, 283 insertions(+), 30 deletions(-)

diff --git a/sdks/python/apache_beam/ml/inference/base.py 
b/sdks/python/apache_beam/ml/inference/base.py
index 587b3060c23..6fe2d5acc5c 100644
--- a/sdks/python/apache_beam/ml/inference/base.py
+++ b/sdks/python/apache_beam/ml/inference/base.py
@@ -315,6 +315,13 @@ class ModelHandler(Generic[ExampleT, PredictionT, ModelT]):
     
https://beam.apache.org/releases/pydoc/current/apache_beam.utils.multi_process_shared.html""";
     return False
 
+  def model_copies(self) -> int:
+    """Returns the maximum number of model copies that should be loaded at one
+    time. This only impacts model handlers that are using
+    share_model_across_processes to share their model across processes instead
+    of being loaded per process."""
+    return 1
+
   def override_metrics(self, metrics_namespace: str = '') -> bool:
     """Returns a boolean representing whether or not a model handler will
     override metrics reporting. If True, RunInference will not report any
@@ -795,6 +802,21 @@ class KeyedModelHandler(Generic[KeyT, ExampleT, 
PredictionT, ModelT],
       return self._unkeyed.share_model_across_processes()
     return True
 
+  def model_copies(self) -> int:
+    if self._single_model:
+      return self._unkeyed.model_copies()
+    for mh in self._id_to_mh_map.values():
+      if mh.model_copies() != 1:
+        raise ValueError(
+            'KeyedModelHandler cannot map records to multiple '
+            'models if one or more of its ModelHandlers '
+            'require multiple model copies (set via '
+            'model_copies). To fix, verify that each '
+            'ModelHandler is not set to load multiple copies of '
+            'its model.')
+
+    return 1
+
   def override_metrics(self, metrics_namespace: str = '') -> bool:
     if self._single_model:
       return self._unkeyed.override_metrics(metrics_namespace)
@@ -902,6 +924,9 @@ class MaybeKeyedModelHandler(Generic[KeyT, ExampleT, 
PredictionT, ModelT],
   def share_model_across_processes(self) -> bool:
     return self._unkeyed.share_model_across_processes()
 
+  def model_copies(self) -> int:
+    return self._unkeyed.model_copies()
+
 
 class _PrebatchedModelHandler(Generic[ExampleT, PredictionT, ModelT],
                               ModelHandler[Sequence[ExampleT],
@@ -952,6 +977,12 @@ class _PrebatchedModelHandler(Generic[ExampleT, 
PredictionT, ModelT],
   def should_skip_batching(self) -> bool:
     return True
 
+  def share_model_across_processes(self) -> bool:
+    return self._base.share_model_across_processes()
+
+  def model_copies(self) -> int:
+    return self._base.model_copies()
+
   def get_postprocess_fns(self) -> Iterable[Callable[[Any], Any]]:
     return self._base.get_postprocess_fns()
 
@@ -1012,6 +1043,12 @@ class _PreProcessingModelHandler(Generic[ExampleT,
   def should_skip_batching(self) -> bool:
     return self._base.should_skip_batching()
 
+  def share_model_across_processes(self) -> bool:
+    return self._base.share_model_across_processes()
+
+  def model_copies(self) -> int:
+    return self._base.model_copies()
+
   def get_postprocess_fns(self) -> Iterable[Callable[[Any], Any]]:
     return self._base.get_postprocess_fns()
 
@@ -1071,6 +1108,12 @@ class _PostProcessingModelHandler(Generic[ExampleT,
   def should_skip_batching(self) -> bool:
     return self._base.should_skip_batching()
 
+  def share_model_across_processes(self) -> bool:
+    return self._base.share_model_across_processes()
+
+  def model_copies(self) -> int:
+    return self._base.model_copies()
+
   def get_postprocess_fns(self) -> Iterable[Callable[[Any], Any]]:
     return self._base.get_postprocess_fns() + [self._postprocess_fn]
 
@@ -1378,6 +1421,45 @@ class _MetricsCollector:
     self._inference_request_batch_byte_size.update(examples_byte_size)
 
 
+class _ModelRoutingStrategy():
+  """A class meant to sit in a shared location for mapping incoming batches to
+  different models. Currently only supports round-robin, but can be extended
+  to support other protocols if needed.
+  """
+  def __init__(self):
+    self._cur_index = 0
+
+  def next_model_index(self, num_models):
+    self._cur_index = (self._cur_index + 1) % num_models
+    return self._cur_index
+
+
+class _SharedModelWrapper():
+  """A router class to map incoming calls to the correct model.
+  
+    This allows us to round robin calls to models sitting in different
+    processes so that we can more efficiently use resources (e.g. GPUs).
+  """
+  def __init__(self, models: List[Any], model_tag: str):
+    self.models = models
+    if len(models) > 1:
+      self.model_router = multi_process_shared.MultiProcessShared(
+          lambda: _ModelRoutingStrategy(),
+          tag=f'{model_tag}_counter',
+          always_proxy=True).acquire()
+
+  def next_model(self):
+    if len(self.models) == 1:
+      # Short circuit if there's no routing strategy needed in order to
+      # avoid the cross-process call
+      return self.models[0]
+
+    return self.models[self.model_router.next_model_index(len(self.models))]
+
+  def all_models(self):
+    return self.models
+
+
 class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, PredictionT]):
   def __init__(
       self,
@@ -1408,7 +1490,8 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, 
PredictionT]):
   def _load_model(
       self,
       side_input_model_path: Optional[Union[str,
-                                            List[KeyModelPathMapping]]] = 
None):
+                                            List[KeyModelPathMapping]]] = None
+  ) -> _SharedModelWrapper:
     def load():
       """Function for constructing shared LoadedModel."""
       memory_before = _get_current_process_memory_in_bytes()
@@ -1416,8 +1499,10 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, 
PredictionT]):
       if isinstance(side_input_model_path, str):
         self._model_handler.update_model_path(side_input_model_path)
       else:
-        self._model_handler.update_model_paths(
-            self._model, side_input_model_path)
+        if self._model is not None:
+          models = self._model.all_models()
+          for m in models:
+            self._model_handler.update_model_paths(m, side_input_model_path)
       model = self._model_handler.load_model()
       end_time = _to_milliseconds(self._clock.time_ns())
       memory_after = _get_current_process_memory_in_bytes()
@@ -1434,10 +1519,15 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, 
PredictionT]):
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
+                load, tag=f'{model_tag}{i}', always_proxy=True).acquire())
+      model_wrapper = _SharedModelWrapper(models, model_tag)
     else:
       model = self._shared_model_handle.acquire(load, tag=model_tag)
+      model_wrapper = _SharedModelWrapper([model], model_tag)
     # since shared_model_handle is shared across threads, the model path
     # might not get updated in the model handler
     # because we directly get cached weak ref model from shared cache, instead
@@ -1445,8 +1535,11 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, 
PredictionT]):
     if isinstance(side_input_model_path, str):
       self._model_handler.update_model_path(side_input_model_path)
     else:
-      self._model_handler.update_model_paths(self._model, 
side_input_model_path)
-    return model
+      if self._model is not None:
+        models = self._model.all_models()
+        for m in models:
+          self._model_handler.update_model_paths(m, side_input_model_path)
+    return model_wrapper
 
   def get_metrics_collector(self, prefix: str = ''):
     """
@@ -1476,8 +1569,9 @@ class _RunInferenceDoFn(beam.DoFn, Generic[ExampleT, 
PredictionT]):
   def _run_inference(self, batch, inference_args):
     start_time = _to_microseconds(self._clock.time_ns())
     try:
+      model = self._model.next_model()
       result_generator = self._model_handler.run_inference(
-          batch, self._model, inference_args)
+          batch, model, inference_args)
     except BaseException as e:
       if self._metrics_collector:
         self._metrics_collector.failed_batches_counter.inc()
diff --git a/sdks/python/apache_beam/ml/inference/base_test.py 
b/sdks/python/apache_beam/ml/inference/base_test.py
index d237aee1ce9..ec1664f494c 100644
--- a/sdks/python/apache_beam/ml/inference/base_test.py
+++ b/sdks/python/apache_beam/ml/inference/base_test.py
@@ -63,6 +63,15 @@ class FakeStatefulModel:
     self._state += amount
 
 
+class FakeIncrementingModel:
+  def __init__(self):
+    self._state = 0
+
+  def predict(self, example: int) -> int:
+    self._state += 1
+    return self._state
+
+
 class FakeModelHandler(base.ModelHandler[int, int, FakeModel]):
   def __init__(
       self,
@@ -71,6 +80,8 @@ class FakeModelHandler(base.ModelHandler[int, int, 
FakeModel]):
       max_batch_size=9999,
       multi_process_shared=False,
       state=None,
+      incrementing=False,
+      max_copies=1,
       num_bytes_per_element=None,
       **kwargs):
     self._fake_clock = clock
@@ -79,11 +90,16 @@ class FakeModelHandler(base.ModelHandler[int, int, 
FakeModel]):
     self._env_vars = kwargs.get('env_vars', {})
     self._multi_process_shared = multi_process_shared
     self._state = state
+    self._incrementing = incrementing
+    self._max_copies = max_copies
     self._num_bytes_per_element = num_bytes_per_element
 
   def load_model(self):
+    assert (not self._incrementing or self._state is None)
     if self._fake_clock:
       self._fake_clock.current_time_ns += 500_000_000  # 500ms
+    if self._incrementing:
+      return FakeIncrementingModel()
     if self._state is not None:
       return FakeStatefulModel(self._state)
     return FakeModel()
@@ -116,6 +132,9 @@ class FakeModelHandler(base.ModelHandler[int, int, 
FakeModel]):
   def share_model_across_processes(self):
     return self._multi_process_shared
 
+  def model_copies(self):
+    return self._max_copies
+
   def get_num_bytes(self, batch: Sequence[int]) -> int:
     if self._num_bytes_per_element:
       return self._num_bytes_per_element * len(batch)
@@ -258,6 +277,58 @@ class RunInferenceBaseTest(unittest.TestCase):
           FakeModelHandler(multi_process_shared=True))
       assert_that(actual, equal_to(expected), label='assert:inferences')
 
+  def test_run_inference_impl_simple_examples_multi_process_shared_multi_copy(
+      self):
+    with TestPipeline() as pipeline:
+      examples = [1, 5, 3, 10]
+      expected = [example + 1 for example in examples]
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandler(multi_process_shared=True, max_copies=4))
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  def test_run_inference_impl_multi_process_shared_incrementing_multi_copy(
+      self):
+    with TestPipeline() as pipeline:
+      examples = [1, 5, 3, 10, 1, 5, 3, 10, 1, 5, 3, 10, 1, 5, 3, 10]
+      expected = [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4]
+      pcoll = pipeline | 'start' >> beam.Create(examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandler(
+              multi_process_shared=True,
+              max_copies=4,
+              incrementing=True,
+              max_batch_size=1))
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  def test_run_inference_impl_mps_nobatch_incrementing_multi_copy(self):
+    with TestPipeline() as pipeline:
+      examples = [1, 5, 3, 10, 1, 5, 3, 10, 1, 5, 3, 10, 1, 5, 3, 10]
+      expected = [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4]
+      batched_examples = [[example] for example in examples]
+      pcoll = pipeline | 'start' >> beam.Create(batched_examples)
+      actual = pcoll | base.RunInference(
+          FakeModelHandler(
+              multi_process_shared=True, max_copies=4,
+              incrementing=True).with_no_batching())
+      assert_that(actual, equal_to(expected), label='assert:inferences')
+
+  def test_run_inference_impl_keyed_mps_incrementing_multi_copy(self):
+    with TestPipeline() as pipeline:
+      examples = [1, 5, 3, 10, 1, 5, 3, 10, 1, 5, 3, 10, 1, 5, 3, 10]
+      keyed_examples = [('abc', example) for example in examples]
+      expected = [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4]
+      keyed_expected = [('abc', val) for val in expected]
+      pcoll = pipeline | 'start' >> beam.Create(keyed_examples)
+      actual = pcoll | base.RunInference(
+          base.KeyedModelHandler(
+              FakeModelHandler(
+                  multi_process_shared=True,
+                  max_copies=4,
+                  incrementing=True,
+                  max_batch_size=1)))
+      assert_that(actual, equal_to(keyed_expected), label='assert:inferences')
+
   def test_run_inference_impl_with_keyed_examples(self):
     with TestPipeline() as pipeline:
       examples = [1, 5, 3, 10]
diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference.py 
b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
index 25367d22eaa..28e24d920fb 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference.py
@@ -225,6 +225,7 @@ class 
HuggingFaceModelHandlerKeyedTensor(ModelHandler[Dict[str,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       **kwargs):
     """
     Implementation of the ModelHandler interface for HuggingFace with
@@ -257,6 +258,9 @@ class 
HuggingFaceModelHandlerKeyedTensor(ModelHandler[Dict[str,
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       kwargs: 'env_vars' can be used to set environment variables
         before loading the model.
 
@@ -276,7 +280,8 @@ class 
HuggingFaceModelHandlerKeyedTensor(ModelHandler[Dict[str,
       self._batching_kwargs["max_batch_size"] = max_batch_size
     if max_batch_duration_secs is not None:
       self._batching_kwargs["max_batch_duration_secs"] = 
max_batch_duration_secs
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
     self._framework = framework
 
     _validate_constructor_args(
@@ -350,7 +355,10 @@ class 
HuggingFaceModelHandlerKeyedTensor(ModelHandler[Dict[str,
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
 
   def get_metrics_namespace(self) -> str:
     """
@@ -405,6 +413,7 @@ class 
HuggingFaceModelHandlerTensor(ModelHandler[Union[tf.Tensor, torch.Tensor],
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       **kwargs):
     """
     Implementation of the ModelHandler interface for HuggingFace with
@@ -437,6 +446,9 @@ class 
HuggingFaceModelHandlerTensor(ModelHandler[Union[tf.Tensor, torch.Tensor],
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       kwargs: 'env_vars' can be used to set environment variables
         before loading the model.
 
@@ -456,7 +468,8 @@ class 
HuggingFaceModelHandlerTensor(ModelHandler[Union[tf.Tensor, torch.Tensor],
       self._batching_kwargs["max_batch_size"] = max_batch_size
     if max_batch_duration_secs is not None:
       self._batching_kwargs["max_batch_duration_secs"] = 
max_batch_duration_secs
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
     self._framework = ""
 
     _validate_constructor_args(
@@ -537,7 +550,10 @@ class 
HuggingFaceModelHandlerTensor(ModelHandler[Union[tf.Tensor, torch.Tensor],
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
 
   def get_metrics_namespace(self) -> str:
     """
@@ -578,6 +594,7 @@ class HuggingFacePipelineModelHandler(ModelHandler[str,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       **kwargs):
     """
     Implementation of the ModelHandler interface for Hugging Face Pipelines.
@@ -618,6 +635,9 @@ class HuggingFacePipelineModelHandler(ModelHandler[str,
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       kwargs: 'env_vars' can be used to set environment variables
         before loading the model.
 
@@ -637,7 +657,8 @@ class HuggingFacePipelineModelHandler(ModelHandler[str,
       self._batching_kwargs['max_batch_size'] = max_batch_size
     if max_batch_duration_secs is not None:
       self._batching_kwargs["max_batch_duration_secs"] = 
max_batch_duration_secs
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
     # Check if the device is specified twice. If true then the device parameter
     # of model handler is overridden.
@@ -718,7 +739,10 @@ class HuggingFacePipelineModelHandler(ModelHandler[str,
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
 
   def get_metrics_namespace(self) -> str:
     """
diff --git a/sdks/python/apache_beam/ml/inference/onnx_inference.py 
b/sdks/python/apache_beam/ml/inference/onnx_inference.py
index f7b6c0115af..e7af114ad43 100644
--- a/sdks/python/apache_beam/ml/inference/onnx_inference.py
+++ b/sdks/python/apache_beam/ml/inference/onnx_inference.py
@@ -64,6 +64,7 @@ class OnnxModelHandlerNumpy(ModelHandler[numpy.ndarray,
       *,
       inference_fn: NumpyInferenceFn = default_numpy_inference_fn,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       min_batch_size: Optional[int] = None,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
@@ -84,6 +85,9 @@ class OnnxModelHandlerNumpy(ModelHandler[numpy.ndarray,
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       min_batch_size: the minimum batch size to use when batching inputs.
       max_batch_size: the maximum batch size to use when batching inputs.
       max_batch_duration_secs: the maximum amount of time to buffer a batch
@@ -97,7 +101,8 @@ class OnnxModelHandlerNumpy(ModelHandler[numpy.ndarray,
     self._provider_options = provider_options
     self._model_inference_fn = inference_fn
     self._env_vars = kwargs.get('env_vars', {})
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
     self._batching_kwargs = {}
     if min_batch_size is not None:
       self._batching_kwargs["min_batch_size"] = min_batch_size
@@ -157,7 +162,10 @@ class OnnxModelHandlerNumpy(ModelHandler[numpy.ndarray,
     return 'BeamML_Onnx'
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
 
   def batch_elements_kwargs(self) -> Mapping[str, Any]:
     return self._batching_kwargs
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference.py 
b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
index 480dc538195..9a89cba7243 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference.py
@@ -195,6 +195,7 @@ class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       load_model_args: Optional[Dict[str, Any]] = None,
       **kwargs):
     """Implementation of the ModelHandler interface for PyTorch.
@@ -234,6 +235,9 @@ class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       load_model_args: a dictionary of parameters passed to the torch.load
         function to specify custom config for loading models.
       kwargs: 'env_vars' can be used to set environment variables
@@ -262,7 +266,8 @@ class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
     self._torch_script_model_path = torch_script_model_path
     self._load_model_args = load_model_args if load_model_args else {}
     self._env_vars = kwargs.get('env_vars', {})
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
     _validate_constructor_args(
         state_dict_path=self._state_dict_path,
@@ -344,7 +349,10 @@ class PytorchModelHandlerTensor(ModelHandler[torch.Tensor,
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
 
 
 def default_keyed_tensor_inference_fn(
@@ -428,6 +436,7 @@ class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, 
torch.Tensor],
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       load_model_args: Optional[Dict[str, Any]] = None,
       **kwargs):
     """Implementation of the ModelHandler interface for PyTorch.
@@ -472,6 +481,9 @@ class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, 
torch.Tensor],
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       load_model_args: a dictionary of parameters passed to the torch.load
         function to specify custom config for loading models.
       kwargs: 'env_vars' can be used to set environment variables
@@ -500,7 +512,8 @@ class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, 
torch.Tensor],
     self._torch_script_model_path = torch_script_model_path
     self._load_model_args = load_model_args if load_model_args else {}
     self._env_vars = kwargs.get('env_vars', {})
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
     _validate_constructor_args(
         state_dict_path=self._state_dict_path,
@@ -584,4 +597,7 @@ class PytorchModelHandlerKeyedTensor(ModelHandler[Dict[str, 
torch.Tensor],
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference.py 
b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
index befeca7f33b..a29657968ea 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference.py
@@ -92,6 +92,7 @@ class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       **kwargs):
     """ Implementation of the ModelHandler interface for scikit-learn
     using numpy arrays as input.
@@ -118,6 +119,9 @@ class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       kwargs: 'env_vars' can be used to set environment variables
         before loading the model.
     """
@@ -132,7 +136,8 @@ class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
     if max_batch_duration_secs is not None:
       self._batching_kwargs["max_batch_duration_secs"] = 
max_batch_duration_secs
     self._env_vars = kwargs.get('env_vars', {})
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
   def load_model(self) -> BaseEstimator:
     """Loads and initializes a model for processing."""
@@ -186,7 +191,10 @@ class SklearnModelHandlerNumpy(ModelHandler[numpy.ndarray,
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
 
 
 PandasInferenceFn = Callable[
@@ -219,6 +227,7 @@ class 
SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       **kwargs):
     """Implementation of the ModelHandler interface for scikit-learn that
     supports pandas dataframes.
@@ -248,6 +257,9 @@ class 
SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       kwargs: 'env_vars' can be used to set environment variables
         before loading the model.
     """
@@ -262,7 +274,8 @@ class 
SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
     if max_batch_duration_secs is not None:
       self._batching_kwargs["max_batch_duration_secs"] = 
max_batch_duration_secs
     self._env_vars = kwargs.get('env_vars', {})
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
   def load_model(self) -> BaseEstimator:
     """Loads and initializes a model for processing."""
@@ -318,4 +331,7 @@ class 
SklearnModelHandlerPandas(ModelHandler[pandas.DataFrame,
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
diff --git a/sdks/python/apache_beam/ml/inference/tensorflow_inference.py 
b/sdks/python/apache_beam/ml/inference/tensorflow_inference.py
index 0802868a1dd..78b59975e63 100644
--- a/sdks/python/apache_beam/ml/inference/tensorflow_inference.py
+++ b/sdks/python/apache_beam/ml/inference/tensorflow_inference.py
@@ -112,6 +112,7 @@ class TFModelHandlerNumpy(ModelHandler[numpy.ndarray,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       **kwargs):
     """Implementation of the ModelHandler interface for Tensorflow.
 
@@ -137,6 +138,9 @@ class TFModelHandlerNumpy(ModelHandler[numpy.ndarray,
           memory pressure if you load multiple copies. Given a model that
           consumes N memory and a machine with W cores and M memory, you should
           set this to True if N*W > M.
+        model_copies: The exact number of models that you would like loaded
+          onto your machine. This can be useful if you exactly know your CPU or
+          GPU capacity and want to maximize resource utilization.
         kwargs: 'env_vars' can be used to set environment variables
           before loading the model.
 
@@ -157,7 +161,8 @@ class TFModelHandlerNumpy(ModelHandler[numpy.ndarray,
       self._batching_kwargs['max_batch_size'] = max_batch_size
     if max_batch_duration_secs is not None:
       self._batching_kwargs["max_batch_duration_secs"] = 
max_batch_duration_secs
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
   def load_model(self) -> tf.Module:
     """Loads and initializes a Tensorflow model for processing."""
@@ -222,7 +227,10 @@ class TFModelHandlerNumpy(ModelHandler[numpy.ndarray,
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
 
 
 class TFModelHandlerTensor(ModelHandler[tf.Tensor, PredictionResult,
@@ -240,6 +248,7 @@ class TFModelHandlerTensor(ModelHandler[tf.Tensor, 
PredictionResult,
       max_batch_size: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       **kwargs):
     """Implementation of the ModelHandler interface for Tensorflow.
 
@@ -270,6 +279,9 @@ class TFModelHandlerTensor(ModelHandler[tf.Tensor, 
PredictionResult,
           memory pressure if you load multiple copies. Given a model that
           consumes N memory and a machine with W cores and M memory, you should
           set this to True if N*W > M.
+        model_copies: The exact number of models that you would like loaded
+          onto your machine. This can be useful if you exactly know your CPU or
+          GPU capacity and want to maximize resource utilization.
         kwargs: 'env_vars' can be used to set environment variables
           before loading the model.
 
@@ -290,7 +302,8 @@ class TFModelHandlerTensor(ModelHandler[tf.Tensor, 
PredictionResult,
       self._batching_kwargs['max_batch_size'] = max_batch_size
     if max_batch_duration_secs is not None:
       self._batching_kwargs["max_batch_duration_secs"] = 
max_batch_duration_secs
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
   def load_model(self) -> tf.Module:
     """Loads and initializes a tensorflow model for processing."""
@@ -355,4 +368,7 @@ class TFModelHandlerTensor(ModelHandler[tf.Tensor, 
PredictionResult,
     return self._batching_kwargs
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies
diff --git a/sdks/python/apache_beam/ml/inference/tensorrt_inference.py 
b/sdks/python/apache_beam/ml/inference/tensorrt_inference.py
index 53b81c0c36c..b38947b494c 100644
--- a/sdks/python/apache_beam/ml/inference/tensorrt_inference.py
+++ b/sdks/python/apache_beam/ml/inference/tensorrt_inference.py
@@ -230,6 +230,7 @@ class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
       *,
       inference_fn: TensorRTInferenceFn = _default_tensorRT_inference_fn,
       large_model: bool = False,
+      model_copies: Optional[int] = None,
       max_batch_duration_secs: Optional[int] = None,
       **kwargs):
     """Implementation of the ModelHandler interface for TensorRT.
@@ -254,6 +255,9 @@ class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
         memory pressure if you load multiple copies. Given a model that
         consumes N memory and a machine with W cores and M memory, you should
         set this to True if N*W > M.
+      model_copies: The exact number of models that you would like loaded
+        onto your machine. This can be useful if you exactly know your CPU or
+        GPU capacity and want to maximize resource utilization.
       max_batch_duration_secs: the maximum amount of time to buffer 
         a batch before emitting; used in streaming contexts.
       kwargs: Additional arguments like 'engine_path' and 'onnx_path' are
@@ -272,7 +276,8 @@ class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
     elif 'onnx_path' in kwargs:
       self.onnx_path = kwargs.get('onnx_path')
     self._env_vars = kwargs.get('env_vars', {})
-    self._large_model = large_model
+    self._share_across_processes = large_model or (model_copies is not None)
+    self._model_copies = model_copies or 1
 
   def batch_elements_kwargs(self):
     """Sets min_batch_size and max_batch_size of a TensorRT engine."""
@@ -334,4 +339,7 @@ class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
     return 'BeamML_TensorRT'
 
   def share_model_across_processes(self) -> bool:
-    return self._large_model
+    return self._share_across_processes
+
+  def model_copies(self) -> int:
+    return self._model_copies


Reply via email to