This is an automated email from the ASF dual-hosted git repository.

Amar3tto pushed a commit to branch oss-image-detection
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/oss-image-detection by this 
push:
     new 5f31795342a Refactoring
5f31795342a is described below

commit 5f31795342a1e27bdd9267e5c8a9985c9659b9ca
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Thu May 21 14:35:35 2026 +0400

    Refactoring
---
 ...beam_Inference_Python_Benchmarks_Dataflow_2.yml |  8 +--
 .../examples/inference/pytorch_image_captioning.py | 78 +++++++++-------------
 .../inference/pytorch_image_object_detection.py    | 40 +++++------
 .../inference/pytorch_imagenet_rightfit.py         | 30 ++-------
 4 files changed, 58 insertions(+), 98 deletions(-)

diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml 
b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml
index 7a1d7c0ebc1..d28f1e2e75d 100644
--- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml
+++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml
@@ -188,7 +188,7 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=CPU --experiments=enable_streaming_rightfitting --enable_dedup=false 
--mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_cpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_cpu'
 \
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=CPU --experiments=enable_streaming_rightfitting --mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_cpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_cpu'
 \
       - name: run PyTorch Image Classification EfficientNet-B0 Streaming 
(Right-fitting Exactly-once) CPU
         uses: ./.github/actions/gradle-command-self-hosted-action
         timeout-minutes: 180
@@ -199,7 +199,7 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=CPU --experiments=enable_streaming_rightfitting --enable_dedup=true 
--mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_once_cpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_once_cpu'
 \
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=CPU --experiments=enable_streaming_rightfitting --mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_once_cpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_once_cpu'
 \
       - name: run PyTorch Image Classification EfficientNet-B0 Streaming 
(Right-fitting) GPU
         uses: ./.github/actions/gradle-command-self-hosted-action
         timeout-minutes: 180
@@ -210,7 +210,7 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=GPU 
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
 --enable_dedup=false --mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_gpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imag
 [...]
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=GPU 
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
 --mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_gpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_gpu'
 \
       - name: run PyTorch Image Classification EfficientNet-B0 Streaming 
(Right-fitting Exactly-once) GPU
         uses: ./.github/actions/gradle-command-self-hosted-action
         timeout-minutes: 180
@@ -221,4 +221,4 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=GPU 
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
 --enable_dedup=true --mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_once_gpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_ 
[...]
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }} 
--device=GPU 
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
 --mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--metrics_table=torch_inference_imagenet_stream_rightfit_once_gpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_righ
 [...]
diff --git 
a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py 
b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
index e138110ef6a..2b25f05654a 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
@@ -59,16 +59,6 @@ def now_millis() -> int:
   return int(time.time() * 1000)
 
 
-def load_image_from_uri(uri: str) -> bytes:
-  with FileSystems.open(uri) as f:
-    return f.read()
-
-
-def sha1_hex(s: str) -> str:
-  import hashlib
-  return hashlib.sha1(s.encode("utf-8")).hexdigest()
-
-
 def decode_pil(image_bytes: bytes) -> PILImage.Image:
   with PILImage.open(io.BytesIO(image_bytes)) as img:
     img = img.convert("RGB")
@@ -80,25 +70,40 @@ def decode_pil(image_bytes: bytes) -> PILImage.Image:
 
 
 class MakeKeyDoFn(beam.DoFn):
-  """Produce (image_id, uri) where image_id is stable for dedup and keys."""
+  """Produce (uri, uri) so the URI is used as the stable key."""
   def process(self, element: str):
     uri = element
-    image_id = sha1_hex(uri)
-    yield image_id, uri
+    yield uri, uri
 
 
 class ReadImageBytesDoFn(beam.DoFn):
-  """Turn (image_id, uri) -> (image_id, dict(image_bytes, uri))."""
+  """Turn (uri, uri) -> (uri, dict(image_bytes))."""
   def process(self, kv: Tuple[str, str]):
-    image_id, uri = kv
+    uri, _ = kv
     try:
-      b = load_image_from_uri(uri)
-      yield image_id, {"image_bytes": b, "uri": uri}
+      with FileSystems.open(uri) as f:
+        image_bytes = f.read()
+      yield uri, {"image_bytes": image_bytes}
     except Exception as e:
-      logging.warning("Failed to read image %s (%s): %s", image_id, uri, e)
+      logging.warning("Failed to read image %s: %s", uri, e)
       return
 
 
+class DecodeImageDoFn(beam.DoFn):
+  """Turn (uri, dict(image_bytes)) -> (uri, dict(image))."""
+  def process(self, kv: Tuple[str, Dict[str, Any]]):
+    uri, value = kv
+    image_bytes = value["image_bytes"]
+
+    try:
+      image = decode_pil(image_bytes)
+    except Exception as e:
+      logging.warning("Failed to decode image %s: %s", uri, e)
+      image = PILImage.new("RGB", (224, 224), color=(0, 0, 0))
+
+    yield uri, {"image": image}
+
+
 class PostProcessDoFn(beam.DoFn):
   """Final PredictionResult -> row for BigQuery."""
   def __init__(self, blip_name: str, clip_name: str):
