This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch oss-image-cpu
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 054f6db251381a423870c78ddd69a07e5d3b3807
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Wed Dec 31 15:30:44 2025 +0400

    Refactoring
---
 .../beam_Inference_Python_Benchmarks_Dataflow.yml  |   6 +-
 ...ks_Dataflow_Pytorch_Image_Object_Detection.txt} |   2 +-
 .../inference/pytorch_image_object_detection.py    | 141 ++++++++++-----------
 ...ytorch_image_object_detection_requirements.txt} |   0
 ...> pytorch_image_object_detection_benchmarks.py} |   8 +-
 5 files changed, 74 insertions(+), 83 deletions(-)

diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml 
b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
index e1da7670221..bc88bcc209a 100644
--- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
+++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
@@ -92,7 +92,7 @@ jobs:
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
-            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt
+            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
       # The env variables are created and populated in the 
test-arguments-action as 
"<github.job>_test_arguments_<argument_file_paths_index>"
       - name: get current time
         run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
@@ -197,8 +197,8 @@ jobs:
         with:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
-            
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_object_detection_benchmarks
 \
+            
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_object_detection_benchmarks
 \
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
-            
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_object_detection_requirements.txt
 \
+            
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
 \
             '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --mode=batch 
--job_name=benchmark-tests-pytorch-object_detection-batch-${{env.NOW_UTC}} 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_object_detection_batch'
 \
diff --git 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt
 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
similarity index 94%
rename from 
.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt
rename to 
.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
index 2cb905f1ed5..5b8b33b8427 100644
--- 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Detection.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
@@ -21,7 +21,7 @@
 --autoscaling_algorithm=NONE
 --staging_location=gs://temp-storage-for-perf-tests/loadtests
 --temp_location=gs://temp-storage-for-perf-tests/loadtests
---requirements_file=apache_beam/ml/inference/pytorch_object_detection_requirements.txt
+--requirements_file=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
 --publish_to_big_query=true
 --metrics_dataset=beam_run_inference
 --metrics_table=result_torch_inference_object_detection_batch
diff --git 
a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py 
b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
index e70c4b157f4..6ca9cdb1f93 100644
--- 
a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
+++ 
b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
@@ -52,9 +52,9 @@ from apache_beam.runners.runner import PipelineResult
 import torch
 import PIL.Image as PILImage
 
-
 # ============ Utility & Preprocessing ============
 
+
 def now_millis() -> int:
   return int(time.time() * 1000)
 
@@ -71,9 +71,9 @@ def load_image_from_uri(uri: str) -> bytes:
     return f.read()
 
 
-def decode_to_tensor(
-  image_bytes: bytes,
-  resize_shorter_side: Optional[int] = None) -> torch.Tensor:
+def decode_to_tens(
+    image_bytes: bytes,
+    resize_shorter_side: Optional[int] = None) -> torch.Tensor:
   """Decode bytes -> RGB PIL -> optional resize -> float tensor [0..1], CHW.
 
   Note: TorchVision detection models apply their own normalization internally.
@@ -105,6 +105,7 @@ def sha1_hex(s: str) -> str:
 
 # ============ DoFns ============
 
+
 class MakeKeyDoFn(beam.DoFn):
   """Produce (image_id, uri) where image_id is stable for dedup and keys."""
   def process(self, element: str):
@@ -123,19 +124,21 @@ class DecodePreprocessDoFn(beam.DoFn):
     start = now_millis()
     try:
       b = load_image_from_uri(uri)
-      tensor = decode_to_tensor(b, 
resize_shorter_side=self.resize_shorter_side)
+      tensor = decode_to_tens(b, resize_shorter_side=self.resize_shorter_side)
       preprocess_ms = now_millis() - start
-      yield image_id, {"tensor": tensor, "preprocess_ms": preprocess_ms, 
"uri": uri}
+      yield image_id, {
+          "tensor": tensor, "preprocess_ms": preprocess_ms, "uri": uri
+      }
     except Exception as e:
       logging.warning("Decode failed for %s (%s): %s", image_id, uri, e)
       return
 
 
 def _torchvision_detection_inference_fn(
-  model, batch: List[torch.Tensor], device: str) -> List[Dict[str, Any]]:
+    model, batch: List[torch.Tensor], device: str) -> List[Dict[str, Any]]:
   """Custom inference for TorchVision detection models.
 
