This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch oss-image-cpu
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6ae05f341fd9e6d80276ec8b9a81f66ee2aa627e
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Wed Dec 31 15:59:25 2025 +0400

    Add PyTorch Image Captioning BLIP + CLIP Batch
---
 .../beam_Inference_Python_Benchmarks_Dataflow.yml  |  16 +-
 ...s_Dataflow_Pytorch_Image_Object_Captioning.txt} |  22 +-
 ...rks_Dataflow_Pytorch_Image_Object_Detection.txt |   4 +-
 .test-infra/tools/refresh_looker_metrics.py        |   1 +
 .../examples/inference/pytorch_image_captioning.py | 463 +++++++++++++++++++++
 ...rch_image_captioning_blip_clip_requirements.txt |  28 ++
 .../pytorch_image_captioning_benchmarks.py         |  42 ++
 website/www/site/content/en/performance/_index.md  |   1 +
 .../performance/pytorchimagecaptioning/_index.md   |  42 ++
 website/www/site/data/performance.yaml             |  16 +
 10 files changed, 620 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml 
b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
index bc88bcc209a..c673feed720 100644
--- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
+++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
@@ -93,6 +93,7 @@ jobs:
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
+            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Captioning.txt
       # The env variables are created and populated in the 
test-arguments-action as 
"<github.job>_test_arguments_<argument_file_paths_index>"
       - name: get current time
         run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
@@ -190,7 +191,7 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/torch_tests_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} 
--job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} 
--output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} 
--job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} 
--output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
 \
       - name: run PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch
         uses: ./.github/actions/gradle-command-self-hosted-action
         timeout-minutes: 180
@@ -201,4 +202,15 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --mode=batch 
--job_name=benchmark-tests-pytorch-object_detection-batch-${{env.NOW_UTC}} 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_object_detection_batch'
 \
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --mode=batch 
--job_name=benchmark-tests-pytorch-image-object_detection-batch-${{env.NOW_UTC}}
 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_object_detection_batch'
 \
+      - name: run PyTorch Image Captioning BLIP + CLIP Batch
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        timeout-minutes: 180
+        with:
+          gradle-command: :sdks:python:apache_beam:testing:load_tests:run
+          arguments: |
+            
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_captioning_benchmarks
 \
+            -Prunner=DataflowRunner \
+            -PpythonVersion=3.10 \
+            
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt
 \
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_10 }} --mode=batch 
--job_name=benchmark-tests-pytorch-image-captioning-batch-${{env.NOW_UTC}} 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_captioning_batch'
diff --git 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Captioning.txt
similarity index 67%
copy from 
.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
copy to 
.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Captioning.txt
index 5b8b33b8427..d37e91abbc2 100644
--- 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Captioning.txt
@@ -1,5 +1,3 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
 #  regarding copyright ownership.  The ASF licenses this file
 #  to you under the Apache License, Version 2.0 (the
@@ -21,21 +19,23 @@
 --autoscaling_algorithm=NONE
 --staging_location=gs://temp-storage-for-perf-tests/loadtests
 --temp_location=gs://temp-storage-for-perf-tests/loadtests
---requirements_file=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
+--requirements_file=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt
 --publish_to_big_query=true
 --metrics_dataset=beam_run_inference
---metrics_table=result_torch_inference_object_detection_batch
+--metrics_table=result_torch_inference_image_captioning_batch
 --input_options={}
---influx_measurement=result_torch_inference_object_detection_batch
---pretrained_model_name=fasterrcnn_resnet50_fpn
+--influx_measurement=result_torch_inference_image_captioning_batch
 --device=GPU
 --mode=batch
---inference_batch_size=8
---resize_shorter_side=800
---score_threshold=0.5
---max_detections=50
 --input=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt
---model_state_dict_path=gs://apache-beam-ml/models/torchvision.detection.fasterrcnn_resnet50_fpn.pth
+--blip_model_name=Salesforce/blip-image-captioning-base
+--blip_batch_size=4
+--num_captions=5
+--max_new_tokens=30
+--num_beams=5
+--clip_model_name=openai/clip-vit-base-patch32
+--clip_batch_size=8
+--clip_score_normalize=false
 --runner=DataflowRunner
 --experiments=use_runner_v2
 --worker_accelerator=type=nvidia-tesla-t4,count=1,install-nvidia-driver=true
