This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e83a1d5bad9 Pin tensor_rt digest for PyTorch sentiment Dataflow
benchmarks (#38374)
e83a1d5bad9 is described below
commit e83a1d5bad9ed96550adff9aa1df40287a24f97f
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Tue May 19 22:51:31 2026 +0200
Pin tensor_rt digest for PyTorch sentiment Dataflow benchmarks (#38374)
* Fix DistilBERT config compatibility in sentiment benchmark
Pin tensor_rt digest for PyTorch sentiment Dataflow benchmarks
* Harden Dataflow PyTorch sentiment benchmark worker compatibility
* Added default for DisplayData for table row inference batch benchmark
* used tensor_rt:latest for sentiment dataflow
---
...rch_Sentiment_Batch_DistilBert_Base_Uncased.txt | 3 +
...Sentiment_Streaming_DistilBert_Base_Uncased.txt | 3 +
.../examples/inference/pytorch_sentiment.py | 80 +++++++++++++++++-----
.../inference/table_row_inference_benchmark.py | 4 +-
4 files changed, 69 insertions(+), 21 deletions(-)
diff --git
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
index 4642156d57c..8dbdfa5cd0e 100644
---
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
+++
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
--machine_type=n1-standard-2
--num_workers=20
--max_num_workers=250
+--timeout_ms=600000
--disk_size_gb=50
--autoscaling_algorithm=THROUGHPUT_BASED
--staging_location=gs://temp-storage-for-perf-tests/loadtests
@@ -31,5 +32,7 @@
--device=CPU
--input_file=gs://apache-beam-ml/testing/inputs/sentences_50k.txt
--runner=DataflowRunner
+--sdk_location=container
+--sdk_container_image=us.gcr.io/apache-beam-testing/python-postcommit-it/tensor_rt:latest
--model_path=distilbert-base-uncased-finetuned-sst-2-english
--model_state_dict_path=gs://apache-beam-ml/models/huggingface.sentiment.distilbert-base-uncased.pth
\ No newline at end of file
diff --git
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
index d10b9bb2dfc..4d285bbe0eb 100644
---
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
+++
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
--machine_type=n1-standard-2
--num_workers=20
--max_num_workers=250
+--timeout_ms=600000
--disk_size_gb=50
--autoscaling_algorithm=THROUGHPUT_BASED
--staging_location=gs://temp-storage-for-perf-tests/loadtests
@@ -31,6 +32,8 @@
--device=CPU
--input_file=gs://apache-beam-ml/testing/inputs/sentences_50k.txt
--runner=DataflowRunner
+--sdk_location=container
+--sdk_container_image=us.gcr.io/apache-beam-testing/python-postcommit-it/tensor_rt:latest
--dataflow_service_options=worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver
--model_path=distilbert-base-uncased-finetuned-sst-2-english
--model_state_dict_path=gs://apache-beam-ml/models/huggingface.sentiment.distilbert-base-uncased.pth
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
index 3bb36930a04..95baca9c030 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
@@ -47,9 +47,6 @@ from transformers import DistilBertTokenizerFast
class SentimentPostProcessor(beam.DoFn):
"""Processes PredictionResult to extract sentiment label and confidence."""
- def __init__(self, tokenizer: DistilBertTokenizerFast):
- self.tokenizer = tokenizer
-
def process(self, element: tuple[str, PredictionResult]) -> Iterable[dict]:
text, prediction_result = element
logits = prediction_result.inference['logits']
@@ -62,16 +59,34 @@ class SentimentPostProcessor(beam.DoFn):
}
-def tokenize_text(text: str,
- tokenizer: DistilBertTokenizerFast) -> tuple[str, dict]:
- """Tokenizes input text using the specified tokenizer."""
- tokenized = tokenizer(
- text,
- padding='max_length',
- truncation=True,
- max_length=128,
- return_tensors="pt")
- return text, {k: torch.squeeze(v) for k, v in tokenized.items()}
+class TokenizeTextDoFn(beam.DoFn):
+ """Initializes tokenizer per worker and tokenizes input text."""
+ def __init__(self, model_path: str):
+ self.model_path = model_path
+ self.tokenizer = None
+
+ def setup(self):
+ self.tokenizer = DistilBertTokenizerFast.from_pretrained(self.model_path)
+ if self.tokenizer.pad_token is None:
+ self.tokenizer.pad_token = '[PAD]'
+
+ def process(self, text: str) -> Iterable[tuple[str, dict]]:
+ tokenized = self.tokenizer(
+ text,
+ padding='max_length',
+ truncation=True,
+ max_length=128,
+ return_tensors="pt")
+ yield text, {k: torch.squeeze(v, 0) for k, v in tokenized.items()}
+
+
+class DistilBertForSequenceClassificationCompat(
+ DistilBertForSequenceClassification):
+ """Builds config in worker runtime to avoid cross-env config drift."""
+ def __init__(self, model_name: str, num_labels: int = 2):
+ config = _ensure_transformers_config_compat(
+ DistilBertConfig.from_pretrained(model_name, num_labels=num_labels))
+ super().__init__(config)
class RateLimitDoFn(beam.DoFn):
@@ -83,6 +98,31 @@ class RateLimitDoFn(beam.DoFn):
yield element
+def _ensure_transformers_config_compat(
+ config: DistilBertConfig) -> DistilBertConfig:
+ """Adds missing config attributes for cross-version transformers
compatibility.
+
+ The benchmark can run with container images whose transformers version
differs
+ from the launcher environment. Some versions assume these attributes exist.
+ """
+ # Use a default config instance as the source of canonical attributes for the
+ # transformers version available on the worker. This avoids chasing one
+ # missing field at a time (e.g. torchscript, output_attentions).
+ default_config = DistilBertConfig()
+ for key, value in default_config.to_dict().items():
+ if not hasattr(config, key):
+ setattr(config, key, value)
+
+ # Keep non-serialized fields explicitly for older/newer transformers mixes.
+ if not hasattr(config, 'pruned_heads'):
+ config.pruned_heads = {}
+ if not hasattr(config, 'torchscript'):
+ config.torchscript = False
+ if not hasattr(config, 'return_dict'):
+ config.return_dict = True
+ return config
+
+
def parse_known_args(argv):
"""Parses command-line arguments for pipeline execution."""
parser = argparse.ArgumentParser()
@@ -235,13 +275,14 @@ def run(
pipeline_options.view_as(StandardOptions).streaming = True
model_handler = PytorchModelHandlerKeyedTensor(
- model_class=DistilBertForSequenceClassification,
- model_params={'config': DistilBertConfig(num_labels=2)},
+ model_class=DistilBertForSequenceClassificationCompat,
+ model_params={
+ 'model_name': known_args.model_path,
+ 'num_labels': 2,
+ },
state_dict_path=known_args.model_state_dict_path,
device='GPU')
- tokenizer = DistilBertTokenizerFast.from_pretrained(known_args.model_path)
-
pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
# Main pipeline: read, process, write result to BigQuery output table
@@ -264,9 +305,9 @@ def run(
_ = (
input
- | 'Tokenize' >> beam.Map(lambda text: tokenize_text(text, tokenizer))
+ | 'Tokenize' >> beam.ParDo(TokenizeTextDoFn(known_args.model_path))
| 'RunInference' >> RunInference(KeyedModelHandler(model_handler))
- | 'PostProcess' >> beam.ParDo(SentimentPostProcessor(tokenizer))
+ | 'PostProcess' >> beam.ParDo(SentimentPostProcessor())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
known_args.output_table,
schema='text:STRING, sentiment:STRING, confidence:FLOAT',
@@ -277,6 +318,7 @@ def run(
result = pipeline.run()
result.wait_until_finish(duration=1800000) # 30 min
result.cancel()
+ result.wait_until_finish(duration=600000) # up to 10 min to settle cancel
cleanup_pubsub_resources(
project=known_args.project,
diff --git
a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
index bca5263b9f9..b8591a0fea8 100644
---
a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
+++
b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
@@ -44,8 +44,8 @@ class TableRowInferenceOptions(
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--mode', default='batch')
- parser.add_argument('--input_subscription')
- parser.add_argument('--input_file')
+ parser.add_argument('--input_subscription', default='')
+ parser.add_argument('--input_file', default='')
parser.add_argument('--output_table')
parser.add_argument('--model_path')
parser.add_argument('--feature_columns')