This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch oss-image-cpu
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 054f6db251381a423870c78ddd69a07e5d3b3807
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Wed Dec 31 15:30:44 2025 +0400

    Refactoring
---
 .../beam_Inference_Python_Benchmarks_Dataflow.yml  | 6 +-
 ...ks_Dataflow_Pytorch_Image_Object_Detection.txt} | 2 +-
 .../inference/pytorch_image_object_detection.py    | 141 ++++++++++-----------
 ...ytorch_image_object_detection_requirements.txt} | 0
 ...> pytorch_image_object_detection_benchmarks.py} | 8 +-
 5 files changed, 74 insertions(+), 83 deletions(-)

diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
index e1da7670221..bc88bcc209a 100644
--- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
+++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
@@ -92,7 +92,7 @@ jobs:
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
-            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt
+            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
           # The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
       - name: get current time
         run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
@@ -197,8 +197,8 @@ jobs:
         with:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
-            -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_object_detection_benchmarks \
+            -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_object_detection_benchmarks \
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
-            -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_object_detection_requirements.txt \
+            -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt \
             '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --mode=batch --job_name=benchmark-tests-pytorch-object_detection-batch-${{env.NOW_UTC}} --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_object_detection_batch' \
diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
similarity index 94%
rename from .github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt
rename to .github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
index 2cb905f1ed5..5b8b33b8427 100644
--- a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt
+++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
@@ -21,7 +21,7 @@
 --autoscaling_algorithm=NONE
 --staging_location=gs://temp-storage-for-perf-tests/loadtests
 --temp_location=gs://temp-storage-for-perf-tests/loadtests
---requirements_file=apache_beam/ml/inference/pytorch_object_detection_requirements.txt
+--requirements_file=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
 --publish_to_big_query=true
 --metrics_dataset=beam_run_inference
 --metrics_table=result_torch_inference_object_detection_batch
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
index e70c4b157f4..6ca9cdb1f93 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
@@ -52,9 +52,9 @@
 from apache_beam.runners.runner import PipelineResult
 import torch
 import PIL.Image as PILImage
 
-
 # ============ Utility & Preprocessing ============
+
 
 def now_millis() -> int:
   return int(time.time() * 1000)
@@ -71,9 +71,9 @@ def load_image_from_uri(uri: str) -> bytes:
     return f.read()
 
 
-def decode_to_tensor(
-    image_bytes: bytes,
-    resize_shorter_side: Optional[int] = None) -> torch.Tensor:
+def decode_to_tensor(
+    image_bytes: bytes,
+    resize_shorter_side: Optional[int] = None) -> torch.Tensor:
   """Decode bytes -> RGB PIL -> optional resize -> float tensor [0..1], CHW.
 
   Note: TorchVision detection models apply their own normalization internally.
@@ -105,6 +105,7 @@ def sha1_hex(s: str) -> str:
 
 
 # ============ DoFns ============
+
 class MakeKeyDoFn(beam.DoFn):
   """Produce (image_id, uri) where image_id is stable for dedup and keys."""
   def process(self, element: str):
@@ -123,19 +124,21 @@ class DecodePreprocessDoFn(beam.DoFn):
     start = now_millis()
     try:
       b = load_image_from_uri(uri)
-      tensor = decode_to_tensor(b, resize_shorter_side=self.resize_shorter_side)
+      tensor = decode_to_tensor(b, resize_shorter_side=self.resize_shorter_side)
       preprocess_ms = now_millis() - start
-      yield image_id, {"tensor": tensor, "preprocess_ms": preprocess_ms, "uri": uri}
+      yield image_id, {
+          "tensor": tensor, "preprocess_ms": preprocess_ms, "uri": uri
+      }
     except Exception as e:
       logging.warning("Decode failed for %s (%s): %s", image_id, uri, e)
       return
 
 
 def _torchvision_detection_inference_fn(
-    model, batch: List[torch.Tensor], device: str) -> List[Dict[str, Any]]:
+    model, batch: List[torch.Tensor], device: str) -> List[Dict[str, Any]]:
   """Custom inference for TorchVision detection models.
 
