This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 34b8eea96f8 Introduce Python Wordcount with Rust Wrapped Functions
(#37234)
34b8eea96f8 is described below
commit 34b8eea96f827f3c732dba14042b38496b42ad10
Author: Jack McCluskey <[email protected]>
AuthorDate: Tue Jan 6 14:02:11 2026 -0500
Introduce Python Wordcount with Rust Wrapped Functions (#37234)
* Introduce Python Wordcount with Rust Wrapped Functions
* apache licenses
* one more license
* formatting and whitespace
---
.../apache_beam/examples/wordcount_rust/README.md | 51 +++++
.../examples/wordcount_rust/requirements.txt | 19 ++
.../wordcount_rust/word_processing/Cargo.lock | 234 +++++++++++++++++++++
.../wordcount_rust/word_processing/Cargo.toml | 30 +++
.../wordcount_rust/word_processing/pyproject.toml | 30 +++
.../wordcount_rust/word_processing/src/lib.rs | 38 ++++
.../examples/wordcount_rust/wordcount_rust.py | 86 ++++++++
7 files changed, 488 insertions(+)
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/README.md
b/sdks/python/apache_beam/examples/wordcount_rust/README.md
new file mode 100644
index 00000000000..c02bd9ca8be
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/README.md
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+This directory contains an example of a Python pipeline that uses functions
written in Rust as DoFns to perform some of the string processing in wordcount.
This is done using [PyO3](https://pyo3.rs/v0.27.2/) to produce Python bindings
for the Rust code, managed using the [maturin](https://github.com/PyO3/maturin) Python package.
+
+This example should be built and run in a Python virtual environment with
Apache Beam and maturin installed. The `requirements.txt` file in this
directory can be used to install the versions of maturin and build (used below
to produce the source tarball for Dataflow) that were used when the example
was created.
+
+To build the Rust code, run the following from the wordcount_rust directory:
+
+```bash
+cd ./word_processing
+maturin develop
+```
+
+This will compile the Rust code and build a Python package linked to it in the
current environment. The resulting package can be imported as a Python module
called `word_processing`.
+
+To execute wordcount locally using the direct runner, execute the following
from the wordcount_rust directory within the same virtual environment:
+
+```bash
+python wordcount_rust.py --runner DirectRunner --input README.md --output counts.txt
+```
+
+To execute wordcount using the Dataflow runner, the tarball of the PyO3 Rust
package must be provided to GCP. This is done by building the tarball then
providing it as an `extra_package` argument. The tarball can be built using the
following command from the wordcount_rust directory:
+
+```bash
+cd ./word_processing
+python -m build --sdist
+```
+This places the tarball in `./word_processing/dist` as
`word_processing-0.1.0.tar.gz`. Job submission to Dataflow from the
`wordcount_rust` directory then looks like the following:
+
+```bash
+python wordcount_rust.py --runner DataflowRunner --input
"gs://apache-beam-samples/shakespeare/*.txt" --output
gs://<YOUR_BUCKET>/wordcount_rust/counts.txt --project <YOUR_PROJECT> --region
<YOUR_REGION> --extra_package
./word_processing/dist/word_processing-0.1.0.tar.gz
+```
+
+The job will then execute on Dataflow, installing the Rust package during
worker setup. Wordcount will then run and write its results as sharded text
files prefixed with counts.txt (for example, counts.txt-00000-of-00001) in the
specified output bucket.
\ No newline at end of file
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/requirements.txt
b/sdks/python/apache_beam/examples/wordcount_rust/requirements.txt
new file mode 100644
index 00000000000..44c79623571
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/requirements.txt
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+build==1.3.0
+maturin==1.11.2
\ No newline at end of file
diff --git
a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.lock
b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.lock
new file mode 100644
index 00000000000..822dcb69a05
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.lock
@@ -0,0 +1,234 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 4
+
+[[package]]
+name = "aho-corasick"
+version = "1.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "autocfg"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
+
+[[package]]
+name = "heck"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+
+[[package]]
+name = "indoc"
+version = "2.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706"
+dependencies = [
+ "rustversion",
+]
+
+[[package]]
+name = "libc"
+version = "0.2.179"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f"
+
+[[package]]
+name = "memchr"
+version = "2.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
+
+[[package]]
+name = "memoffset"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "once_cell"
+version = "1.21.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
+
+[[package]]
+name = "portable-atomic"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f89776e4d69bb58bc6993e99ffa1d11f228b839984854c7daeb5d37f87cbe950"
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.105"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
+dependencies = [
+ "unicode-ident",
+]
+
+[[package]]
+name = "pyo3"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d"
+dependencies = [
+ "indoc",
+ "libc",
+ "memoffset",
+ "once_cell",
+ "portable-atomic",
+ "pyo3-build-config",
+ "pyo3-ffi",
+ "pyo3-macros",
+ "unindent",
+]
+
+[[package]]
+name = "pyo3-build-config"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b455933107de8642b4487ed26d912c2d899dec6114884214a0b3bb3be9261ea6"
+dependencies = [
+ "target-lexicon",
+]
+
+[[package]]
+name = "pyo3-ffi"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1c85c9cbfaddf651b1221594209aed57e9e5cff63c4d11d1feead529b872a089"
+dependencies = [
+ "libc",
+ "pyo3-build-config",
+]
+
+[[package]]
+name = "pyo3-macros"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0a5b10c9bf9888125d917fb4d2ca2d25c8df94c7ab5a52e13313a07e050a3b02"
+dependencies = [
+ "proc-macro2",
+ "pyo3-macros-backend",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "pyo3-macros-backend"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03b51720d314836e53327f5871d4c0cfb4fb37cc2c4a11cc71907a86342c40f9"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "pyo3-build-config",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.43"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "regex"
+version = "1.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-automata",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.8.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
+
+[[package]]
+name = "rustversion"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
+
+[[package]]
+name = "syn"
+version = "2.0.113"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "678faa00651c9eb72dd2020cbdf275d92eccb2400d568e419efdd64838145cb4"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
+[[package]]
+name = "target-lexicon"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1dd07eb858a2067e2f3c7155d54e929265c264e6f37efe3ee7a8d1b5a1dd0ba"
+
+[[package]]
+name = "unicode-ident"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
+
+[[package]]
+name = "unindent"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3"
+
+[[package]]
+name = "word_processing"
+version = "0.1.0"
+dependencies = [
+ "pyo3",
+ "regex",
+]
diff --git
a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.toml
b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.toml
new file mode 100644
index 00000000000..a2bce1e7303
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.toml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[package]
+name = "word_processing"
+version = "0.1.0"
+edition = "2024"
+
+# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
+[lib]
+name = "word_processing"
+crate-type = ["cdylib"]
+
+[dependencies]
+pyo3 = "0.27.0"
+regex = "1.12.2"
diff --git
a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/pyproject.toml
b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/pyproject.toml
new file mode 100644
index 00000000000..1fb6a7d0c7e
--- /dev/null
+++
b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/pyproject.toml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[build-system]
+requires = ["maturin>=1.11,<2.0"]
+build-backend = "maturin"
+
+[project]
+name = "word_processing"
+requires-python = ">=3.8"
+classifiers = [
+ "Programming Language :: Rust",
+ "Programming Language :: Python :: Implementation :: CPython",
+ "Programming Language :: Python :: Implementation :: PyPy",
+]
+dynamic = ["version"]
diff --git
a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/src/lib.rs
b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/src/lib.rs
new file mode 100644
index 00000000000..4f15c18a9ee
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/src/lib.rs
@@ -0,0 +1,38 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use pyo3::prelude::*;
+
+/// A Python module implemented in Rust.
+#[pymodule]
+mod word_processing {
+ use pyo3::prelude::*;
+ use regex::Regex;
+
+ /// Builds the map of string to tuple(string, int).
+ #[pyfunction]
+ fn map_to_int(a: String) -> PyResult<(String, u32)> {
+ Ok((a, 1))
+ }
+
+ /// Extracts individual words from a line of text.
+ #[pyfunction]
+ fn extract_words(a: String) -> PyResult<Vec<String>> {
+ let re = Regex::new(r"[\w\']+").unwrap();
+ Ok(re.find_iter(&a).map(|m| m.as_str().to_string()).collect())
+ }
+}
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/wordcount_rust.py
b/sdks/python/apache_beam/examples/wordcount_rust/wordcount_rust.py
new file mode 100644
index 00000000000..9dd8ac02395
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/wordcount_rust.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+# pytype: skip-file
+
+import argparse
+import logging
+import re
+
+import word_processing
+
+import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+
+
def run(argv=None, save_main_session=True) -> PipelineResult:
  """Main entry point; defines and runs the wordcount pipeline.

  Word extraction and pairing with a count of 1 are delegated to functions
  from the Rust-backed ``word_processing`` module; summing, formatting and
  I/O use standard Beam transforms.

  Args:
    argv: Command-line arguments. ``--input`` and ``--output`` are consumed
      here; every other argument is forwarded to ``PipelineOptions``.
    save_main_session: Value assigned to ``SetupOptions.save_main_session``.

  Returns:
    The ``PipelineResult`` of the run, after it has finished.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default='gs://dataflow-samples/shakespeare/kinglear.txt',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  pipeline = beam.Pipeline(options=pipeline_options)

  # Read the text file[pattern] into a PCollection.
  lines = pipeline | 'Read' >> ReadFromText(known_args.input)

  counts = (
      lines
      # extract_words returns a list of words for each line; FlatMap emits
      # each element of that list as its own output element.
      | 'Split' >>
      beam.FlatMap(word_processing.extract_words).with_output_types(str)
      | 'PairWithOne' >> beam.Map(word_processing.map_to_int)
      | 'GroupAndSum' >> beam.CombinePerKey(sum))

  # Format the counts into a PCollection of strings.
  def format_result(word, count):
    return '%s: %d' % (word, count)

  output = counts | 'Format' >> beam.MapTuple(format_result)

  # Write the output using a "Write" transform that has side effects.
  # pylint: disable=expression-not-assigned
  output | 'Write' >> WriteToText(known_args.output)

  # Execute the pipeline and return the result.
  result = pipeline.run()
  result.wait_until_finish()
  return result
+
+
if __name__ == '__main__':
  # Raise the root logger to INFO so pipeline progress is visible when this
  # module is executed as a script.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run()