@@ -106,7 +111,7 @@ class PostProcessDoFn(beam.DoFn):
     self.clip_name = clip_name
 
   def process(self, kv: Tuple[str, PredictionResult]):
-    image_id, pred = kv
+    uri, pred = kv
     if hasattr(pred, "inference"):
       inf = pred.inference or {}
     else:
@@ -122,7 +127,7 @@ class PostProcessDoFn(beam.DoFn):
     total_ms = inf.get("total_ms", None)
 
     yield {
-        "image_id": image_id,
+        "image_id": uri,
         "blip_model": self.blip_name,
         "clip_model": self.clip_name,
         "best_caption": best_caption,
@@ -172,19 +177,7 @@ class BlipCaptionModelHandler(ModelHandler):
     model.eval()
     start = now_millis()
 
-    images = []
-    uris = []
-    bytes_list = []
-    for x in batch:
-      b = x["image_bytes"]
-      bytes_list.append(b)
-      uris.append(x.get("uri", ""))
-      try:
-        images.append(decode_pil(b))
-      except Exception as e:
-        # fallback: a blank image (so pipeline keeps going)
-        logging.warning("Failed to decode image %s: %s", uris[-1], e)
-        images.append(PILImage.new("RGB", (224, 224), color=(0, 0, 0)))
+    images = [x["image"] for x in batch]
 
     # Processor makes pixel_values
     inputs = processor(images=images, return_tensors="pt")
@@ -217,10 +210,9 @@ class BlipCaptionModelHandler(ModelHandler):
     results = []
     for i in range(len(batch)):
       results.append({
-          "image_bytes": bytes_list[i],
-          "uri": uris[i],
-          "candidates": candidates_per_image[i],
-          "blip_ms": blip_ms,
+        "image": images[i],
+        "candidates": candidates_per_image[i],
+        "blip_ms": blip_ms,
       })
     return results
 
@@ -266,17 +258,11 @@ class ClipRankModelHandler(ModelHandler):
     blip_ms_list: List[Optional[int]] = []
 
     for x in batch:
-      image_bytes = x["image_bytes"]
+      img = x["image"]
       candidates = [str(c) for c in (x.get("candidates", []) or [])]
       candidates_list.append(candidates)
       blip_ms_list.append(x.get("blip_ms", None))
 
-      try:
-        img = decode_pil(image_bytes)
-      except Exception as e:
-        logging.warning("Failed to decode image for CLIP ranking: %s", e)
-        img = PILImage.new("RGB", (224, 224), color=(0, 0, 0))
-
       start_i = len(texts)
       for c in candidates:
         images.append(img)
@@ -592,8 +578,8 @@ def run(
             allowed_lateness=0))
 
   keyed = (pcoll | 'MakeKey' >> beam.ParDo(MakeKeyDoFn()))