diff --git 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
index 5b8b33b8427..0bd0778224e 100644
--- 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
@@ -24,9 +24,9 @@
 
--requirements_file=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
 --publish_to_big_query=true
 --metrics_dataset=beam_run_inference
---metrics_table=result_torch_inference_object_detection_batch
+--metrics_table=result_torch_inference_image_object_detection_batch
 --input_options={}
---influx_measurement=result_torch_inference_object_detection_batch
+--influx_measurement=result_torch_inference_image_object_detection_batch
 --pretrained_model_name=fasterrcnn_resnet50_fpn
 --device=GPU
 --mode=batch
diff --git a/.test-infra/tools/refresh_looker_metrics.py 
b/.test-infra/tools/refresh_looker_metrics.py
index 9b8296c56d3..5daac3aaf31 100644
--- a/.test-infra/tools/refresh_looker_metrics.py
+++ b/.test-infra/tools/refresh_looker_metrics.py
@@ -44,6 +44,7 @@ LOOKS_TO_DOWNLOAD = [
     ("85", ["268", "269", "270", "271", "272"]),  # PyTorch Sentiment Batch 
DistilBERT base uncased
     ("86", ["284", "285", "286", "287", "288"]),  # VLLM Batch Gemma
     #TODO: PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch
+    #TODO: PyTorch Image Captioning BLIP + CLIP Batch
 ]
 
 
