This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 34b8eea96f8 Introduce Python Wordcount with Rust Wrapped Functions (#37234)
34b8eea96f8 is described below

commit 34b8eea96f827f3c732dba14042b38496b42ad10
Author: Jack McCluskey <[email protected]>
AuthorDate: Tue Jan 6 14:02:11 2026 -0500

    Introduce Python Wordcount with Rust Wrapped Functions (#37234)
    
    * Introduce Python Wordcount with Rust Wrapped Functions
    
    * apache licenses
    
    * one more license
    
    * formatting and whitespace
---
 .../apache_beam/examples/wordcount_rust/README.md  |  51 +++++
 .../examples/wordcount_rust/requirements.txt       |  19 ++
 .../wordcount_rust/word_processing/Cargo.lock      | 234 +++++++++++++++++++++
 .../wordcount_rust/word_processing/Cargo.toml      |  30 +++
 .../wordcount_rust/word_processing/pyproject.toml  |  30 +++
 .../wordcount_rust/word_processing/src/lib.rs      |  38 ++++
 .../examples/wordcount_rust/wordcount_rust.py      |  86 ++++++++
 7 files changed, 488 insertions(+)

diff --git a/sdks/python/apache_beam/examples/wordcount_rust/README.md b/sdks/python/apache_beam/examples/wordcount_rust/README.md
new file mode 100644
index 00000000000..c02bd9ca8be
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/README.md
@@ -0,0 +1,51 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+This directory contains an example of a Python pipeline that uses Rust DoFns to perform some of the string processing in wordcount. This is done using [PyO3](https://pyo3.rs/v0.27.2/) to produce bindings for the Rust code, managed using the [maturin](https://github.com/PyO3/maturin) Python package.
+
+This example should be built and run in a Python virtual environment with Apache Beam and maturin installed. The `requirements.txt` file in this directory can be used to install the version of maturin used when the example was created.
+
+To build the Rust code, run the following from the wordcount_rust directory:
+
+```bash
+cd ./word_processing
+maturin develop
+```
+
+This will compile the Rust code and build a Python package linked to it in the current environment. The resulting package can be imported as a Python module called `word_processing`.
+
+To run wordcount locally with the direct runner, execute the following from the wordcount_rust directory within the same virtual environment:
+
+```bash
+python wordcount_rust.py --runner DirectRunner --input * --output counts.txt
+```
+
+To execute wordcount using the Dataflow runner, the tarball of the PyO3 Rust package must be provided to GCP. This is done by building the tarball then providing it as an `extra_package` argument. The tarball can be built using the following command from the wordcount_rust directory:
+
+```bash
+cd ./word_processing
+python -m build --sdist
+```
+This places the tarball in `./word_processing/dist` as `word_processing-0.1.0.tar.gz`. Job submission to Dataflow from the `wordcount_rust` directory then looks like the following:
+
+```bash
+python wordcount_rust.py --runner DataflowRunner --input gs://apache-beam-samples/shakespeare/*.txt --output gs://<YOUR_BUCKET>/wordcount_rust/counts.txt --project <YOUR_PROJECT> --region <YOUR_REGION> --extra_package ./word_processing/dist/word_processing-0.1.0.tar.gz
+```
+
+The job will then execute on Dataflow, installing the Rust package during worker setup. Wordcount will then run and write its output to the specified bucket, in files whose names start with `counts.txt`.
\ No newline at end of file
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/requirements.txt b/sdks/python/apache_beam/examples/wordcount_rust/requirements.txt
new file mode 100644
index 00000000000..44c79623571
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/requirements.txt
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+build==1.3.0
+maturin==1.11.2
\ No newline at end of file
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.lock b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.lock
new file mode 100644
index 00000000000..822dcb69a05
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.lock
@@ -0,0 +1,234 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 4
+
+[[package]]
+name = "aho-corasick"
+version = "1.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "autocfg"
+version = "1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
+
+[[package]]
+name = "heck"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+
+[[package]]
+name = "indoc"
+version = "2.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706"
+dependencies = [
+ "rustversion",
+]
+
+[[package]]
+name = "libc"
+version = "0.2.179"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f"
+
+[[package]]
+name = "memchr"
+version = "2.7.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
+
+[[package]]
+name = "memoffset"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a"
+dependencies = [
+ "autocfg",
+]
+
+[[package]]
+name = "once_cell"
+version = "1.21.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
+
+[[package]]
+name = "portable-atomic"
+version = "1.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f89776e4d69bb58bc6993e99ffa1d11f228b839984854c7daeb5d37f87cbe950"
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.105"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7"
+dependencies = [
+ "unicode-ident",
+]
+
+[[package]]
+name = "pyo3"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab53c047fcd1a1d2a8820fe84f05d6be69e9526be40cb03b73f86b6b03e6d87d"
+dependencies = [
+ "indoc",
+ "libc",
+ "memoffset",
+ "once_cell",
+ "portable-atomic",
+ "pyo3-build-config",
+ "pyo3-ffi",
+ "pyo3-macros",
+ "unindent",
+]
+
+[[package]]
+name = "pyo3-build-config"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b455933107de8642b4487ed26d912c2d899dec6114884214a0b3bb3be9261ea6"
+dependencies = [
+ "target-lexicon",
+]
+
+[[package]]
+name = "pyo3-ffi"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1c85c9cbfaddf651b1221594209aed57e9e5cff63c4d11d1feead529b872a089"
+dependencies = [
+ "libc",
+ "pyo3-build-config",
+]
+
+[[package]]
+name = "pyo3-macros"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0a5b10c9bf9888125d917fb4d2ca2d25c8df94c7ab5a52e13313a07e050a3b02"
+dependencies = [
+ "proc-macro2",
+ "pyo3-macros-backend",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "pyo3-macros-backend"
+version = "0.27.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "03b51720d314836e53327f5871d4c0cfb4fb37cc2c4a11cc71907a86342c40f9"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "pyo3-build-config",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.43"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
+name = "regex"
+version = "1.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-automata",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-automata"
+version = "0.4.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.8.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
+
+[[package]]
+name = "rustversion"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
+
+[[package]]
+name = "syn"
+version = "2.0.113"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "678faa00651c9eb72dd2020cbdf275d92eccb2400d568e419efdd64838145cb4"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-ident",
+]
+
+[[package]]
+name = "target-lexicon"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1dd07eb858a2067e2f3c7155d54e929265c264e6f37efe3ee7a8d1b5a1dd0ba"
+
+[[package]]
+name = "unicode-ident"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
+
+[[package]]
+name = "unindent"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7264e107f553ccae879d21fbea1d6724ac785e8c3bfc762137959b5802826ef3"
+
+[[package]]
+name = "word_processing"
+version = "0.1.0"
+dependencies = [
+ "pyo3",
+ "regex",
+]
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.toml b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.toml
new file mode 100644
index 00000000000..a2bce1e7303
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/Cargo.toml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[package]
+name = "word_processing"
+version = "0.1.0"
+edition = "2024"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[lib]
+name = "word_processing"
+crate-type = ["cdylib"]
+
+[dependencies]
+pyo3 = "0.27.0"
+regex = "1.12.2"
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/pyproject.toml b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/pyproject.toml
new file mode 100644
index 00000000000..1fb6a7d0c7e
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/pyproject.toml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[build-system]
+requires = ["maturin>=1.11,<2.0"]
+build-backend = "maturin"
+
+[project]
+name = "word_processing"
+requires-python = ">=3.8"
+classifiers = [
+    "Programming Language :: Rust",
+    "Programming Language :: Python :: Implementation :: CPython",
+    "Programming Language :: Python :: Implementation :: PyPy",
+]
+dynamic = ["version"]
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/word_processing/src/lib.rs b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/src/lib.rs
new file mode 100644
index 00000000000..4f15c18a9ee
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/word_processing/src/lib.rs
@@ -0,0 +1,38 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+use pyo3::prelude::*;
+
+/// A Python module implemented in Rust.
+#[pymodule]
+mod word_processing {
+    use pyo3::prelude::*;
+    use regex::Regex;
+
+    /// Pairs a word with an initial count of 1, returning a (word, count) tuple.
+    #[pyfunction]
+    fn map_to_int(a: String) -> PyResult<(String, u32)> {
+        Ok((a, 1))
+    }
+
+    /// Extracts individual words from a line of text.
+    #[pyfunction]
+    fn extract_words(a: String) -> PyResult<Vec<String>> {
+        let re = Regex::new(r"[\w\']+").unwrap();
+        Ok(re.find_iter(&a).map(|m| m.as_str().to_string()).collect())
+    }
+}
diff --git a/sdks/python/apache_beam/examples/wordcount_rust/wordcount_rust.py b/sdks/python/apache_beam/examples/wordcount_rust/wordcount_rust.py
new file mode 100644
index 00000000000..9dd8ac02395
--- /dev/null
+++ b/sdks/python/apache_beam/examples/wordcount_rust/wordcount_rust.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow."""
+
+# pytype: skip-file
+
+import argparse
+import logging
+import re
+
+import word_processing
+
+import apache_beam as beam
+from apache_beam.io import ReadFromText
+from apache_beam.io import WriteToText
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
+
+
+def run(argv=None, save_main_session=True) -> PipelineResult:
+  """Main entry point; defines and runs the wordcount pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      dest='input',
+      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+      help='Input file to process.')
+  parser.add_argument(
+      '--output',
+      dest='output',
+      required=True,
+      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+  pipeline = beam.Pipeline(options=pipeline_options)
+
+  # Read the text file[pattern] into a PCollection.
+  lines = pipeline | 'Read' >> ReadFromText(known_args.input)
+
+  counts = (
+      lines
+      | 'Split' >>
+      (beam.ParDo(word_processing.extract_words).with_output_types(str))
+      | 'PairWithOne' >> beam.Map(word_processing.map_to_int)
+      | 'GroupAndSum' >> beam.CombinePerKey(sum))
+
+  # Format the counts into a PCollection of strings.
+  def format_result(word, count):
+    return '%s: %d' % (word, count)
+
+  output = counts | 'Format' >> beam.MapTuple(format_result)
+
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  output | 'Write' >> WriteToText(known_args.output)
+
+  # Execute the pipeline and return the result.
+  result = pipeline.run()
+  result.wait_until_finish()
+  return result
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
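Note on checking the Rust functions: the two functions exported from lib.rs are small enough that a pure-Python reference can be written to compare against them. The sketch below is not part of the commit. The names `extract_words_py`, `map_to_int_py` and `count_words_py` are hypothetical, and it assumes Python's `re` module and Rust's `regex` crate agree on what `\w` matches for the input at hand, which may not hold for every Unicode character.

```python
import re
from collections import Counter
from typing import Dict, Iterable, List, Tuple

# Assumption: same character class as the Rust pattern r"[\w\']+" in lib.rs.
_WORD_RE = re.compile(r"[\w']+")


def extract_words_py(line: str) -> List[str]:
    """Hypothetical Python counterpart of word_processing.extract_words."""
    return _WORD_RE.findall(line)


def map_to_int_py(word: str) -> Tuple[str, int]:
    """Hypothetical Python counterpart of word_processing.map_to_int."""
    return (word, 1)


def count_words_py(lines: Iterable[str]) -> Dict[str, int]:
    """Sums the per-word ones in memory, standing in for CombinePerKey(sum)."""
    counts: Counter = Counter()
    for line in lines:
        for word, one in map(map_to_int_py, extract_words_py(line)):
            counts[word] += one
    return dict(counts)
```

Once `maturin develop` has been run, the output of `word_processing.extract_words(line)` could be compared with `extract_words_py(line)` on a few sample lines before submitting a job.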
