This is an automated email from the ASF dual-hosted git repository.

Amar3tto pushed a commit to branch test-inference
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/test-inference by this push:
     new 37c6ccef0a8 TokenizeTextDoFn
37c6ccef0a8 is described below

commit 37c6ccef0a8f4c3b5569a5a1a4a7e6076e974543
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Fri May 8 15:58:03 2026 +0400

    TokenizeTextDoFn
---
 ...rch_Sentiment_Batch_DistilBert_Base_Uncased.txt |  1 +
 ...Sentiment_Streaming_DistilBert_Base_Uncased.txt |  1 +
 .../examples/inference/pytorch_sentiment.py        | 78 +++++++++++++++++-----
 3 files changed, 62 insertions(+), 18 deletions(-)

diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
index ba1f911d038..84160de4b04 100644
--- a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
+++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
 --machine_type=n1-standard-2
 --num_workers=20
 --max_num_workers=250
+--timeout_ms=600000
 --disk_size_gb=50
 --autoscaling_algorithm=THROUGHPUT_BASED
 --staging_location=gs://temp-storage-for-perf-tests/loadtests
diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
index 8020c5ed4a1..c1a423ea4d6 100644
--- a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
+++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
 --machine_type=n1-standard-2
 --num_workers=20
 --max_num_workers=250
+--timeout_ms=600000
 --disk_size_gb=50
 --autoscaling_algorithm=THROUGHPUT_BASED
 --staging_location=gs://temp-storage-for-perf-tests/loadtests
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
index 71669522674..892b45c598b 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
@@ -47,8 +47,6 @@ from transformers import DistilBertTokenizerFast
 
 class SentimentPostProcessor(beam.DoFn):
   """Processes PredictionResult to extract sentiment label and confidence."""
-  def __init__(self, tokenizer: DistilBertTokenizerFast):
-    self.tokenizer = tokenizer
 
   def process(self, element: tuple[str, PredictionResult]) -> Iterable[dict]:
     text, prediction_result = element
@@ -62,16 +60,34 @@ class SentimentPostProcessor(beam.DoFn):
     }
 
 
-def tokenize_text(text: str,
-                  tokenizer: DistilBertTokenizerFast) -> tuple[str, dict]:
-  """Tokenizes input text using the specified tokenizer."""
-  tokenized = tokenizer(
-      text,
-      padding='max_length',
-      truncation=True,
-      max_length=128,
-      return_tensors="pt")
-  return text, {k: torch.squeeze(v) for k, v in tokenized.items()}
+class TokenizeTextDoFn(beam.DoFn):
+  """Initializes tokenizer per worker and tokenizes input text."""
+  def __init__(self, model_path: str):
+    self.model_path = model_path
+    self.tokenizer = None
+
+  def setup(self):
+    self.tokenizer = DistilBertTokenizerFast.from_pretrained(self.model_path)
+    if self.tokenizer.pad_token is None:
+      self.tokenizer.pad_token = '[PAD]'
+
+  def process(self, text: str) -> Iterable[tuple[str, dict]]:
+    tokenized = self.tokenizer(
+        text,
+        padding='max_length',
+        truncation=True,
+        max_length=128,
+        return_tensors="pt")
+    yield text, {k: torch.squeeze(v, 0) for k, v in tokenized.items()}
+
+
+class DistilBertForSequenceClassificationCompat(
+    DistilBertForSequenceClassification):
+  """Builds config in worker runtime to avoid cross-env config drift."""
+  def __init__(self, model_name: str, num_labels: int = 2):
+    config = _ensure_transformers_config_compat(
+        DistilBertConfig.from_pretrained(model_name, num_labels=num_labels))
+    super().__init__(config)
 
 
 class RateLimitDoFn(beam.DoFn):
@@ -83,6 +99,30 @@ class RateLimitDoFn(beam.DoFn):
     yield element
 
 
+def _ensure_transformers_config_compat(
+    config: DistilBertConfig) -> DistilBertConfig:
+  """Adds missing config attributes for cross-version transformers compatibility.
+  The benchmark can run with container images whose transformers version differs
+  from the launcher environment. Some versions assume these attributes exist.
+  """
+  # Use a default config instance as the source of canonical attributes for the
+  # transformers version available on the worker. This avoids chasing one
+  # missing field at a time (e.g. torchscript, output_attentions).
+  default_config = DistilBertConfig()
+  for key, value in default_config.to_dict().items():
+    if not hasattr(config, key):
+      setattr(config, key, value)
+
+  # Keep non-serialized fields explicitly for older/newer transformers mixes.
+  if not hasattr(config, 'pruned_heads'):
+    config.pruned_heads = {}
+  if not hasattr(config, 'torchscript'):
+    config.torchscript = False
+  if not hasattr(config, 'return_dict'):
+    config.return_dict = True
+  return config
+
+
 def parse_known_args(argv):
   """Parses command-line arguments for pipeline execution."""
   parser = argparse.ArgumentParser()
@@ -241,13 +281,14 @@ def run(
     model_config.pruned_heads = {}
 
   model_handler = PytorchModelHandlerKeyedTensor(
-      model_class=DistilBertForSequenceClassification,
-      model_params={'config': model_config},
+      model_class=DistilBertForSequenceClassificationCompat,
+      model_params={
+          'model_name': known_args.model_path,
+          'num_labels': 2,
+      },
       state_dict_path=known_args.model_state_dict_path,
       device='GPU')
 
-  tokenizer = DistilBertTokenizerFast.from_pretrained(known_args.model_path)
-
   pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
 
   # Main pipeline: read, process, write result to BigQuery output table
@@ -270,9 +311,9 @@ def run(
 
   _ = (
       input
-      | 'Tokenize' >> beam.Map(lambda text: tokenize_text(text, tokenizer))
+      | 'Tokenize' >> beam.ParDo(TokenizeTextDoFn(known_args.model_path))
       | 'RunInference' >> RunInference(KeyedModelHandler(model_handler))
-      | 'PostProcess' >> beam.ParDo(SentimentPostProcessor(tokenizer))
+      | 'PostProcess' >> beam.ParDo(SentimentPostProcessor())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           known_args.output_table,
           schema='text:STRING, sentiment:STRING, confidence:FLOAT',
@@ -283,6 +324,7 @@ def run(
   result = pipeline.run()
   result.wait_until_finish(duration=1800000)  # 30 min
   result.cancel()
+  result.wait_until_finish(duration=600000)  # up to 10 min to settle cancel
 
   cleanup_pubsub_resources(
       project=known_args.project,

Reply via email to