This is an automated email from the ASF dual-hosted git repository.
Amar3tto pushed a commit to branch test-inference
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/test-inference by this push:
new 37c6ccef0a8 TokenizeTextDoFn
37c6ccef0a8 is described below
commit 37c6ccef0a8f4c3b5569a5a1a4a7e6076e974543
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Fri May 8 15:58:03 2026 +0400
TokenizeTextDoFn
---
...rch_Sentiment_Batch_DistilBert_Base_Uncased.txt | 1 +
...Sentiment_Streaming_DistilBert_Base_Uncased.txt | 1 +
.../examples/inference/pytorch_sentiment.py | 78 +++++++++++++++++-----
3 files changed, 62 insertions(+), 18 deletions(-)
diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
index ba1f911d038..84160de4b04 100644
--- a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
+++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
--machine_type=n1-standard-2
--num_workers=20
--max_num_workers=250
+--timeout_ms=600000
--disk_size_gb=50
--autoscaling_algorithm=THROUGHPUT_BASED
--staging_location=gs://temp-storage-for-perf-tests/loadtests
diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
index 8020c5ed4a1..c1a423ea4d6 100644
--- a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
+++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
@@ -18,6 +18,7 @@
--machine_type=n1-standard-2
--num_workers=20
--max_num_workers=250
+--timeout_ms=600000
--disk_size_gb=50
--autoscaling_algorithm=THROUGHPUT_BASED
--staging_location=gs://temp-storage-for-perf-tests/loadtests
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
index 71669522674..892b45c598b 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_sentiment.py
@@ -47,8 +47,6 @@ from transformers import DistilBertTokenizerFast
class SentimentPostProcessor(beam.DoFn):
"""Processes PredictionResult to extract sentiment label and confidence."""
- def __init__(self, tokenizer: DistilBertTokenizerFast):
- self.tokenizer = tokenizer
def process(self, element: tuple[str, PredictionResult]) -> Iterable[dict]:
text, prediction_result = element
@@ -62,16 +60,34 @@ class SentimentPostProcessor(beam.DoFn):
}
-def tokenize_text(text: str,
- tokenizer: DistilBertTokenizerFast) -> tuple[str, dict]:
- """Tokenizes input text using the specified tokenizer."""
- tokenized = tokenizer(
- text,
- padding='max_length',
- truncation=True,
- max_length=128,
- return_tensors="pt")
- return text, {k: torch.squeeze(v) for k, v in tokenized.items()}
+class TokenizeTextDoFn(beam.DoFn):
+ """Initializes tokenizer per worker and tokenizes input text."""
+ def __init__(self, model_path: str):
+ self.model_path = model_path
+ self.tokenizer = None
+
+ def setup(self):
+ self.tokenizer = DistilBertTokenizerFast.from_pretrained(self.model_path)
+ if self.tokenizer.pad_token is None:
+ self.tokenizer.pad_token = '[PAD]'
+
+ def process(self, text: str) -> Iterable[tuple[str, dict]]:
+ tokenized = self.tokenizer(
+ text,
+ padding='max_length',
+ truncation=True,
+ max_length=128,
+ return_tensors="pt")
+ yield text, {k: torch.squeeze(v, 0) for k, v in tokenized.items()}
+
+
+class DistilBertForSequenceClassificationCompat(
+ DistilBertForSequenceClassification):
+ """Builds config in worker runtime to avoid cross-env config drift."""
+ def __init__(self, model_name: str, num_labels: int = 2):
+ config = _ensure_transformers_config_compat(
+ DistilBertConfig.from_pretrained(model_name, num_labels=num_labels))
+ super().__init__(config)
class RateLimitDoFn(beam.DoFn):
@@ -83,6 +99,30 @@ class RateLimitDoFn(beam.DoFn):
yield element
+def _ensure_transformers_config_compat(
+ config: DistilBertConfig) -> DistilBertConfig:
+ """Adds missing config attributes for cross-version transformers compatibility.
+ The benchmark can run with container images whose transformers version differs
+ from the launcher environment. Some versions assume these attributes exist.
+ """
+ # Use a default config instance as the source of canonical attributes for the
+ # transformers version available on the worker. This avoids chasing one
+ # missing field at a time (e.g. torchscript, output_attentions).
+ default_config = DistilBertConfig()
+ for key, value in default_config.to_dict().items():
+ if not hasattr(config, key):
+ setattr(config, key, value)
+
+ # Keep non-serialized fields explicitly for older/newer transformers mixes.
+ if not hasattr(config, 'pruned_heads'):
+ config.pruned_heads = {}
+ if not hasattr(config, 'torchscript'):
+ config.torchscript = False
+ if not hasattr(config, 'return_dict'):
+ config.return_dict = True
+ return config
+
+
def parse_known_args(argv):
"""Parses command-line arguments for pipeline execution."""
parser = argparse.ArgumentParser()
@@ -241,13 +281,14 @@ def run(
model_config.pruned_heads = {}
model_handler = PytorchModelHandlerKeyedTensor(
- model_class=DistilBertForSequenceClassification,
- model_params={'config': model_config},
+ model_class=DistilBertForSequenceClassificationCompat,
+ model_params={
+ 'model_name': known_args.model_path,
+ 'num_labels': 2,
+ },
state_dict_path=known_args.model_state_dict_path,
device='GPU')
- tokenizer = DistilBertTokenizerFast.from_pretrained(known_args.model_path)
-
pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
# Main pipeline: read, process, write result to BigQuery output table
@@ -270,9 +311,9 @@ def run(
_ = (
input
- | 'Tokenize' >> beam.Map(lambda text: tokenize_text(text, tokenizer))
+ | 'Tokenize' >> beam.ParDo(TokenizeTextDoFn(known_args.model_path))
| 'RunInference' >> RunInference(KeyedModelHandler(model_handler))
- | 'PostProcess' >> beam.ParDo(SentimentPostProcessor(tokenizer))
+ | 'PostProcess' >> beam.ParDo(SentimentPostProcessor())
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
known_args.output_table,
schema='text:STRING, sentiment:STRING, confidence:FLOAT',
@@ -283,6 +324,7 @@ def run(
result = pipeline.run()
result.wait_until_finish(duration=1800000) # 30 min
result.cancel()
+ result.wait_until_finish(duration=600000) # up to 10 min to settle cancel
cleanup_pubsub_resources(
project=known_args.project,