This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 632f7d0800a Add integration tests to exercise large model (#28121) 632f7d0800a is described below commit 632f7d0800aaa8fa7572547b8d0d4f63673936e3 Author: Danny McCormick <dannymccorm...@google.com> AuthorDate: Wed Aug 23 15:07:32 2023 -0400 Add integration tests to exercise large model (#28121) --- .../inference/huggingface_language_modeling.py | 10 ++++++- .../inference/pytorch_language_modeling.py | 10 ++++++- .../inference/sklearn_mnist_classification.py | 10 ++++++- .../inference/tensorflow_mnist_classification.py | 11 ++++++- .../inference/xgboost_iris_classification.py | 10 ++++++- .../ml/inference/huggingface_inference_it_test.py | 35 ++++++++++++++++++++++ .../ml/inference/pytorch_inference_it_test.py | 35 ++++++++++++++++++++++ .../ml/inference/sklearn_inference_it_test.py | 32 ++++++++++++++++++++ .../ml/inference/tensorflow_inference_it_test.py | 31 +++++++++++++++++++ .../ml/inference/xgboost_inference_it_test.py | 34 +++++++++++++++++++++ 10 files changed, 213 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py index f6cb3de72b7..dbdb8c0651a 100644 --- a/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py +++ b/sdks/python/apache_beam/examples/inference/huggingface_language_modeling.py @@ -114,6 +114,13 @@ def parse_known_args(argv): dest='model_class', default=AutoModelForMaskedLM, help="Name of the model from Hugging Face") + parser.add_argument( + '--large_model', + action='store_true', + dest='large_model', + default=False, + help='Set to true if your model is large enough to run into memory ' + 'pressure if you load multiple copies.') return parser.parse_known_args(argv) @@ -139,7 +146,8 @@ def run( model_uri=known_args.model_name, model_class=known_args.model_class, framework='pt', - max_batch_size=1) + max_batch_size=1, + large_model=known_args.large_model) if not known_args.input: text = ( pipeline | 'CreateSentences' >> beam.Create([ diff --git a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py index b5fabbb1f1e..9de10e73e11 100644 --- a/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py +++ b/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py @@ -118,6 +118,13 @@ def parse_known_args(argv): dest='model_state_dict_path', required=True, help="Path to the model's state_dict.") + parser.add_argument( + '--large_model', + action='store_true', + dest='large_model', + default=False, + help='Set to true if your model is large enough to run into memory ' + 'pressure if you load multiple copies.') return parser.parse_known_args(argv) @@ -166,7 +173,8 @@ def run( model_handler = PytorchNoBatchModelHandler( state_dict_path=known_args.model_state_dict_path, model_class=model_class, - model_params=model_params) + model_params=model_params, + large_model=known_args.large_model) pipeline = test_pipeline if not test_pipeline: diff --git a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py index 6f8ea929bbb..5392cdf7dda 100644 --- a/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py +++ b/sdks/python/apache_beam/examples/inference/sklearn_mnist_classification.py @@ -77,6 +77,13 @@ def parse_known_args(argv): dest='model_path', required=True, help='Path to load the Sklearn model for Inference.') + parser.add_argument( + '--large_model', + action='store_true', + dest='large_model', + default=False, + help='Set to true if your model is large enough to run into memory ' + 'pressure if you load multiple copies.') return parser.parse_known_args(argv) @@ -103,7 +110,8 @@ def run( model_loader = KeyedModelHandler( SklearnModelHandlerNumpy( model_file_type=ModelFileType.PICKLE, - model_uri=known_args.model_path)) + model_uri=known_args.model_path, + large_model=known_args.large_model)) pipeline = test_pipeline if not test_pipeline: diff --git a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py index 174d21b26af..6cf746e77cd 100644 --- a/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py +++ b/sdks/python/apache_beam/examples/inference/tensorflow_mnist_classification.py @@ -70,6 +70,13 @@ def parse_known_args(argv): dest='model_path', required=True, help='Path to load the Tensorflow model for Inference.') + parser.add_argument( + '--large_model', + action='store_true', + dest='large_model', + default=False, + help='Set to true if your model is large enough to run into memory ' + 'pressure if you load multiple copies.') return parser.parse_known_args(argv) @@ -89,7 +96,9 @@ def run( # Therefore, we use KeyedModelHandler wrapper over TFModelHandlerNumpy. model_loader = KeyedModelHandler( TFModelHandlerNumpy( - model_uri=known_args.model_path, model_type=ModelType.SAVED_MODEL)) + model_uri=known_args.model_path, + model_type=ModelType.SAVED_MODEL, + large_model=known_args.large_model)) pipeline = test_pipeline if not test_pipeline: diff --git a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py index 59ee7868ca0..963187fd210 100644 --- a/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py +++ b/sdks/python/apache_beam/examples/inference/xgboost_iris_classification.py @@ -73,6 +73,13 @@ def parse_known_args(argv): dest='model_state', required=True, help='Path to the state of the XGBoost model loaded for Inference.') + parser.add_argument( + '--large_model', + action='store_true', + dest='large_model', + default=False, + help='Set to true if your model is large enough to run into memory ' + 'pressure if you load multiple copies.') group = parser.add_mutually_exclusive_group(required=True) group.add_argument('--split', action='store_true', dest='split') group.add_argument('--no_split', action='store_false', dest='split') @@ -125,7 +132,8 @@ def run( xgboost_model_handler = KeyedModelHandler( model_handler( model_class=xgboost.XGBClassifier, - model_state=known_args.model_state)) + model_state=known_args.model_state, + large_model=known_args.large_model)) input_data = load_sklearn_iris_test_data( data_type=input_data_type, split=known_args.split) diff --git a/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py index 0be359a8719..dd675d1935a 100644 --- a/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py +++ b/sdks/python/apache_beam/ml/inference/huggingface_inference_it_test.py @@ -75,6 +75,41 @@ class HuggingFaceInference(unittest.TestCase): predicted_predicted_text = predictions_dict[text] self.assertEqual(actual_predicted_text, predicted_predicted_text) + def test_hf_language_modeling_large_model(self): + test_pipeline = TestPipeline(is_integration_test=True) + # Path to text file containing some sentences + file_of_sentences = 'gs://apache-beam-ml/datasets/custom/hf_sentences.txt' + output_file_dir = 'gs://apache-beam-ml/testing/predictions' + output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt']) + + model_name = 'stevhliu/my_awesome_eli5_mlm_model' + + extra_opts = { + 'input': file_of_sentences, + 'output': output_file, + 'model_name': model_name, + 'large_model': True, + } + huggingface_language_modeling.run( + test_pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + + self.assertEqual(FileSystems().exists(output_file), True) + predictions = pytorch_inference_it_test.process_outputs( + filepath=output_file) + actuals_file = 'gs://apache-beam-ml/testing/expected_outputs/test_hf_run_inference_for_masked_lm_actuals.txt' # pylint: disable=line-too-long + actuals = pytorch_inference_it_test.process_outputs(filepath=actuals_file) + + predictions_dict = {} + for prediction in predictions: + text, predicted_text = prediction.split(';') + predictions_dict[text] = predicted_text.strip().lower() + + for actual in actuals: + text, actual_predicted_text = actual.split(';') + predicted_predicted_text = predictions_dict[text] + self.assertEqual(actual_predicted_text, predicted_predicted_text) + def test_hf_pipeline(self): test_pipeline = TestPipeline(is_integration_test=True) # Path to text file containing some questions and context diff --git a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py index 5e377720408..e00660bcbd9 100644 --- a/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py +++ b/sdks/python/apache_beam/ml/inference/pytorch_inference_it_test.py @@ -161,6 +161,41 @@ class PyTorchInference(unittest.TestCase): predicted_predicted_text = predictions_dict[text] self.assertEqual(actual_predicted_text, predicted_predicted_text) + @pytest.mark.uses_pytorch + @pytest.mark.it_postcommit + def test_torch_run_inference_bert_for_masked_lm_large_model(self): + test_pipeline = TestPipeline(is_integration_test=True) + # Path to text file containing some sentences + file_of_sentences = 'gs://apache-beam-ml/datasets/custom/sentences.txt' # pylint: disable=line-too-long + output_file_dir = 'gs://apache-beam-ml/testing/predictions' + output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt']) + + model_state_dict_path = 'gs://apache-beam-ml/models/huggingface.BertForMaskedLM.bert-base-uncased.pth' # pylint: disable=line-too-long + extra_opts = { + 'input': file_of_sentences, + 'output': output_file, + 'model_state_dict_path': model_state_dict_path, + 'large_model': True, + } + pytorch_language_modeling.run( + test_pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + + self.assertEqual(FileSystems().exists(output_file), True) + predictions = process_outputs(filepath=output_file) + actuals_file = 'gs://apache-beam-ml/testing/expected_outputs/test_torch_run_inference_bert_for_masked_lm_actuals.txt' # pylint: disable=line-too-long + actuals = process_outputs(filepath=actuals_file) + + predictions_dict = {} + for prediction in predictions: + text, predicted_text = prediction.split(';') + predictions_dict[text] = predicted_text + + for actual in actuals: + text, actual_predicted_text = actual.split(';') + predicted_predicted_text = predictions_dict[text] + self.assertEqual(actual_predicted_text, predicted_predicted_text) + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) diff --git a/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py b/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py index 73bb9341f25..c5480234cda 100644 --- a/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py +++ b/sdks/python/apache_beam/ml/inference/sklearn_inference_it_test.py @@ -85,6 +85,38 @@ class SklearnInference(unittest.TestCase): true_label, expected_prediction = expected_outputs[i].split(',') self.assertEqual(predictions_dict[true_label], expected_prediction) + def test_sklearn_mnist_classification_large_model(self): + test_pipeline = TestPipeline(is_integration_test=True) + input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv' + output_file_dir = 'gs://temp-storage-for-end-to-end-tests' + output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt']) + model_path = 'gs://apache-beam-ml/models/mnist_model_svm.pickle' + extra_opts = { + 'input': input_file, + 'output': output_file, + 'model_path': model_path, + 'large_model': True + } + sklearn_mnist_classification.run( + test_pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + self.assertEqual(FileSystems().exists(output_file), True) + + expected_output_filepath = 'gs://apache-beam-ml/testing/expected_outputs/test_sklearn_mnist_classification_actuals.txt' # pylint: disable=line-too-long + expected_outputs = process_outputs(expected_output_filepath) + + predicted_outputs = process_outputs(output_file) + self.assertEqual(len(expected_outputs), len(predicted_outputs)) + + predictions_dict = {} + for i in range(len(predicted_outputs)): + true_label, prediction = predicted_outputs[i].split(',') + predictions_dict[true_label] = prediction + + for i in range(len(expected_outputs)): + true_label, expected_prediction = expected_outputs[i].split(',') + self.assertEqual(predictions_dict[true_label], expected_prediction) + # TODO(https://github.com/apache/beam/issues/27151) use model with sklearn 1.2 @unittest.skipIf(sys.version_info >= (3, 11, 0), "Beam#27151") def test_sklearn_regression(self): diff --git a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py index bdc0291dd1e..4786b7a0398 100644 --- a/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py +++ b/sdks/python/apache_beam/ml/inference/tensorflow_inference_it_test.py @@ -102,6 +102,37 @@ class TensorflowInference(unittest.TestCase): true_label, expected_prediction = expected_outputs[i].split(',') self.assertEqual(predictions_dict[true_label], expected_prediction) + def test_tf_mnist_classification_large_model(self): + test_pipeline = TestPipeline(is_integration_test=True) + input_file = 'gs://apache-beam-ml/testing/inputs/it_mnist_data.csv' + output_file_dir = 'gs://apache-beam-ml/testing/outputs' + output_file = '/'.join([output_file_dir, str(uuid.uuid4()), 'result.txt']) + model_path = 'gs://apache-beam-ml/models/tensorflow/mnist/' + extra_opts = { + 'input': input_file, + 'output': output_file, + 'model_path': model_path, + 'large_model': True, + } + tensorflow_mnist_classification.run( + test_pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + self.assertEqual(FileSystems().exists(output_file), True) + + expected_output_filepath = 'gs://apache-beam-ml/testing/expected_outputs/test_sklearn_mnist_classification_actuals.txt' # pylint: disable=line-too-long + expected_outputs = process_outputs(expected_output_filepath) + predicted_outputs = process_outputs(output_file) + self.assertEqual(len(expected_outputs), len(predicted_outputs)) + + predictions_dict = {} + for i in range(len(predicted_outputs)): + true_label, prediction = predicted_outputs[i].split(',') + predictions_dict[true_label] = prediction + + for i in range(len(expected_outputs)): + true_label, expected_prediction = expected_outputs[i].split(',') + self.assertEqual(predictions_dict[true_label], expected_prediction) + def test_tf_imagenet_image_segmentation(self): test_pipeline = TestPipeline(is_integration_test=True) input_file = ( diff --git a/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py b/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py index e458ccab4b0..3db62bcc6a9 100644 --- a/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py +++ b/sdks/python/apache_beam/ml/inference/xgboost_inference_it_test.py @@ -113,6 +113,40 @@ class XGBoostInference(unittest.TestCase): true_label, expected_prediction = expected_output.split(',') self.assertEqual(predictions_dict[true_label], expected_prediction) + def test_iris_classification_numpy_single_batch_large_model(self): + test_pipeline = TestPipeline(is_integration_test=True) + input_type = 'numpy' + output_file_dir = '/tmp' + output_file = '/'.join( + [output_file_dir, str(uuid.uuid4()), 'numpy_single_batch.txt']) + model_state_path = 'gs://apache-beam-ml/models/xgboost.iris_classifier.json' + extra_opts = { + 'input_type': input_type, + 'output': output_file, + 'model_state': model_state_path, + 'no_split': True, + 'large_model': True, + } + + xgboost_iris_classification.run( + test_pipeline.get_full_options_as_args(**extra_opts), + save_main_session=False) + self.assertEqual(FileSystems().exists(output_file), True) + + expected_outputs = EXPECTED_OUTPUT_SINGLE_BATCHES + + predicted_outputs = process_outputs(output_file) + self.assertEqual(len(expected_outputs), len(predicted_outputs)) + + predictions_dict = {} + for predicted_output in predicted_outputs: + true_label, prediction = predicted_output.split(',') + predictions_dict[true_label] = prediction + + for expected_output in expected_outputs: + true_label, expected_prediction = expected_output.split(',') + self.assertEqual(predictions_dict[true_label], expected_prediction) + def test_iris_classification_pandas_single_batch(self): test_pipeline = TestPipeline(is_integration_test=True) input_type = 'pandas'