This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e83a1d5bad9 Pin tensor_rt digest for PyTorch sentiment Dataflow 
benchmarks (#38374)
e83a1d5bad9 is described below

commit e83a1d5bad9ed96550adff9aa1df40287a24f97f
Author: Abdelrahman Ibrahim <[email protected]>
AuthorDate: Tue May 19 22:51:31 2026 +0200

    Pin tensor_rt digest for PyTorch sentiment Dataflow benchmarks (#38374)
    
    * Fix DistilBERT config compatibility in sentiment benchmark
    Pin tensor_rt digest for PyTorch sentiment Dataflow benchmarks
    
    * Harden Dataflow PyTorch sentiment benchmark worker compatibility
    
    * Added default for DisplayData for table row inference batch benchmark
    
    * used tensor_rt:latest for sentiment dataflow
---
 ...rch_Sentiment_Batch_DistilBert_Base_Uncased.txt |  3 +
 ...Sentiment_Streaming_DistilBert_Base_Uncased.txt |  3 +
 .../examples/inference/pytorch_sentiment.py        | 80 +++++++++++++++++-----
 .../inference/table_row_inference_benchmark.py     |  4 +-
 4 files changed, 69 insertions(+), 21 deletions(-)

diff --git 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
index 4642156d57c..8dbdfa5cd0e 100644
--- 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
 --machine_type=n1-standard-2
 --num_workers=20
 --max_num_workers=250
+--timeout_ms=600000
 --disk_size_gb=50
 --autoscaling_algorithm=THROUGHPUT_BASED
 --staging_location=gs://temp-storage-for-perf-tests/loadtests
@@ -31,5 +32,7 @@
 --device=CPU
 --input_file=gs://apache-beam-ml/testing/inputs/sentences_50k.txt
 --runner=DataflowRunner
+--sdk_location=container
+--sdk_container_image=us.gcr.io/apache-beam-testing/python-postcommit-it/tensor_rt:latest
 --model_path=distilbert-base-uncased-finetuned-sst-2-english
 
--model_state_dict_path=gs://apache-beam-ml/models/huggingface.sentiment.distilbert-base-uncased.pth
\ No newline at end of file
diff --git 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
index d10b9bb2dfc..4d285bbe0eb 100644
--- 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
 --machine_type=n1-standard-2
 --num_workers=20
 --max_num_workers=250
+--timeout_ms=600000
 --disk_size_gb=50
 --autoscaling_algorithm=THROUGHPUT_BASED
 --staging_location=gs://temp-storage-for-perf-tests/loadtests
@@ -31,6 +32,8 @@
 --device=CPU
 --input_file=gs://apache-beam-ml/testing/inputs/sentences_50k.txt
 --runner=DataflowRunner
+--sdk_location=container
+--sdk_container_image=us.gcr.io/apache-beam-testing/python-postcommit-it/tensor_rt:latest
 
--dataflow_service_options=worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver
 --model_path=distilbert-base-uncased-finetuned-sst-2-english
 
--model_state_dict_path=gs://apache-beam-ml/models/huggingface.sentiment.distilbert-base-uncased.pth
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py 
b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
index 3bb36930a04..95baca9c030 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
@@ -47,9 +47,6 @@ from transformers import DistilBertTokenizerFast
 
 class SentimentPostProcessor(beam.DoFn):
   """Processes PredictionResult to extract sentiment label and confidence."""
-  def __init__(self, tokenizer: DistilBertTokenizerFast):
-    self.tokenizer = tokenizer
-
   def process(self, element: tuple[str, PredictionResult]) -> Iterable[dict]:
     text, prediction_result = element
     logits = prediction_result.inference['logits']
@@ -62,16 +59,34 @@ class SentimentPostProcessor(beam.DoFn):
     }
 
 
-def tokenize_text(text: str,
-                  tokenizer: DistilBertTokenizerFast) -> tuple[str, dict]:
-  """Tokenizes input text using the specified tokenizer."""
-  tokenized = tokenizer(
-      text,
-      padding='max_length',
-      truncation=True,
-      max_length=128,
-      return_tensors="pt")
-  return text, {k: torch.squeeze(v) for k, v in tokenized.items()}
+class TokenizeTextDoFn(beam.DoFn):
+  """Initializes tokenizer per worker and tokenizes input text."""
+  def __init__(self, model_path: str):
+    self.model_path = model_path
+    self.tokenizer = None
+
+  def setup(self):
+    self.tokenizer = DistilBertTokenizerFast.from_pretrained(self.model_path)
+    if self.tokenizer.pad_token is None:
+      self.tokenizer.pad_token = '[PAD]'
+
+  def process(self, text: str) -> Iterable[tuple[str, dict]]:
+    tokenized = self.tokenizer(
+        text,
+        padding='max_length',
+        truncation=True,
+        max_length=128,
+        return_tensors="pt")
+    yield text, {k: torch.squeeze(v, 0) for k, v in tokenized.items()}
+
+
+class DistilBertForSequenceClassificationCompat(
+    DistilBertForSequenceClassification):
+  """Builds config in worker runtime to avoid cross-env config drift."""
+  def __init__(self, model_name: str, num_labels: int = 2):
+    config = _ensure_transformers_config_compat(
+        DistilBertConfig.from_pretrained(model_name, num_labels=num_labels))
+    super().__init__(config)
 
 
 class RateLimitDoFn(beam.DoFn):
@@ -83,6 +98,31 @@ class RateLimitDoFn(beam.DoFn):
     yield element
 
 
+def _ensure_transformers_config_compat(
+    config: DistilBertConfig) -> DistilBertConfig:
+  """Adds missing config attributes for cross-version transformers 
compatibility.
+
+  The benchmark can run with container images whose transformers version 
differs
+  from the launcher environment. Some versions assume these attributes exist.
+  """
+  # Use a default config instance as the source of canonical attributes for the
+  # transformers version available on the worker. This avoids chasing one
+  # missing field at a time (e.g. torchscript, output_attentions).
+  default_config = DistilBertConfig()
+  for key, value in default_config.to_dict().items():
+    if not hasattr(config, key):
+      setattr(config, key, value)
+
+  # Keep non-serialized fields explicitly for older/newer transformers mixes.
+  if not hasattr(config, 'pruned_heads'):
+    config.pruned_heads = {}
+  if not hasattr(config, 'torchscript'):
+    config.torchscript = False
+  if not hasattr(config, 'return_dict'):
+    config.return_dict = True
+  return config
+
+
 def parse_known_args(argv):
   """Parses command-line arguments for pipeline execution."""
   parser = argparse.ArgumentParser()
@@ -235,13 +275,14 @@ def run(
     pipeline_options.view_as(StandardOptions).streaming = True
 
   model_handler = PytorchModelHandlerKeyedTensor(
-      model_class=DistilBertForSequenceClassification,
-      model_params={'config': DistilBertConfig(num_labels=2)},
+      model_class=DistilBertForSequenceClassificationCompat,
+      model_params={
+          'model_name': known_args.model_path,
+          'num_labels': 2,
+      },
       state_dict_path=known_args.model_state_dict_path,
       device='GPU')
 
-  tokenizer = DistilBertTokenizerFast.from_pretrained(known_args.model_path)
-
   pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
 
   # Main pipeline: read, process, write result to BigQuery output table
@@ -264,9 +305,9 @@ def run(
 
   _ = (
       input
-      | 'Tokenize' >> beam.Map(lambda text: tokenize_text(text, tokenizer))
+      | 'Tokenize' >> beam.ParDo(TokenizeTextDoFn(known_args.model_path))
       | 'RunInference' >> RunInference(KeyedModelHandler(model_handler))
-      | 'PostProcess' >> beam.ParDo(SentimentPostProcessor(tokenizer))
+      | 'PostProcess' >> beam.ParDo(SentimentPostProcessor())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           known_args.output_table,
           schema='text:STRING, sentiment:STRING, confidence:FLOAT',
@@ -277,6 +318,7 @@ def run(
   result = pipeline.run()
   result.wait_until_finish(duration=1800000)  # 30 min
   result.cancel()
+  result.wait_until_finish(duration=600000)  # up to 10 min to settle cancel
 
   cleanup_pubsub_resources(
       project=known_args.project,
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
 
b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
index bca5263b9f9..b8591a0fea8 100644
--- 
a/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
+++ 
b/sdks/python/apache_beam/testing/benchmarks/inference/table_row_inference_benchmark.py
@@ -44,8 +44,8 @@ class TableRowInferenceOptions(
   @classmethod
   def _add_argparse_args(cls, parser):
     parser.add_argument('--mode', default='batch')
-    parser.add_argument('--input_subscription')
-    parser.add_argument('--input_file')
+    parser.add_argument('--input_subscription', default='')
+    parser.add_argument('--input_file', default='')
     parser.add_argument('--output_table')
     parser.add_argument('--model_path')
     parser.add_argument('--feature_columns')

Reply via email to