-
-  images = (keyed | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn()))
+  image_bytes = (keyed | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn()))
+  images = (image_bytes | 'DecodeImage' >> beam.ParDo(DecodeImageDoFn()))
 
   # Stage 1: BLIP candidate generation
   blip_out = (
diff --git 
a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py 
b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
index 65e489d1881..850c66126e3 100644
--- 
a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
+++ 
b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
@@ -63,11 +63,6 @@ def now_millis() -> int:
   return int(time.time() * 1000)
 
 
-def load_image_from_uri(uri: str) -> bytes:
-  with FileSystems.open(uri) as f:
-    return f.read()
-
-
 def decode_to_tens(
     image_bytes: bytes,
     resize_shorter_side: Optional[int] = None) -> torch.Tensor:
@@ -95,39 +90,36 @@ def decode_to_tens(
     return torch.from_numpy(arr)
 
 
-def sha1_hex(s: str) -> str:
-  import hashlib
-  return hashlib.sha1(s.encode("utf-8")).hexdigest()
-
-
 # ============ DoFns ============
 
 
 class MakeKeyDoFn(beam.DoFn):
-  """Produce (image_id, uri) where image_id is stable for dedup and keys."""
+  """Produce (uri, uri) where the URI is used as the stable key."""
   def process(self, element: str):
     uri = element
-    image_id = sha1_hex(uri)
-    yield image_id, uri
+    yield uri, uri
 
 
 class DecodePreprocessDoFn(beam.DoFn):
-  """Turn (image_id, uri) -> (image_id, tensor)."""
+  """Turn (uri, uri) -> (uri, tensor)."""
   def __init__(self, resize_shorter_side: Optional[int] = None):
     self.resize_shorter_side = resize_shorter_side
 
   def process(self, kv: Tuple[str, str]):
-    image_id, uri = kv
+    uri, _ = kv
     start = now_millis()
     try:
-      b = load_image_from_uri(uri)
-      tensor = decode_to_tens(b, resize_shorter_side=self.resize_shorter_side)
+      with FileSystems.open(uri) as f:
+        image_bytes = f.read()
+      tensor = decode_to_tens(
+        image_bytes,
+        resize_shorter_side=self.resize_shorter_side)
       preprocess_ms = now_millis() - start
-      yield image_id, {
-          "tensor": tensor, "preprocess_ms": preprocess_ms, "uri": uri
+      yield uri, {
+        "tensor": tensor, "preprocess_ms": preprocess_ms
       }
     except Exception as e:
-      logging.warning("Decode failed for %s (%s): %s", image_id, uri, e)
+      logging.warning("Decode failed for %s: %s", uri, e)
       return
 
 
@@ -213,7 +205,7 @@ class PostProcessDoFn(beam.DoFn):
     }
 
   def process(self, kv: Tuple[str, PredictionResult]):
-    image_id, pred = kv
+    image_uri, pred = kv
 
     # pred can be PredictionResult OR raw torchvision dict.
     if hasattr(pred, "inference"):
@@ -226,9 +218,9 @@ class PostProcessDoFn(beam.DoFn):
 
     if not isinstance(inference_obj, dict):
       logging.warning(
-          "Unexpected inf-ce type for %s: %s", image_id, type(inference_obj))
+          "Unexpected inf-ce type for %s: %s", image_uri, type(inference_obj))
       yield {
-          "image_id": image_id,
+          "image_id": image_uri,
           "model_name": self.model_name,
           "detections": json.dumps([]),
           "num_detections": 0,
@@ -239,7 +231,7 @@ class PostProcessDoFn(beam.DoFn):
     extracted = self._extract_detection(inference_obj)
 
     yield {
-        "image_id": image_id,
+        "image_id": image_uri,
         "model_name": self.model_name,
         "detections": json.dumps(extracted["detections"]),
         "num_detections": int(extracted["num_detections"]),
diff --git 
a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py 
b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
index 39a02315648..d13923c2670 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
@@ -17,9 +17,8 @@
 PyTorch EfficientNet-B0 model optimized for T4 GPUs.
 It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel,
 and runs inference with adaptive batch sizing for optimal GPU utilization.
-The pipeline ensures exactly-once semantics via stateful deduplication and
-idempotent BigQuery writes, allowing stable and reproducible performance
-measurements under continuous load.
+The pipeline targets stable and reproducible performance measurements under
+continuous load.
 Resources like Pub/Sub topic/subscription cleanup is handled programmatically.
 """
 
@@ -36,7 +35,6 @@ import torch
 import torch.nn.functional as F
 
 import apache_beam as beam
-from apache_beam.coders import BytesCoder
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.ml.inference.base import KeyedModelHandler
 from apache_beam.ml.inference.base import PredictionResult
@@ -46,7 +44,6 @@ from apache_beam.options.pipeline_options import 
PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.runner import PipelineResult
-from apache_beam.transforms import userstate
 from apache_beam.transforms import window
 
 from google.api_core.exceptions import NotFound
@@ -116,16 +113,6 @@ class MakeKeyDoFn(beam.DoFn):
       yield image_id, uri
 
 
-class DedupDoFn(beam.DoFn):
-  seen = userstate.ReadModifyWriteStateSpec('seen', BytesCoder())
-
-  def process(self, element, seen=beam.DoFn.StateParam(seen)):
-    if seen.read() == b'1':
-      return
-    seen.write(b'1')
-    yield element
-
-
 class DecodePreprocessDoFn(beam.DoFn):
   """Turn (image_id, bytes|uri) -> (image_id, torch.Tensor)"""
   def __init__(
@@ -266,10 +253,6 @@ def parse_known_args(argv):
   parser.add_argument('--window_sec', type=int, default=60)
   parser.add_argument('--trigger_proc_time_sec', type=int, default=30)
 
-  # Dedup
-  parser.add_argument(
-      '--enable_dedup', default='false', choices=['true', 'false'])
-
   known_args, pipeline_args = parser.parse_known_args(argv)
   return known_args, pipeline_args
 
@@ -478,9 +461,6 @@ def run(
       pcoll
       | 'MakeKey' >> beam.ParDo(MakeKeyDoFn(input_mode=known_args.input_mode)))
 
-  if known_args.enable_dedup == 'true':
-    keyed = keyed | 'Dedup' >> beam.ParDo(DedupDoFn())
-
   preprocessed = (
       keyed
       | 'DecodePreprocess' >> beam.ParDo(
@@ -494,8 +474,10 @@ def run(
       'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"].float())))
 
   predictions = (
-      to_infer
-      | 'RunInference' >> RunInference(KeyedModelHandler(model_handler)))
+    to_infer
+    | 'RunInference' >> RunInference(
+        KeyedModelHandler(model_handler)).with_resource_hints(
+            accelerator="type:nvidia-tesla-t4;count:1;install-nvidia-driver"))
 
   results = (
       predictions

Reply via email to