This is an automated email from the ASF dual-hosted git repository.
Amar3tto pushed a commit to branch oss-image-detection
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/oss-image-detection by this push:
new 5f31795342a Refactoring
5f31795342a is described below
commit 5f31795342a1e27bdd9267e5c8a9985c9659b9ca
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Thu May 21 14:35:35 2026 +0400
Refactoring
---
...beam_Inference_Python_Benchmarks_Dataflow_2.yml | 8 +--
.../examples/inference/pytorch_image_captioning.py | 78 +++++++++-------------
.../inference/pytorch_image_object_detection.py | 40 +++++------
.../inference/pytorch_imagenet_rightfit.py | 30 ++-------
4 files changed, 58 insertions(+), 98 deletions(-)
diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml
index 7a1d7c0ebc1..d28f1e2e75d 100644
--- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml
+++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow_2.yml
@@ -188,7 +188,7 @@ jobs:
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
\
- '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=CPU --experiments=enable_streaming_rightfitting --enable_dedup=false
--mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_cpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_cpu'
\
+ '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=CPU --experiments=enable_streaming_rightfitting --mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_cpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_cpu'
\
- name: run PyTorch Image Classification EfficientNet-B0 Streaming
(Right-fitting Exactly-once) CPU
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 180
@@ -199,7 +199,7 @@ jobs:
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
\
- '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=CPU --experiments=enable_streaming_rightfitting --enable_dedup=true
--mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_once_cpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_once_cpu'
\
+ '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=CPU --experiments=enable_streaming_rightfitting --mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_once_cpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_once_cpu'
\
- name: run PyTorch Image Classification EfficientNet-B0 Streaming
(Right-fitting) GPU
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 180
@@ -210,7 +210,7 @@ jobs:
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
\
- '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=GPU
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
--enable_dedup=false --mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_gpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imag
[...]
+ '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=GPU
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
--mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_gpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_gpu'
\
- name: run PyTorch Image Classification EfficientNet-B0 Streaming
(Right-fitting Exactly-once) GPU
uses: ./.github/actions/gradle-command-self-hosted-action
timeout-minutes: 180
@@ -221,4 +221,4 @@ jobs:
-Prunner=DataflowRunner \
-PpythonVersion=3.10 \
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
\
- '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=GPU
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
--enable_dedup=true --mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_once_gpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_
[...]
+ '-PloadTest.args=${{
env.beam_Inference_Python_Benchmarks_Dataflow_2_test_arguments_1 }}
--device=GPU
--experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
--mode=streaming
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}}
--metrics_table=torch_inference_imagenet_stream_rightfit_once_gpu
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_righ
[...]
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
index e138110ef6a..2b25f05654a 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
@@ -59,16 +59,6 @@ def now_millis() -> int:
return int(time.time() * 1000)
-def load_image_from_uri(uri: str) -> bytes:
- with FileSystems.open(uri) as f:
- return f.read()
-
-
-def sha1_hex(s: str) -> str:
- import hashlib
- return hashlib.sha1(s.encode("utf-8")).hexdigest()
-
-
def decode_pil(image_bytes: bytes) -> PILImage.Image:
with PILImage.open(io.BytesIO(image_bytes)) as img:
img = img.convert("RGB")
@@ -80,25 +70,40 @@ def decode_pil(image_bytes: bytes) -> PILImage.Image:
class MakeKeyDoFn(beam.DoFn):
- """Produce (image_id, uri) where image_id is stable for dedup and keys."""
+ """Produce (uri, uri) so the URI is used as the stable key."""
def process(self, element: str):
uri = element
- image_id = sha1_hex(uri)
- yield image_id, uri
+ yield uri, uri
class ReadImageBytesDoFn(beam.DoFn):
- """Turn (image_id, uri) -> (image_id, dict(image_bytes, uri))."""
+ """Turn (uri, uri) -> (uri, dict(image_bytes))."""
def process(self, kv: Tuple[str, str]):
- image_id, uri = kv
+ uri, _ = kv
try:
- b = load_image_from_uri(uri)
- yield image_id, {"image_bytes": b, "uri": uri}
+ with FileSystems.open(uri) as f:
+ image_bytes = f.read()
+ yield uri, {"image_bytes": image_bytes}
except Exception as e:
- logging.warning("Failed to read image %s (%s): %s", image_id, uri, e)
+ logging.warning("Failed to read image %s: %s", uri, e)
return
+class DecodeImageDoFn(beam.DoFn):
+ """Turn (uri, dict(image_bytes)) -> (uri, dict(image))."""
+ def process(self, kv: Tuple[str, Dict[str, Any]]):
+ uri, value = kv
+ image_bytes = value["image_bytes"]
+
+ try:
+ image = decode_pil(image_bytes)
+ except Exception as e:
+ logging.warning("Failed to decode image %s: %s", uri, e)
+ image = PILImage.new("RGB", (224, 224), color=(0, 0, 0))
+
+ yield uri, {"image": image}
+
+
class PostProcessDoFn(beam.DoFn):
"""Final PredictionResult -> row for BigQuery."""
def __init__(self, blip_name: str, clip_name: str):
@@ -106,7 +111,7 @@ class PostProcessDoFn(beam.DoFn):
self.clip_name = clip_name
def process(self, kv: Tuple[str, PredictionResult]):
- image_id, pred = kv
+ uri, pred = kv
if hasattr(pred, "inference"):
inf = pred.inference or {}
else:
@@ -122,7 +127,7 @@ class PostProcessDoFn(beam.DoFn):
total_ms = inf.get("total_ms", None)
yield {
- "image_id": image_id,
+ "image_id": uri,
"blip_model": self.blip_name,
"clip_model": self.clip_name,
"best_caption": best_caption,
@@ -172,19 +177,7 @@ class BlipCaptionModelHandler(ModelHandler):
model.eval()
start = now_millis()
- images = []
- uris = []
- bytes_list = []
- for x in batch:
- b = x["image_bytes"]
- bytes_list.append(b)
- uris.append(x.get("uri", ""))
- try:
- images.append(decode_pil(b))
- except Exception as e:
- # fallback: a blank image (so pipeline keeps going)
- logging.warning("Failed to decode image %s: %s", uris[-1], e)
- images.append(PILImage.new("RGB", (224, 224), color=(0, 0, 0)))
+ images = [x["image"] for x in batch]
# Processor makes pixel_values
inputs = processor(images=images, return_tensors="pt")
@@ -217,10 +210,9 @@ class BlipCaptionModelHandler(ModelHandler):
results = []
for i in range(len(batch)):
results.append({
- "image_bytes": bytes_list[i],
- "uri": uris[i],
- "candidates": candidates_per_image[i],
- "blip_ms": blip_ms,
+ "image": images[i],
+ "candidates": candidates_per_image[i],
+ "blip_ms": blip_ms,
})
return results
@@ -266,17 +258,11 @@ class ClipRankModelHandler(ModelHandler):
blip_ms_list: List[Optional[int]] = []
for x in batch:
- image_bytes = x["image_bytes"]
+ img = x["image"]
candidates = [str(c) for c in (x.get("candidates", []) or [])]
candidates_list.append(candidates)
blip_ms_list.append(x.get("blip_ms", None))
- try:
- img = decode_pil(image_bytes)
- except Exception as e:
- logging.warning("Failed to decode image for CLIP ranking: %s", e)
- img = PILImage.new("RGB", (224, 224), color=(0, 0, 0))
-
start_i = len(texts)
for c in candidates:
images.append(img)
@@ -592,8 +578,8 @@ def run(
allowed_lateness=0))
keyed = (pcoll | 'MakeKey' >> beam.ParDo(MakeKeyDoFn()))
-
- images = (keyed | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn()))
+ image_bytes = (keyed | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn()))
+ images = (image_bytes | 'DecodeImage' >> beam.ParDo(DecodeImageDoFn()))
# Stage 1: BLIP candidate generation
blip_out = (
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
index 65e489d1881..850c66126e3 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
@@ -63,11 +63,6 @@ def now_millis() -> int:
return int(time.time() * 1000)
-def load_image_from_uri(uri: str) -> bytes:
- with FileSystems.open(uri) as f:
- return f.read()
-
-
def decode_to_tens(
image_bytes: bytes,
resize_shorter_side: Optional[int] = None) -> torch.Tensor:
@@ -95,39 +90,36 @@ def decode_to_tens(
return torch.from_numpy(arr)
-def sha1_hex(s: str) -> str:
- import hashlib
- return hashlib.sha1(s.encode("utf-8")).hexdigest()
-
-
# ============ DoFns ============
class MakeKeyDoFn(beam.DoFn):
- """Produce (image_id, uri) where image_id is stable for dedup and keys."""
+ """Produce (uri, uri) where the URI is used as the stable key."""
def process(self, element: str):
uri = element
- image_id = sha1_hex(uri)
- yield image_id, uri
+ yield uri, uri
class DecodePreprocessDoFn(beam.DoFn):
- """Turn (image_id, uri) -> (image_id, tensor)."""
+ """Turn (uri, uri) -> (uri, tensor)."""
def __init__(self, resize_shorter_side: Optional[int] = None):
self.resize_shorter_side = resize_shorter_side
def process(self, kv: Tuple[str, str]):
- image_id, uri = kv
+ uri, _ = kv
start = now_millis()
try:
- b = load_image_from_uri(uri)
- tensor = decode_to_tens(b, resize_shorter_side=self.resize_shorter_side)
+ with FileSystems.open(uri) as f:
+ image_bytes = f.read()
+ tensor = decode_to_tens(
+ image_bytes,
+ resize_shorter_side=self.resize_shorter_side)
preprocess_ms = now_millis() - start
- yield image_id, {
- "tensor": tensor, "preprocess_ms": preprocess_ms, "uri": uri
+ yield uri, {
+ "tensor": tensor, "preprocess_ms": preprocess_ms
}
except Exception as e:
- logging.warning("Decode failed for %s (%s): %s", image_id, uri, e)
+ logging.warning("Decode failed for %s: %s", uri, e)
return
@@ -213,7 +205,7 @@ class PostProcessDoFn(beam.DoFn):
}
def process(self, kv: Tuple[str, PredictionResult]):
- image_id, pred = kv
+ image_uri, pred = kv
# pred can be PredictionResult OR raw torchvision dict.
if hasattr(pred, "inference"):
@@ -226,9 +218,9 @@ class PostProcessDoFn(beam.DoFn):
if not isinstance(inference_obj, dict):
logging.warning(
- "Unexpected inf-ce type for %s: %s", image_id, type(inference_obj))
+ "Unexpected inf-ce type for %s: %s", image_uri, type(inference_obj))
yield {
- "image_id": image_id,
+ "image_id": image_uri,
"model_name": self.model_name,
"detections": json.dumps([]),
"num_detections": 0,
@@ -239,7 +231,7 @@ class PostProcessDoFn(beam.DoFn):
extracted = self._extract_detection(inference_obj)
yield {
- "image_id": image_id,
+ "image_id": image_uri,
"model_name": self.model_name,
"detections": json.dumps(extracted["detections"]),
"num_detections": int(extracted["num_detections"]),
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
index 39a02315648..d13923c2670 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
@@ -17,9 +17,8 @@
PyTorch EfficientNet-B0 model optimized for T4 GPUs.
It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel,
and runs inference with adaptive batch sizing for optimal GPU utilization.
-The pipeline ensures exactly-once semantics via stateful deduplication and
-idempotent BigQuery writes, allowing stable and reproducible performance
-measurements under continuous load.
+The pipeline targets stable and reproducible performance measurements under
+continuous load.
Resources like Pub/Sub topic/subscription cleanup is handled programmatically.
"""
@@ -36,7 +35,6 @@ import torch
import torch.nn.functional as F
import apache_beam as beam
-from apache_beam.coders import BytesCoder
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
@@ -46,7 +44,6 @@ from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.runner import PipelineResult
-from apache_beam.transforms import userstate
from apache_beam.transforms import window
from google.api_core.exceptions import NotFound
@@ -116,16 +113,6 @@ class MakeKeyDoFn(beam.DoFn):
yield image_id, uri
-class DedupDoFn(beam.DoFn):
- seen = userstate.ReadModifyWriteStateSpec('seen', BytesCoder())
-
- def process(self, element, seen=beam.DoFn.StateParam(seen)):
- if seen.read() == b'1':
- return
- seen.write(b'1')
- yield element
-
-
class DecodePreprocessDoFn(beam.DoFn):
"""Turn (image_id, bytes|uri) -> (image_id, torch.Tensor)"""
def __init__(
@@ -266,10 +253,6 @@ def parse_known_args(argv):
parser.add_argument('--window_sec', type=int, default=60)
parser.add_argument('--trigger_proc_time_sec', type=int, default=30)
- # Dedup
- parser.add_argument(
- '--enable_dedup', default='false', choices=['true', 'false'])
-
known_args, pipeline_args = parser.parse_known_args(argv)
return known_args, pipeline_args
@@ -478,9 +461,6 @@ def run(
pcoll
| 'MakeKey' >> beam.ParDo(MakeKeyDoFn(input_mode=known_args.input_mode)))
- if known_args.enable_dedup == 'true':
- keyed = keyed | 'Dedup' >> beam.ParDo(DedupDoFn())
-
preprocessed = (
keyed
| 'DecodePreprocess' >> beam.ParDo(
@@ -494,8 +474,10 @@ def run(
'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"].float())))
predictions = (
- to_infer
- | 'RunInference' >> RunInference(KeyedModelHandler(model_handler)))
+ to_infer
+ | 'RunInference' >> RunInference(
+ KeyedModelHandler(model_handler)).with_resource_hints(
+ accelerator="type:nvidia-tesla-t4;count:1;install-nvidia-driver"))
results = (
predictions