-  TorchVision detection models expect: List[Tensor] (each tensor: CHW float 
[0..1]).
+  TorchVision detection models expect: List[Tensor] (each: CHW float [0..1]).
   """
   with torch.no_grad():
     inputs = []
@@ -152,10 +155,7 @@ def _torchvision_detection_inference_fn(
 class PostProcessDoFn(beam.DoFn):
   """PredictionResult -> dict row for BQ."""
   def __init__(
-    self,
-    model_name: str,
-    score_threshold: float,
-    max_detections: int):
+    self, model_name: str, score_threshold: float, max_detections: int):
     self.model_name = model_name
     self.score_threshold = score_threshold
     self.max_detections = max_detections
@@ -191,9 +191,9 @@ class PostProcessDoFn(beam.DoFn):
         continue
       box = boxes_list[i]  # [x1,y1,x2,y2]
       dets.append({
-        "label_id": int(labels_list[i]),
-        "score": score,
-        "box": [float(box[0]), float(box[1]), float(box[2]), float(box[3])],
+          "label_id": int(labels_list[i]),
+          "score": score,
+          "box": [float(box[0]), float(box[1]), float(box[2]), float(box[3])],
       })
       if len(dets) >= self.max_detections:
         break
@@ -214,8 +214,7 @@ class PostProcessDoFn(beam.DoFn):
 
     if not isinstance(inference_obj, dict):
       logging.warning(
-          "Unexpected inference type for %s: %s", image_id, type(inference_obj)
-      )
+          "Unexpected inf-ce type for %s: %s", image_id, type(inference_obj))
       yield {
           "image_id": image_id,
           "model_name": self.model_name,
@@ -238,32 +237,32 @@ class PostProcessDoFn(beam.DoFn):
 
 # ============ Args & Helpers ============
 
+
 def parse_known_args(argv):
   parser = argparse.ArgumentParser()
 
   # I/O & runtime
   parser.add_argument('--mode', default='batch', choices=['batch'])
   parser.add_argument(
-    '--output_table',
-    required=True,
-    help='BigQuery output table: dataset.table')
+      '--output_table',
+      required=True,
+      help='BigQuery output table: dataset.table')
   parser.add_argument(
-    '--publish_to_big_query', default='true', choices=['true', 'false'])
+      '--publish_to_big_query', default='true', choices=['true', 'false'])
   parser.add_argument(
-    '--input',
-    required=True,
-    help='GCS path to file with image URIs')
+      '--input', required=True, help='GCS path to file with image URIs')
 
   # Model & inference
   parser.add_argument(
-    '--pretrained_model_name',
-    default='fasterrcnn_resnet50_fpn',
-    help=('TorchVision detection model name '
+      '--pretrained_model_name',
+      default='fasterrcnn_resnet50_fpn',
+      help=(
+          'TorchVision detection model name '
           '(e.g., fasterrcnn_resnet50_fpn)'))
   parser.add_argument(
-    '--model_state_dict_path',
-    required=True,
-    help='GCS path to a state_dict .pth for the chosen model')
+      '--model_state_dict_path',
+      required=True,
+      help='GCS path to a state_dict .pth for the chosen model')
   parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU'])
 
   # Batch sizing (no right-fitting)
@@ -311,8 +310,9 @@ def create_torchvision_detection_model(model_name: str):
 
 # ============ Main pipeline ============
 
+
 def run(
-  argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
   known_args, pipeline_args = parse_known_args(argv)
 
   pipeline_options = PipelineOptions(pipeline_args)
@@ -328,30 +328,25 @@ def run(
   batch_size = int(known_args.inference_batch_size)
 
   model_handler = PytorchModelHandlerTensor(
-    model_class=lambda: create_torchvision_detection_model(
-        known_args.pretrained_model_name
-    ),
-    model_params={},
-    state_dict_path=known_args.model_state_dict_path,
-    device=device,
-    inference_batch_size=batch_size,
-    inference_fn=_torchvision_detection_inference_fn,
+      model_class=lambda: create_torchvision_detection_model(
+          known_args.pretrained_model_name
+      ),
+      model_params={},
+      state_dict_path=known_args.model_state_dict_path,
+      device=device,
+      inference_batch_size=batch_size,
+      inference_fn=_torchvision_detection_inference_fn,
   )
 
   pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
 
   pcoll = (
-    pipeline
-    | 'ReadURIsBatch' >> beam.Create(
-        list(read_gcs_file_lines(known_args.input))
-    )
-    | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip())
-  )
+      pipeline
+      | 'ReadURIsBatch' >> beam.Create(
+          list(read_gcs_file_lines(known_args.input)))
+      | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip()))
 
-  keyed = (
-    pcoll
-    | 'MakeKey' >> beam.ParDo(MakeKeyDoFn())
-  )
+  keyed = (pcoll | 'MakeKey' >> beam.ParDo(MakeKeyDoFn()))
 
   # Batch exactly-once behavior:
   # 1) Dedup by key within the run to ensure stable writes.
@@ -359,41 +354,37 @@ def run(
   keyed = keyed | 'DistinctByKey' >> beam.Distinct()
 
   preprocessed = (
-    keyed
-    | 'DecodePreprocess' >> beam.ParDo(
-    DecodePreprocessDoFn(resize_shorter_side=resize_shorter_side))
-  )
+      keyed
+      | 'DecodePreprocess' >> beam.ParDo(
+          DecodePreprocessDoFn(resize_shorter_side=resize_shorter_side)))
 
   to_infer = (
-    preprocessed
-    | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"]))
-  )
+      preprocessed
+      | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"])))
 
   predictions = (
-    to_infer
-    | 'RunInference' >> RunInference(KeyedModelHandler(model_handler))
-  )
+      to_infer
+      | 'RunInference' >> RunInference(KeyedModelHandler(model_handler)))
 
   results = (
-    predictions
-    | 'PostProcess' >> beam.ParDo(
-    PostProcessDoFn(
-      model_name=known_args.pretrained_model_name,
-      score_threshold=known_args.score_threshold,
-      max_detections=known_args.max_detections))
-  )
+      predictions
+      | 'PostProcess' >> beam.ParDo(
+          PostProcessDoFn(
+              model_name=known_args.pretrained_model_name,
+              score_threshold=known_args.score_threshold,
+              max_detections=known_args.max_detections)))
 
   if known_args.publish_to_big_query == 'true':
     _ = (
-      results
-      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
-      known_args.output_table,
-      schema=('image_id:STRING, model_name:STRING, '
-              'detections:STRING, num_detections:INT64, infer_ms:INT64'),
-      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
-      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-      method=beam.io.WriteToBigQuery.Method.FILE_LOADS)
-    )
+        results
+        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+            known_args.output_table,
+            schema=(
+                'image_id:STRING, model_name:STRING, '
+                'detections:STRING, num_detections:INT64, infer_ms:INT64'),
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
 
   result = pipeline.run()
   result.wait_until_finish(duration=1800000)  # 30 min
diff --git 
a/sdks/python/apache_beam/ml/inference/pytorch_object_detection_requirements.txt
 
b/sdks/python/apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
similarity index 100%
rename from 
sdks/python/apache_beam/ml/inference/pytorch_object_detection_requirements.txt
rename to 
sdks/python/apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_object_detection_benchmarks.py
 
b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py
similarity index 88%
rename from 
sdks/python/apache_beam/testing/benchmarks/inference/pytorch_object_detection_benchmarks.py
rename to 
sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py
index b31112f68bf..3bfd5daec18 100644
--- 
a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_object_detection_benchmarks.py
+++ 
b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py
@@ -26,15 +26,15 @@ class 
PytorchImageObjectDetectionBenchmarkTest(DataflowCostBenchmark):
   def __init__(self):
     self.metrics_namespace = 'BeamML_PyTorch'
     super().__init__(
-      metrics_namespace=self.metrics_namespace,
-      pcollection='PostProcess.out0')
+        metrics_namespace=self.metrics_namespace,
+        pcollection='PostProcess.out0')
 
   def test(self):
     extra_opts = {}
     extra_opts['input'] = self.pipeline.get_option('input_file')
     self.result = pytorch_image_object_detection.run(
-      self.pipeline.get_full_options_as_args(**extra_opts),
-      test_pipeline=self.pipeline)
+        self.pipeline.get_full_options_as_args(**extra_opts),
+        test_pipeline=self.pipeline)
 
 
 if __name__ == '__main__':

Reply via email to