This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 632f7d0800a Add integration tests to exercise large model (#28121)
632f7d0800a is described below

commit 632f7d0800aaa8fa7572547b8d0d4f63673936e3
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Wed Aug 23 15:07:32 2023 -0400

    Add integration tests to exercise large model (#28121)
---
 .../inference/huggingface_language_modeling.py     | 10 ++++++-
 .../inference/pytorch_language_modeling.py         | 10 ++++++-
 .../inference/sklearn_mnist_classification.py      | 10 ++++++-
 .../inference/tensorflow_mnist_classification.py   | 11 ++++++-
 .../inference/xgboost_iris_classification.py       | 10 ++++++-
 .../ml/inference/huggingface_inference_it_test.py  | 35 ++++++++++++++++++++++
 .../ml/inference/pytorch_inference_it_test.py      | 35 ++++++++++++++++++++++
 .../ml/inference/sklearn_inference_it_test.py      | 32 ++++++++++++++++++++
 .../ml/inference/tensorflow_inference_it_test.py   | 31 +++++++++++++++++++
 .../ml/inference/xgboost_inference_it_test.py      | 34 +++++++++++++++++++++
 10 files changed, 213 insertions(+), 5 deletions(-)
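
Every example script patched below gets the same treatment: a new --large_model argparse flag that is forwarded to the RunInference model handler. In Beam's ML inference APIs, setting large_model=True tells the handler to share a single copy of the model across the processes on a worker instead of loading one copy per process, trading some cross-process overhead for lower memory pressure. A minimal, self-contained sketch of the pattern, modeled on the sklearn example from this commit (the model path, key, and feature shape below are placeholders, not values from the diff); the other four examples differ only in which model handler receives the flag, as the hunks below show:

import argparse

import apache_beam as beam
import numpy as np
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy


def parse_known_args(argv):
  parser = argparse.ArgumentParser()
  parser.add_argument('--model_path', required=True)
  parser.add_argument(
      '--large_model',
      action='store_true',
      default=False,
      help='Set to true if your model is large enough to run into memory '
      'pressure if you load multiple copies.')
  return parser.parse_known_args(argv)


def run(argv=None):
  known_args, pipeline_args = parse_known_args(argv)
  # large_model=True asks Beam to load one shared copy of the model per
  # worker instead of one copy per process.
  handler = KeyedModelHandler(
      SklearnModelHandlerNumpy(
          model_uri=known_args.model_path,
          model_file_type=ModelFileType.PICKLE,
          large_model=known_args.large_model))
  with beam.Pipeline(argv=pipeline_args) as p:
    _ = (
        p
        # A single keyed example; the feature shape must match the model.
        | beam.Create([('example-key', np.zeros(64))])
        | RunInference(handler))


if __name__ == '__main__':
  run()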

diff --git a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
index f6cb3de72b7..dbdb8c0651a 100644
--- a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
+++ b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py
@@ -114,6 +114,13 @@ def parse_known_args(argv):
       dest='model_class',
       default=AutoModelForMaskedLM,
       help="Name of the model from Hugging Face")
+  parser.add_argument(
+      '--large_model',
+      action='store_true',
+      dest='large_model',
+      default=False,
+      help='Set to true if your model is large enough to run into memory '
+      'pressure if you load multiple copies.')
   return parser.parse_known_args(argv)
 
 
@@ -139,7 +146,8 @@ def run(
       model_uri=known_args.model_name,
       model_class=known_args.model_class,
       framework='pt',
-      max_batch_size=1)
+      max_batch_size=1,
+      large_model=known_args.large_model)
   if not known_args.input:
     text = (
         pipeline | 'CreateSentences' >> beam.Create([
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
index b5fabbb1f1e..9de10e73e11 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py
@@ -118,6 +118,13 @@ def parse_known_args(argv):
       dest='model_state_dict_path',
       required=True,
       help="Path to the model's state_dict.")
+  parser.add_argument(
+      '--large_model',
+      action='store_true',
+      dest='large_model',
+      default=False,
+      help='Set to true if your model is large enough to run into memory '
+      'pressure if you load multiple copies.')
   return parser.parse_known_args(argv)
 
 
@@ -166,7 +173,8 @@ def run(
   model_handler = PytorchNoBatchModelHandler(
       state_dict_path=known_args.model_state_dict_path,
       model_class=model_class,
-      model_params=model_params)
+      model_params=model_params,
+      large_model=known_args.large_model)
 
   pipeline = test_pipeline
   if not test_pipeline:
diff --git a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
index 6f8ea929bbb..5392cdf7dda 100644
--- a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
+++ b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py
@@ -77,6 +77,13 @@ def parse_known_args(argv):
       dest='model_path',
       required=True,
       help='Path to load the Sklearn model for Inference.')
+  parser.add_argument(
+      '--large_model',
+      action='store_true',
+      dest='large_model',
+      default=False,
+      help='Set to true if your model is large enough to run into memory '
+      'pressure if you load multiple copies.')
   return parser.parse_known_args(argv)
 
 
@@ -103,7 +110,8 @@ def run(
   model_loader = KeyedModelHandler(
       SklearnModelHandlerNumpy(
           model_file_type=ModelFileType.PICKLE,
-          model_uri=known_args.model_path))
+          model_uri=known_args.model_path,
+          large_model=known_args.large_model))
 
   pipeline = test_pipeline
   if not test_pipeline:
diff --git a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
index 174d21b26af..6cf746e77cd 100644
--- a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
+++ b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py
@@ -70,6 +70,13 @@ def parse_known_args(argv):
       dest='model_path',
       required=True,
       help='Path to load the Tensorflow model for Inference.')
+  parser.add_argument(
+      '--large_model',
+      action='store_true',
+      dest='large_model',
+      default=False,
+      help='Set to true if your model is large enough to run into memory '
+      'pressure if you load multiple copies.')
   return parser.parse_known_args(argv)
 
 
@@ -89,7 +96,9 @@ def run(
   # Therefore, we use KeyedModelHandler wrapper over TFModelHandlerNumpy.
   model_loader = KeyedModelHandler(
       TFModelHandlerNumpy(
-          model_uri=known_args.model_path, model_type=ModelType.SAVED_MODEL))
+          model_uri=known_args.model_path,
+          model_type=ModelType.SAVED_MODEL,
+          large_model=known_args.large_model))
 
   pipeline = test_pipeline
   if not test_pipeline:
diff --git a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
index 59ee7868ca0..963187fd210 100644
--- a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
+++ b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py
@@ -73,6 +73,13 @@ def parse_known_args(argv):
       dest='model_state',
       required=True,
       help='Path to the state of the XGBoost model loaded for Inference.')
+  parser.add_argument(
+      '--large_model',
+      action='store_true',
+      dest='large_model',
+      default=False,
+      help='Set to true if your model is large enough to run into memory '
+      'pressure if you load multiple copies.')
   group = parser.add_mutually_exclusive_group(required=True)
   group.add_argument('--split', action='store_true', dest='split')
   group.add_argument('--no_split', action='store_false', dest='split')
@@ -125,7 +132,8 @@ def run(
   xgboost_model_handler = KeyedModelHandler(
       model_handler(
           model_class=xgboost.XGBClassifier,
-          model_state=known_args.model_state))
+          model_state=known_args.model_state,
+          large_model=known_args.large_model))
 
   input_data = load_sklearn_iris_test_data(
       data_type=input_data_type, split=known_args.split)
diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py
index 0be359a8719..dd675d1935a 100644
--- a/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py
@@ -75,6 +75,41 @@ class HuggingFaceInference(unittest.TestCase):
       predicted_predicted_text = predictions_dict[text]
       self.assertEqual(actual_predicted_text, predicted_predicted_text)
 
+  def test_hf_language_modeling_large_model(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    # Path to text file containing some sentences
+    file_of_sentences = 'gs://apache-beam-ml/datasets/custom/hf_sentences.txt'
+    output_file_dir = 'gs://apache-beam-ml/testing/predictions'
+    output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+
+    model_name = 'stevhliu/my_awesome_eli5_mlm_model'
+
+    extra_opts = {
+        'input': file_of_sentences,
+        'output': output_file,
+        'model_name': model_name,
+        'large_model': True,
+    }
+    huggingface_language_modeling.run(
+        test_pipeline.get_full_options_as_args(**extra_opts),
+        save_main_session=False)
+
+    self.assertEqual(FileSystems().exists(output_file), True)
+    predictions = pytorch_inference_it_test.process_outputs(
+        filepath=output_file)
+    actuals_file = 'gs://apache-beam-ml/testing/expected_outputs/test_hf_run_inference_for_masked_lm_actuals.txt'  # pylint: disable=line-too-long
+    actuals = pytorch_inference_it_test.process_outputs(filepath=actuals_file)
+
+    predictions_dict = {}
+    for prediction in predictions:
+      text, predicted_text = prediction.split(';')
+      predictions_dict[text] = predicted_text.strip().lower()
+
+    for actual in actuals:
+      text, actual_predicted_text = actual.split(';')
+      predicted_predicted_text = predictions_dict[text]
+      self.assertEqual(actual_predicted_text, predicted_predicted_text)
+
   def test_hf_pipeline(self):
     test_pipeline = TestPipeline(is_integration_test=True)
     # Path to text file containing some questions and context
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py
index 5e377720408..e00660bcbd9 100644
--- a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py
@@ -161,6 +161,41 @@ class PyTorchInference(unittest.TestCase):
       predicted_predicted_text = predictions_dict[text]
       self.assertEqual(actual_predicted_text, predicted_predicted_text)
 
+  @pytest.mark.uses_pytorch
+  @pytest.mark.it_postcommit
+  def test_torch_run_inference_bert_for_masked_lm_large_model(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    # Path to text file containing some sentences
+    file_of_sentences = 'gs://apache-beam-ml/datasets/custom/sentences.txt'  # pylint: disable=line-too-long
+    output_file_dir = 'gs://apache-beam-ml/testing/predictions'
+    output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+
+    model_state_dict_path = 'gs://apache-beam-ml/models/huggingface.BertForMaskedLM.bert-base-uncased.pth'  # pylint: disable=line-too-long
+    extra_opts = {
+        'input': file_of_sentences,
+        'output': output_file,
+        'model_state_dict_path': model_state_dict_path,
+        'large_model': True,
+    }
+    pytorch_language_modeling.run(
+        test_pipeline.get_full_options_as_args(**extra_opts),
+        save_main_session=False)
+
+    self.assertEqual(FileSystems().exists(output_file), True)
+    predictions = process_outputs(filepath=output_file)
+    actuals_file = 'gs://apache-beam-ml/testing/expected_outputs/test_torch_run_inference_bert_for_masked_lm_actuals.txt'  # pylint: disable=line-too-long
+    actuals = process_outputs(filepath=actuals_file)
+
+    predictions_dict = {}
+    for prediction in predictions:
+      text, predicted_text = prediction.split(';')
+      predictions_dict[text] = predicted_text
+
+    for actual in actuals:
+      text, actual_predicted_text = actual.split(';')
+      predicted_predicted_text = predictions_dict[text]
+      self.assertEqual(actual_predicted_text, predicted_predicted_text)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.DEBUG)
diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py
index 73bb9341f25..c5480234cda 100644
--- a/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py
@@ -85,6 +85,38 @@ class SklearnInference(unittest.TestCase):
       true_label, expected_prediction = expected_outputs[i].split(',')
       self.assertEqual(predictions_dict[true_label], expected_prediction)
 
+  def test_sklearn_mnist_classification_large_model(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv'
+    output_file_dir = 'gs://temp-storage-for-end-to-end-tests'
+    output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+    model_path = 'gs://apache-beam-ml/models/mnist_model_svm.pickle'
+    extra_opts = {
+        'input': input_file,
+        'output': output_file,
+        'model_path': model_path,
+        'large_model': True
+    }
+    sklearn_mnist_classification.run(
+        test_pipeline.get_full_options_as_args(**extra_opts),
+        save_main_session=False)
+    self.assertEqual(FileSystems().exists(output_file), True)
+
+    expected_output_filepath = 'gs://apache-beam-ml/testing/expected_outputs/test_sklearn_mnist_classification_actuals.txt'  # pylint: disable=line-too-long
+    expected_outputs = process_outputs(expected_output_filepath)
+
+    predicted_outputs = process_outputs(output_file)
+    self.assertEqual(len(expected_outputs), len(predicted_outputs))
+
+    predictions_dict = {}
+    for i in range(len(predicted_outputs)):
+      true_label, prediction = predicted_outputs[i].split(',')
+      predictions_dict[true_label] = prediction
+
+    for i in range(len(expected_outputs)):
+      true_label, expected_prediction = expected_outputs[i].split(',')
+      self.assertEqual(predictions_dict[true_label], expected_prediction)
+
   # TODO(https://github.com/apache/beam/issues/27151) use model with sklearn 1.2
   @unittest.skipIf(sys.version_info >= (3, 11, 0), "Beam#27151")
   def test_sklearn_regression(self):
diff --git a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
index bdc0291dd1e..4786b7a0398 100644
--- a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py
@@ -102,6 +102,37 @@ class TensorflowInference(unittest.TestCase):
       true_label, expected_prediction = expected_outputs[i].split(',')
       self.assertEqual(predictions_dict[true_label], expected_prediction)
 
+  def test_tf_mnist_classification_large_model(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv'
+    output_file_dir = 'gs://apache-beam-ml/testing/outputs'
+    output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt'])
+    model_path = 'gs://apache-beam-ml/models/tensorflow/mnist/'
+    extra_opts = {
+        'input': input_file,
+        'output': output_file,
+        'model_path': model_path,
+        'large_model': True,
+    }
+    tensorflow_mnist_classification.run(
+        test_pipeline.get_full_options_as_args(**extra_opts),
+        save_main_session=False)
+    self.assertEqual(FileSystems().exists(output_file), True)
+
+    expected_output_filepath = 'gs://apache-beam-ml/testing/expected_outputs/test_sklearn_mnist_classification_actuals.txt'  # pylint: disable=line-too-long
+    expected_outputs = process_outputs(expected_output_filepath)
+    predicted_outputs = process_outputs(output_file)
+    self.assertEqual(len(expected_outputs), len(predicted_outputs))
+
+    predictions_dict = {}
+    for i in range(len(predicted_outputs)):
+      true_label, prediction = predicted_outputs[i].split(',')
+      predictions_dict[true_label] = prediction
+
+    for i in range(len(expected_outputs)):
+      true_label, expected_prediction = expected_outputs[i].split(',')
+      self.assertEqual(predictions_dict[true_label], expected_prediction)
+
   def test_tf_imagenet_image_segmentation(self):
     test_pipeline = TestPipeline(is_integration_test=True)
     input_file = (
diff --git a/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py b/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py
index e458ccab4b0..3db62bcc6a9 100644
--- a/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py
+++ b/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py
@@ -113,6 +113,40 @@ class XGBoostInference(unittest.TestCase):
       true_label, expected_prediction = expected_output.split(',')
       self.assertEqual(predictions_dict[true_label], expected_prediction)
 
+  def test_iris_classification_numpy_single_batch_large_model(self):
+    test_pipeline = TestPipeline(is_integration_test=True)
+    input_type = 'numpy'
+    output_file_dir = '/tmp'
+    output_file = '/'.join(
+        [output_file_dir, str(uuid.uuid4()), 'numpy_single_batch.txt'])
+    model_state_path = 'gs://apache-beam-ml/models/xgboost.iris_classifier.json'
+    extra_opts = {
+        'input_type': input_type,
+        'output': output_file,
+        'model_state': model_state_path,
+        'no_split': True,
+        'large_model': True,
+    }
+
+    xgboost_iris_classification.run(
+        test_pipeline.get_full_options_as_args(**extra_opts),
+        save_main_session=False)
+    self.assertEqual(FileSystems().exists(output_file), True)
+
+    expected_outputs = EXPECTED_OUTPUT_SINGLE_BATCHES
+
+    predicted_outputs = process_outputs(output_file)
+    self.assertEqual(len(expected_outputs), len(predicted_outputs))
+
+    predictions_dict = {}
+    for predicted_output in predicted_outputs:
+      true_label, prediction = predicted_output.split(',')
+      predictions_dict[true_label] = prediction
+
+    for expected_output in expected_outputs:
+      true_label, expected_prediction = expected_output.split(',')
+      self.assertEqual(predictions_dict[true_label], expected_prediction)
+
   def test_iris_classification_pandas_single_batch(self):
     test_pipeline = TestPipeline(is_integration_test=True)
     input_type = 'pandas'

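For reference, every new test above follows the same shape: construct a TestPipeline, build the example's flags (including large_model) with get_full_options_as_args, call the example's run(), and compare the written predictions against expected outputs. A standalone invocation equivalent to what the tests exercise, sketched with placeholder bucket paths (only the module and the run() signature come from this commit; the paths and runner choice are illustrative):

from apache_beam.examples.inference import sklearn_mnist_classification

# Placeholder buckets; substitute paths you can read and write.
sklearn_mnist_classification.run(
    [
        '--input=gs://my-bucket/inputs/it_mnist_data.csv',
        '--output=gs://my-bucket/predictions/result.txt',
        '--model_path=gs://my-bucket/models/mnist_model_svm.pickle',
        '--large_model',  # store_true flag: presence alone enables it
        '--runner=DirectRunner',
    ],
    save_main_session=False)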