-  TorchVision detection models expect: List[Tensor] (each tensor: CHW float [0..1]).
+  TorchVision detection models expect: List[Tensor] (each: CHW float [0..1]).
""" with torch.no_grad(): inputs = [] @@ -152,10 +155,7 @@ def _torchvision_detection_inference_fn( class PostProcessDoFn(beam.DoFn): """PredictionResult -> dict row for BQ.""" def __init__( - self, - model_name: str, - score_threshold: float, - max_detections: int): + self, model_name: str, score_threshold: float, max_detections: int): self.model_name = model_name self.score_threshold = score_threshold self.max_detections = max_detections @@ -191,9 +191,9 @@ class PostProcessDoFn(beam.DoFn): continue box = boxes_list[i] # [x1,y1,x2,y2] dets.append({ - "label_id": int(labels_list[i]), - "score": score, - "box": [float(box[0]), float(box[1]), float(box[2]), float(box[3])], + "label_id": int(labels_list[i]), + "score": score, + "box": [float(box[0]), float(box[1]), float(box[2]), float(box[3])], }) if len(dets) >= self.max_detections: break @@ -214,8 +214,7 @@ class PostProcessDoFn(beam.DoFn): if not isinstance(inference_obj, dict): logging.warning( - "Unexpected inference type for %s: %s", image_id, type(inference_obj) - ) + "Unexpected inf-ce type for %s: %s", image_id, type(inference_obj)) yield { "image_id": image_id, "model_name": self.model_name, @@ -238,32 +237,32 @@ class PostProcessDoFn(beam.DoFn): # ============ Args & Helpers ============ + def parse_known_args(argv): parser = argparse.ArgumentParser() # I/O & runtime parser.add_argument('--mode', default='batch', choices=['batch']) parser.add_argument( - '--output_table', - required=True, - help='BigQuery output table: dataset.table') + '--output_table', + required=True, + help='BigQuery output table: dataset.table') parser.add_argument( - '--publish_to_big_query', default='true', choices=['true', 'false']) + '--publish_to_big_query', default='true', choices=['true', 'false']) parser.add_argument( - '--input', - required=True, - help='GCS path to file with image URIs') + '--input', required=True, help='GCS path to file with image URIs') # Model & inference parser.add_argument( - '--pretrained_model_name', - default='fasterrcnn_resnet50_fpn', - help=('TorchVision detection model name ' + '--pretrained_model_name', + default='fasterrcnn_resnet50_fpn', + help=( + 'TorchVision detection model name ' '(e.g., fasterrcnn_resnet50_fpn)')) parser.add_argument( - '--model_state_dict_path', - required=True, - help='GCS path to a state_dict .pth for the chosen model') + '--model_state_dict_path', + required=True, + help='GCS path to a state_dict .pth for the chosen model') parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU']) # Batch sizing (no right-fitting) @@ -311,8 +310,9 @@ def create_torchvision_detection_model(model_name: str): # ============ Main pipeline ============ + def run( - argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: known_args, pipeline_args = parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) @@ -328,30 +328,25 @@ def run( batch_size = int(known_args.inference_batch_size) model_handler = PytorchModelHandlerTensor( - model_class=lambda: create_torchvision_detection_model( - known_args.pretrained_model_name - ), - model_params={}, - state_dict_path=known_args.model_state_dict_path, - device=device, - inference_batch_size=batch_size, - inference_fn=_torchvision_detection_inference_fn, + model_class=lambda: create_torchvision_detection_model( + known_args.pretrained_model_name + ), + model_params={}, + state_dict_path=known_args.model_state_dict_path, + device=device, + 
+      inference_batch_size=batch_size,
+      inference_fn=_torchvision_detection_inference_fn,
   )
 
   pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
 
   pcoll = (
-      pipeline
-      | 'ReadURIsBatch' >> beam.Create(
-          list(read_gcs_file_lines(known_args.input))
-      )
-      | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip())
-  )
+      pipeline
+      | 'ReadURIsBatch' >> beam.Create(
+          list(read_gcs_file_lines(known_args.input)))
+      | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip()))
 
-  keyed = (
-      pcoll
-      | 'MakeKey' >> beam.ParDo(MakeKeyDoFn())
-  )
+  keyed = (pcoll | 'MakeKey' >> beam.ParDo(MakeKeyDoFn()))
 
   # Batch exactly-once behavior:
   # 1) Dedup by key within the run to ensure stable writes.
@@ -359,41 +354,37 @@ def run(
   keyed = keyed | 'DistinctByKey' >> beam.Distinct()
 
   preprocessed = (
-      keyed
-      | 'DecodePreprocess' >> beam.ParDo(
-          DecodePreprocessDoFn(resize_shorter_side=resize_shorter_side))
-  )
+      keyed
+      | 'DecodePreprocess' >> beam.ParDo(
+          DecodePreprocessDoFn(resize_shorter_side=resize_shorter_side)))
 
   to_infer = (
-      preprocessed
-      | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"]))
-  )
+      preprocessed
+      | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"])))
 
   predictions = (
-      to_infer
-      | 'RunInference' >> RunInference(KeyedModelHandler(model_handler))
-  )
+      to_infer
+      | 'RunInference' >> RunInference(KeyedModelHandler(model_handler)))
 
   results = (
-      predictions
-      | 'PostProcess' >> beam.ParDo(
-          PostProcessDoFn(
-              model_name=known_args.pretrained_model_name,
-              score_threshold=known_args.score_threshold,
-              max_detections=known_args.max_detections))
-  )
+      predictions
+      | 'PostProcess' >> beam.ParDo(
+          PostProcessDoFn(
+              model_name=known_args.pretrained_model_name,
+              score_threshold=known_args.score_threshold,
+              max_detections=known_args.max_detections)))
 
   if known_args.publish_to_big_query == 'true':
     _ = (
-        results
-        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
-            known_args.output_table,
-            schema=('image_id:STRING, model_name:STRING, '
-                    'detections:STRING, num_detections:INT64, infer_ms:INT64'),
-            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            method=beam.io.WriteToBigQuery.Method.FILE_LOADS)
-    )
+        results
+        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+            known_args.output_table,
+            schema=(
+                'image_id:STRING, model_name:STRING, '
+                'detections:STRING, num_detections:INT64, infer_ms:INT64'),
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
 
   result = pipeline.run()
   result.wait_until_finish(duration=1800000)  # 30 min
diff --git a/sdks/python/apache_beam/ml/inference/pytorch_object_detection_requirements.txt b/sdks/python/apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
similarity index 100%
rename from sdks/python/apache_beam/ml/inference/pytorch_object_detection_requirements.txt
rename to sdks/python/apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_object_detection_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py
similarity index 88%
rename from sdks/python/apache_beam/testing/benchmarks/inference/pytorch_object_detection_benchmarks.py
rename to sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py
index b31112f68bf..3bfd5daec18 100644
--- a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_object_detection_benchmarks.py
+++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py
@@ -26,15 +26,15 @@ class PytorchImageObjectDetectionBenchmarkTest(DataflowCostBenchmark):
   def __init__(self):
     self.metrics_namespace = 'BeamML_PyTorch'
     super().__init__(
-        metrics_namespace=self.metrics_namespace,
-        pcollection='PostProcess.out0')
+        metrics_namespace=self.metrics_namespace,
+        pcollection='PostProcess.out0')
 
   def test(self):
     extra_opts = {}
     extra_opts['input'] = self.pipeline.get_option('input_file')
     self.result = pytorch_image_object_detection.run(
-        self.pipeline.get_full_options_as_args(**extra_opts),
-        test_pipeline=self.pipeline)
+        self.pipeline.get_full_options_as_args(**extra_opts),
+        test_pipeline=self.pipeline)
 
 
 if __name__ == '__main__':