diff --git 
a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py 
b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
new file mode 100644
index 00000000000..04089b8b1e8
--- /dev/null
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
@@ -0,0 +1,463 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This batch pipeline performs image captioning using a multi-model approach:
+BLIP generates candidate captions, CLIP ranks them by image-text similarity.
+
+The pipeline reads image URIs from a GCS input file, decodes images, runs BLIP
+caption generation in batches on GPU, then runs CLIP ranking in batches on GPU.
+Results are written to BigQuery using FILE_LOADS for stable batch semantics.
+
+Exactly-once semantics for batch runs are approximated via a stable image_id
+(sha1(uri)) + Distinct() before writing and FILE_LOADS output method.
+"""
+
+import argparse
+import io
+import json
+import logging
+import time
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import ModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.runner import PipelineResult
+
+import torch
+import PIL.Image as PILImage
+
+
+# ============ Utility ============
+
+
+def now_millis() -> int:
+  return int(time.time() * 1000)
+
+
+def read_gcs_file_lines(gcs_path: str) -> Iterable[str]:
+  """Reads text lines from a GCS file."""
+  with FileSystems.open(gcs_path) as f:
+    for line in f.read().decode("utf-8").splitlines():
+      yield line.strip()
+
+
+def load_image_from_uri(uri: str) -> bytes:
+  with FileSystems.open(uri) as f:
+    return f.read()
+
+
+def sha1_hex(s: str) -> str:
+  import hashlib
+  return hashlib.sha1(s.encode("utf-8")).hexdigest()
+
+
+def decode_pil(image_bytes: bytes) -> PILImage.Image:
+  with PILImage.open(io.BytesIO(image_bytes)) as img:
+    return img.convert("RGB")
+
+
+# ============ DoFns ============
+
+
+class MakeKeyDoFn(beam.DoFn):
+  """Produce (image_id, uri) where image_id is stable for dedup and keys."""
+  def process(self, element: str):
+    uri = element
+    image_id = sha1_hex(uri)
+    yield image_id, uri
+
+
+class ReadImageBytesDoFn(beam.DoFn):
+  """Turn (image_id, uri) -> (image_id, dict(image_bytes, uri))."""
+  def process(self, kv: Tuple[str, str]):
+    image_id, uri = kv
+    try:
+      b = load_image_from_uri(uri)
+      yield image_id, {"image_bytes": b, "uri": uri}
+    except Exception as e:
+      logging.warning("Failed to read image %s (%s): %s", image_id, uri, e)
+      return
+
+
+class PostProcessDoFn(beam.DoFn):
+  """Final PredictionResult -> row for BigQuery."""
+  def __init__(self, blip_name: str, clip_name: str):
+    self.blip_name = blip_name
+    self.clip_name = clip_name
+
+  def process(self, kv: Tuple[str, PredictionResult]):
+    image_id, pred = kv
+    inf = pred.inference or {}
+    # Expected inference fields from CLIP handler:
+    # best_caption, best_score, candidates, scores, blip_ms, clip_ms, total_ms
+    best_caption = inf.get("best_caption", "")
+    best_score = inf.get("best_score", None)
+    candidates = inf.get("candidates", [])
+    scores = inf.get("scores", [])
+    blip_ms = inf.get("blip_ms", None)
+    clip_ms = inf.get("clip_ms", None)
+    total_ms = inf.get("total_ms", None)
+
+    yield {
+        "image_id": image_id,
+        "blip_model": self.blip_name,
+        "clip_model": self.clip_name,
+        "best_caption": best_caption,
+        "best_score": float(best_score) if best_score is not None else None,
+        "candidates": json.dumps(candidates),
+        "scores": json.dumps(scores),
+        "blip_ms": int(blip_ms) if blip_ms is not None else None,
+        "clip_ms": int(clip_ms) if clip_ms is not None else None,
+        "total_ms": int(total_ms) if total_ms is not None else None,
+        "infer_ms": now_millis(),
+    }
+
+
+# ============ Model Handlers ============
+
+
+class BlipCaptionModelHandler(ModelHandler):
+  def __init__(
+    self,
+    model_name: str,
+    device: str,
+    batch_size: int,
+    num_captions: int,
+    max_new_tokens: int,
+    num_beams: int):
+    self.model_name = model_name
+    self.device = device
+    self.batch_size = batch_size
+    self.num_captions = num_captions
+    self.max_new_tokens = max_new_tokens
+    self.num_beams = num_beams
+
+    self._model = None
+    self._processor = None
+
+  def load_model(self):
+    from transformers import BlipForConditionalGeneration, BlipProcessor
+    self._processor = BlipProcessor.from_pretrained(self.model_name)
+    self._model = BlipForConditionalGeneration.from_pretrained(self.model_name)
+    self._model.eval()
+    self._model.to(self.device)
+    return self._model
+
+  def batch_elements_kwargs(self):
+    return {"max_batch_size": self.batch_size}
+
+  def run_inference(
+      self, batch: List[Dict[str, Any]], model, inference_args=None):
+    if self._model is None:
+      self.load_model()
+
+    start = now_millis()
+
+    images = []
+    uris = []
+    bytes_list = []
+    for x in batch:
+      b = x["image_bytes"]
+      bytes_list.append(b)
+      uris.append(x.get("uri", ""))
+      try:
+        images.append(decode_pil(b))
+      except Exception:
+        # fallback: a blank image (so pipeline keeps going)
+        images.append(PILImage.new("RGB", (224, 224), color=(0, 0, 0)))
+
+    # Processor makes pixel_values
+    inputs = self._processor(images=images, return_tensors="pt")
+    pixel_values = inputs["pixel_values"].to(self.device)
+
+    # Generate captions
+    # We use num_return_sequences to generate multiple candidates per image.
+    # Note: this will produce (B * num_captions) sequences.
+    with torch.no_grad():
+      generated_ids = self._model.generate(
+          pixel_values=pixel_values,
+          max_new_tokens=self.max_new_tokens,
+          num_beams=max(self.num_beams, self.num_captions),
+          num_return_sequences=self.num_captions,
+          do_sample=False,
+      )
+
+    captions_all = self._processor.batch_decode(
+        generated_ids, skip_special_tokens=True)
+
+    # Group candidates per image
+    candidates_per_image = []
+    idx = 0
+    for _ in range(len(batch)):
+      candidates_per_image.append(captions_all[idx: idx + self.num_captions])
+      idx += self.num_captions
+
+    blip_ms = now_millis() - start
+
+    results = []
+    for i in range(len(batch)):
+      results.append({
+          "image_bytes": bytes_list[i],
+          "uri": uris[i],
+          "candidates": candidates_per_image[i],
+          "blip_ms": blip_ms,
+      })
+    return results
+
+  def get_metrics_namespace(self) -> str:
+    return "blip_captioning"
+
+
+class ClipRankModelHandler(ModelHandler):
+  def __init__(
+    self,
+    model_name: str,
+    device: str,
+    batch_size: int,
+    score_normalize: bool):
+    self.model_name = model_name
+    self.device = device
+    self.batch_size = batch_size
+    self.score_normalize = score_normalize
+
+    self._model = None
+    self._processor = None
+
+  def load_model(self):
+    from transformers import CLIPModel, CLIPProcessor
+    self._processor = CLIPProcessor.from_pretrained(self.model_name)
+    self._model = CLIPModel.from_pretrained(self.model_name)
+    self._model.eval()
+    self._model.to(self.device)
+    return self._model
+
+  def batch_elements_kwargs(self):
+    return {"max_batch_size": self.batch_size}
+
+  def run_inference(
+      self, batch: List[Dict[str, Any]], model, inference_args=None):
+    if self._model is None:
+      self.load_model()
+
+    start = now_millis()
+
+    results = []
+    with torch.no_grad():
+      for x in batch:
+        image_bytes = x["image_bytes"]
+        candidates = x.get("candidates", [])
+        blip_ms = x.get("blip_ms", None)
+
+        # Decode image
+        try:
+          image = decode_pil(image_bytes)
+        except Exception:
+          image = PILImage.new("RGB", (224, 224), color=(0, 0, 0))
+
+        if not candidates:
+          clip_ms = now_millis() - start
+          results.append({
+              "best_caption": "",
+              "best_score": None,
+              "candidates": [],
+              "scores": [],
+              "blip_ms": blip_ms,
+              "clip_ms": clip_ms,
+              "total_ms": None,
+          })
+          continue
+
+        # CLIPProcessor can accept a single image and list of texts
+        inputs = self._processor(
+            text=candidates, images=image, return_tensors="pt", padding=True)
+        inputs = {k: v.to(self.device) for k, v in inputs.items()}
+
+        outputs = self._model(**inputs)
+        # logits_per_image shape: [1, num_texts]
+        logits = outputs.logits_per_image[0]
+
+        if self.score_normalize:
+          # optional normalization to [0..1] via softmax
+          probs = torch.softmax(logits, dim=-1)
+          scores_t = probs
+        else:
+          scores_t = logits
+
+        scores = scores_t.detach().cpu().tolist()
+        best_idx = int(torch.argmax(scores_t).item())
+        best_caption = candidates[best_idx]
+        best_score = float(scores[best_idx])
+
+        clip_ms = now_millis() - start
+        total_ms = None
+        if blip_ms is not None:
+          total_ms = int(blip_ms) + int(clip_ms)
+
+        results.append({
+            "best_caption": best_caption,
+            "best_score": best_score,
+            "candidates": candidates,
+            "scores": scores,
+            "blip_ms": blip_ms,
+            "clip_ms": clip_ms,
+            "total_ms": total_ms,
+        })
+
+    return results
+
+  def get_metrics_namespace(self) -> str:
+    return "clip_ranking"
+
+
+# ============ Args & Helpers ============
+
+
+def parse_known_args(argv):
+  parser = argparse.ArgumentParser()
+
+  # I/O & runtime
+  parser.add_argument('--mode', default='batch', choices=['batch'])
+  parser.add_argument(
+      '--input', required=True, help='GCS path to file with image URIs')
+  parser.add_argument(
+      '--output_table',
+      required=True,
+      help='BigQuery output table: dataset.table')
+  parser.add_argument(
+      '--publish_to_big_query', default='true', choices=['true', 'false'])
+
+  # Device
+  parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU'])
+
+  # BLIP
+  parser.add_argument(
+      '--blip_model_name', default='Salesforce/blip-image-captioning-base')
+  parser.add_argument('--blip_batch_size', type=int, default=4)
+  parser.add_argument('--num_captions', type=int, default=5)
+  parser.add_argument('--max_new_tokens', type=int, default=30)
+  parser.add_argument('--num_beams', type=int, default=5)
+
+  # CLIP
+  parser.add_argument('--clip_model_name', 
default='openai/clip-vit-base-patch32')
+  parser.add_argument('--clip_batch_size', type=int, default=8)
+  parser.add_argument(
+      '--clip_score_normalize', default='false', choices=['true', 'false'])
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+  return known_args, pipeline_args
+
+
+# ============ Main pipeline ============
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  known_args, pipeline_args = parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+  pipeline_options.view_as(StandardOptions).streaming = False
+
+  device = 'cuda' if known_args.device.upper() == 'GPU' else 'cpu'
+  clip_score_normalize = (known_args.clip_score_normalize == 'true')
+
+  blip_handler = BlipCaptionModelHandler(
+      model_name=known_args.blip_model_name,
+      device=device,
+      batch_size=int(known_args.blip_batch_size),
+      num_captions=int(known_args.num_captions),
+      max_new_tokens=int(known_args.max_new_tokens),
+      num_beams=int(known_args.num_beams),
+  )
+
+  clip_handler = ClipRankModelHandler(
+      model_name=known_args.clip_model_name,
+      device=device,
+      batch_size=int(known_args.clip_batch_size),
+      score_normalize=clip_score_normalize,
+  )
+
+  pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
+
+  pcoll = (
+      pipeline
+      | 'ReadURIsBatch' >> beam.Create(
+          list(read_gcs_file_lines(known_args.input)))
+      | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip()))
+
+  keyed = (
+      pcoll
+      | 'MakeKey' >> beam.ParDo(MakeKeyDoFn())
+      | 'DistinctByKey' >> beam.Distinct())
+
+  images = (
+      keyed
+      | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn()))
+
+  # Stage 1: BLIP candidate generation
+  blip_out = (
+      images
+      | 'RunInferenceBLIP' >> RunInference(KeyedModelHandler(blip_handler)))
+
+  # Stage 2: CLIP ranking over candidates
+  clip_out = (
+      blip_out
+      | 'RunInferenceCLIP' >> RunInference(KeyedModelHandler(clip_handler)))
+
+  results = (
+      clip_out
+      | 'PostProcess' >> beam.ParDo(PostProcessDoFn(
+      blip_name=known_args.blip_model_name,
+      clip_name=known_args.clip_model_name)))
+
+  if known_args.publish_to_big_query == 'true':
+    _ = (
+        results
+        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+        known_args.output_table,
+        schema=(
+            'image_id:STRING, blip_model:STRING, clip_model:STRING, '
+            'best_caption:STRING, best_score:FLOAT, '
+            'candidates:STRING, scores:STRING, '
+            'blip_ms:INT64, clip_ms:INT64, total_ms:INT64, infer_ms:INT64'),
+        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+        method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
+
+  result = pipeline.run()
+  result.wait_until_finish(duration=1800000)  # 30 min
+  try:
+    result.cancel()
+  except Exception:
+    pass
+  return result
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
diff --git 
a/sdks/python/apache_beam/ml/inference/pytorch_image_captioning_blip_clip_requirements.txt
 
b/sdks/python/apache_beam/ml/inference/pytorch_image_captioning_blip_clip_requirements.txt
new file mode 100644
index 00000000000..6eb308d8745
--- /dev/null
+++ 
b/sdks/python/apache_beam/ml/inference/pytorch_image_captioning_blip_clip_requirements.txt
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+torch>=2.2.0,<2.8.0
+torchvision>=0.17.0,<0.21.0
+Pillow>=10.0.0
+numpy>=1.25.0
+transformers>=4.41.0,<5.0.0
+accelerate>=0.30.0
+tokenizers>=0.19.0
+safetensors>=0.4.3
+protobuf>=4.25.1
+requests>=2.31.0
+google-cloud-monitoring>=2.27.0
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_captioning_benchmarks.py
 
b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_captioning_benchmarks.py
new file mode 100644
index 00000000000..2960bfb7468
--- /dev/null
+++ 
b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_captioning_benchmarks.py
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import logging
+
+from apache_beam.examples.inference import pytorch_image_captioning
+from apache_beam.testing.load_tests.dataflow_cost_benchmark import 
DataflowCostBenchmark
+
+
+class PytorchImageCaptioningBenchmarkTest(DataflowCostBenchmark):
+  def __init__(self):
+    self.metrics_namespace = 'BeamML_PyTorch'
+    super().__init__(
+        metrics_namespace=self.metrics_namespace,
+        pcollection='PostProcess.out0')
+
+  def test(self):
+    extra_opts = {}
+    extra_opts['input'] = self.pipeline.get_option('input_file')
+    self.result = pytorch_image_captioning.run(
+        self.pipeline.get_full_options_as_args(**extra_opts),
+        test_pipeline=self.pipeline)
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  PytorchImageCaptioningBenchmarkTest().run()
diff --git a/website/www/site/content/en/performance/_index.md 
b/website/www/site/content/en/performance/_index.md
index a96e2ee03bd..138b98391bc 100644
--- a/website/www/site/content/en/performance/_index.md
+++ b/website/www/site/content/en/performance/_index.md
@@ -58,3 +58,4 @@ See the following pages for performance measures recorded 
when running various B
 - [TensorFlow MNIST Image Classification](/performance/tensorflowmnist)
 - [VLLM Gemma Batch Completion Tesla T4 GPU](/performance/vllmgemmabatchtesla)
 - [PyTorch Image Object Detection Faster R-CNN ResNet-50 
Batch](/performance/pytorchimageobjectdetection)
+- [PyTorch Image Captioning BLIP + CLIP 
Batch](/performance/pytorchimagecaptioning)
diff --git 
a/website/www/site/content/en/performance/pytorchimagecaptioning/_index.md 
b/website/www/site/content/en/performance/pytorchimagecaptioning/_index.md
new file mode 100644
index 00000000000..2c568486f89
--- /dev/null
+++ b/website/www/site/content/en/performance/pytorchimagecaptioning/_index.md
@@ -0,0 +1,42 @@
+---
+title: "PyTorch Image Captioning BLIP + CLIP Batch Performance"
+---
+
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# PyTorch Image Captioning BLIP + CLIP Batch
+
+**Model**: PyTorch Image Captioning — BLIP (candidate generation) + CLIP 
(ranking)
+**Accelerator**: Tesla T4 GPU (fixed batch size)
+**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM)
+
+This batch pipeline performs image captioning using a multi-model open-source 
PyTorch approach.
+It first generates multiple candidate captions per image using a BLIP model, 
then ranks these candidates with a CLIP model based on image-text similarity.
+The pipeline runs batched GPU inference with fixed batch sizes and ensures 
exactly-once semantics through deterministic input deduplication and file-based 
BigQuery writes, enabling stable and reproducible performance measurements 
across batch runs.
+
+The following graphs show various metrics when running PyTorch Image 
Captioning BLIP + CLIP Batch pipeline.
+See the [glossary](/performance/glossary) for definitions.
+
+Full pipeline implementation is available 
[here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py).
+
+## What is the estimated cost to run the pipeline?
+
+{{< performance_looks io="pytorchimagecaptioning" read_or_write="write" 
section="cost" >}}
+
+## How has various metrics changed when running the pipeline for different 
Beam SDK versions?
+
+{{< performance_looks io="pytorchimagecaptioning" read_or_write="write" 
section="version" >}}
+
+## How has various metrics changed over time when running the pipeline?
+
+{{< performance_looks io="pytorchimagecaptioning" read_or_write="write" 
section="date" >}}
diff --git a/website/www/site/data/performance.yaml 
b/website/www/site/data/performance.yaml
index e669ca3b63c..8f561fda49e 100644
--- a/website/www/site/data/performance.yaml
+++ b/website/www/site/data/performance.yaml
@@ -266,3 +266,19 @@ looks:
           title: AvgThroughputBytesPerSec by Version
         - id: #TODO
           title: AvgThroughputElementsPerSec by Version
+  pytorchimagecaptioning:
+    write:
+      folder: #TODO
+      cost:
+        - id: #TODO
+          title: RunTime and EstimatedCost
+      date:
+        - id: #TODO
+          title: AvgThroughputBytesPerSec by Date
+        - id: #TODO
+          title: AvgThroughputElementsPerSec by Date
+      version:
+        - id: #TODO
+          title: AvgThroughputBytesPerSec by Version
+        - id: #TODO
+          title: AvgThroughputElementsPerSec by Version

Reply via email to