This is an automated email from the ASF dual-hosted git repository. git-site-role pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/asf-site by this push: new 39ea3ff Publishing website 2022/01/25 06:03:11 at commit abac2df 39ea3ff is described below commit 39ea3ff5aab6e897f6a970cb047b5520c024aca8 Author: jenkins <bui...@apache.org> AuthorDate: Tue Jan 25 06:03:11 2022 +0000 Publishing website 2022/01/25 06:03:11 at commit abac2df --- website/generated-content/documentation/index.xml | 524 ++++++++------------- .../documentation/programming-guide/index.html | 313 ++++++------ website/generated-content/sitemap.xml | 2 +- 3 files changed, 361 insertions(+), 478 deletions(-) diff --git a/website/generated-content/documentation/index.xml b/website/generated-content/documentation/index.xml index d20d6e4..0b839fd 100644 --- a/website/generated-content/documentation/index.xml +++ b/website/generated-content/documentation/index.xml @@ -11581,78 +11581,71 @@ use case.</p> </div> </div> <h2 id="multi-language-pipelines">13. Multi-language pipelines</h2> -<p>This section provides comprehensive documentation of multi-language pipelines. For a short overview of the topic, see:</p> +<p>This section provides comprehensive documentation of multi-language pipelines. To get started creating a multi-language pipeline, see:</p> <ul> <li><a href="/documentation/sdks/python-multi-language-pipelines">Python multi-language pipelines quickstart</a></li> +<li><a href="/documentation/sdks/java-multi-language-pipelines">Java multi-language pipelines quickstart</a></li> </ul> -<p>Beam allows you to combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py">Apache Kafka connector</a> and <a href="https://github.com/apache/beam/blo [...] 
+<p>Beam lets you combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py">Apache Kafka connector</a> and <a href="https://github.com/apache/beam/blob/mas [...] <p>Pipelines that use transforms from more than one SDK-language are known as <em>multi-language pipelines</em>.</p> <h3 id="create-x-lang-transforms">13.1. Creating cross-language transforms</h3> -<p>To make transforms written in one language available to pipelines written in another language, an <em>expansion service</em> for transforms written in the same language is used to create and inject the appropriate language-specific pipeline fragments into your pipeline.</p> -<p>In the following example, a Python pipeline written the Apache Beam SDK for Python starts up a local Java expansion service on your computer to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into your Python pipeline. The SDK then downloads and stages the necessary Java dependencies needed to execute these transforms.</p> +<p>To make transforms written in one language available to pipelines written in another language, Beam uses an <em>expansion service</em>, which creates and injects the appropriate language-specific pipeline fragments into the pipeline.</p> +<p>In the following example, a Beam Python pipeline starts up a local Java expansion service to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into the Python pipeline. 
The SDK then downloads and stages the necessary Java dependencies needed to execute these transforms.</p> <p><img src="/images/multi-language-pipelines-diagram.svg" alt="Diagram of multi-language pipeline execution flow."></p> -<p>At runtime, the Beam runner will execute both Python and Java transforms to execute your pipeline.</p> +<p>At runtime, the Beam runner will execute both Python and Java transforms to run the pipeline.</p> <p>In this section, we will use <a href="https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html">KafkaIO.Read</a> to illustrate how to create a cross-language transform for Java and a test example for Python.</p> <h4 id="1311-creating-cross-language-java-transforms">13.1.1. Creating cross-language Java transforms</h4> <p>There are two ways to make Java transforms available to other SDKs.</p> <ul> <li>Option 1: In some cases, you can use existing Java transforms from other SDKs without writing any additional Java code.</li> -<li>Option 2: You can use arbitrary Java Transforms from other SDKs by adding a few Java classes.</li> +<li>Option 2: You can use arbitrary Java transforms from other SDKs by adding a few Java classes.</li> </ul> -<h5 id="13111-using-existing-java-transforms-from-other-sdks-without-writing-more-java-code">13.1.1.1 Using Existing Java Transforms from Other SDKs Without Writing more Java Code</h5> -<p>Starting with Beam 2.34.0, Python SDK users can use some Java transforms without writing additional Java code. This can be useful in many cases. For example,</p> +<h5 id="13111-using-existing-java-transforms-without-writing-more-java-code">13.1.1.1 Using existing Java transforms without writing more Java code</h5> +<p>Starting with Beam 2.34.0, Python SDK users can use some Java transforms without writing additional Java code. This can be useful in many cases. 
For example:</p> <ul> -<li>A developer not familiar with Java may need to use an existing Java transform from a Python pipeline</li> -<li>A developer may need to make an existing Java transform available to a Python pipeline without writing/releasing more Java code</li> +<li>A developer not familiar with Java may need to use an existing Java transform from a Python pipeline.</li> +<li>A developer may need to make an existing Java transform available to a Python pipeline without writing/releasing more Java code.</li> </ul> <blockquote> <p><strong>Note:</strong> This feature is currently only available when using Java transforms from a Python pipeline.</p> </blockquote> -<p>To be eligible for direct usage, the API of the Java transform has to follow the following pattern.</p> -<ul> -<li>Requirement 1: The Java transform can be constructed using an available public constructor or a public static method (a constructor method) in the same Java class.</li> -<li>Requirement 2: The Java transform can be configured using one or more builder methods. Each builder method should be public and should return an instance of the Java transform.</li> -</ul> -<p>See below for the structure of an example Java class that can be directly used from the Python API.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>public class JavaDataGenerator extends PTransform&lt;PBegin, PCollection&lt;String&gt;&gt; { -. . . -// Following method satisfies the Requirement 1. -// Note that you may also use a class constructor instead of a static method. -public static JavaDataGenerator create(Integer size) { -return new JavaDataGenerator(size); -} -static class JavaDataGeneratorConfig implements Serializable { -public String prefix; -public long length; -public String suffix; -. . . 
-} -// Following method conforms to the Requirement 2 -public JavaDataGenerator withJavaDataGeneratorConfig(JavaDataGeneratorConfig dataConfig) { -return new JavaDataGenerator(this.size, javaDataGeneratorConfig); -} -. . . -}</code></pre> -</div> -</div> -<p>To use a Java class that conforms to the above requirement from a Python SDK pipeline you may do the following.</p> -<ul> -<li>Step 1: create an allowlist file in the <em>yaml</em> format that describes the Java transform classes and methods that will be directly accessed from Python.</li> -<li>Step 2: start an Expansion Service with the <code>javaClassLookupAllowlistFile</code> option passing path to the allowlist defined in Step 1 as the value.</li> -<li>Step 3: Use the Python <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py">JavaExternalTransform</a> API to directly -access Java transforms defined in the allowlist from the Python side.</li> -</ul> -<p>Starting with Beam 2.35.0, Step 1 and 2 may be skipped as described in corresponding sections below.</p> -<h5 id="step-1">Step 1</h5> -<p>To use this Java transform from Python, you may define an allowlist file in the <em>yaml</em> format. This allowlist lists the class names, +<p>To be eligible for direct usage, the API of the Java transform has to meet the following requirements:</p> +<ol> +<li>The Java transform can be constructed using an available public constructor or a public static method (a constructor method) in the same Java class.</li> +<li>The Java transform can be configured using one or more builder methods. 
Each builder method should be public and should return an instance of the Java transform.</li> +</ol> +<p>Here&rsquo;s an example Java class that can be directly used from the Python API.</p> +<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">JavaDataGenerator</span> <span class="kd">extends</span> <span class="n">PTransform</span><span class="o">&lt;</span><span class="n">PBegin</span><span class="o">,</span> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n [...] +<span class="o">.</span> <span class="o">.</span> <span class="o">.</span> +<span class="c1">// The following method satisfies requirement 1. +</span><span class="c1"></span> <span class="c1">// Note that you could use a class constructor instead of a static method. +</span><span class="c1"></span> <span class="kd">public</span> <span class="kd">static</span> <span class="n">JavaDataGenerator</span> <span class="nf">create</span><span class="o">(</span><span class="n">Integer</span> <span class="n">size</span><span class="o">)</span> <span class="o">{</span> +<span class="k">return</span> <span class="k">new</span> <span class="n">JavaDataGenerator</span><span class="o">(</span><span class="n">size</span><span class="o">);</span> +<span class="o">}</span> +<span class="kd">static</span> <span class="kd">class</span> <span class="nc">JavaDataGeneratorConfig</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span> +<span class="kd">public</span> <span class="n">String</span> <span class="n">prefix</span><span class="o">;</span> +<span class="kd">public</span> <span class="kt">long</span> <span class="n">length</span><span class="o">;</span> +<span class="kd">public</span> <span class="n">String</span> <span class="n">suffix</span><span class="o">;</span> +<span class="o">.</span> <span class="o">.</span> <span 
class="o">.</span> +<span class="o">}</span> +<span class="c1">// The following method conforms to requirement 2. +</span><span class="c1"></span> <span class="kd">public</span> <span class="n">JavaDataGenerator</span> <span class="nf">withJavaDataGeneratorConfig</span><span class="o">(</span><span class="n">JavaDataGeneratorConfig</span> <span class="n">dataConfig</span><span class="o">)</span> <span class="o">{</span> +<span class="k">return</span> <span class="k">new</span> <span class="n">JavaDataGenerator</span><span class="o">(</span><span class="k">this</span><span class="o">.</span><span class="na">size</span><span class="o">,</span> <span class="n">dataConfig</span><span class="o">);</span> +<span class="o">}</span> +<span class="o">.</span> <span class="o">.</span> <span class="o">.</span> +<span class="o">}</span> +</code></pre></div><p>For a complete example, see <a href="https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaDataGenerator.java">JavaDataGenerator</a>.</p> +<p>To use a Java class that conforms to the above requirements from a Python SDK pipeline, follow these steps:</p> +<ol> +<li>Create a <em>yaml</em> allowlist that describes the Java transform classes and methods that will be directly accessed from Python.</li> +<li>Start an expansion service, using the <code>javaClassLookupAllowlistFile</code> option to pass the path to the allowlist.</li> +<li>Use the Python <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py">JavaExternalTransform</a> API to directly access Java transforms defined in the allowlist from the Python side.</li> +</ol> +<p>Starting with Beam 2.36.0, steps 1 and 2 can be skipped, as described in the corresponding sections below.</p> +<p><strong>Step 1</strong></p> +<p>To use an eligible Java transform from Python, define a <em>yaml</em> allowlist. 
This allowlist lists the class names, constructor methods, and builder methods that are directly available to be used from the Python side.</p> -<p>Starting with Beam 2.35.0, you have the option to specify <code>*</code> to the <code>javaClassLookupAllowlistFile</code> option instead of defining an actual allowlist which -denotes that all supported transforms in the classpath of the expansion service may be accessed through the API.</p> +<p>Starting with Beam 2.35.0, you have the option to pass <code>*</code> to the <code>javaClassLookupAllowlistFile</code> option instead of defining an actual allowlist. The <code>*</code> specifies that all supported transforms in the classpath of the expansion service can be accessed through the API. We encourage using an actual allowlist for production, because allowing clients to access arbitrary Java classes can pose a security risk.</p> <div class="snippet"> <div class="notebook-skip code-snippet without_switcher"> <a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> @@ -11667,9 +11660,9 @@ allowedBuilderMethods: - withJavaDataGeneratorConfig</code></pre> </div> </div> -<h5 id="step-2">Step 2</h5> -<p>The allowlist is provided as an argument when starting up the Java expansion service. For example, the expansion service can be started -as a local Java process using the following command.</p> +<p><strong>Step 2</strong></p> +<p>Provide the allowlist as an argument when starting up the Java expansion service. 
For example, you can start the expansion service +as a local Java process using the following command:</p> <div class="snippet"> <div class="notebook-skip code-snippet without_switcher"> <a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> @@ -11678,62 +11671,46 @@ as a local Java process using the following command.</p> <pre><code>java -jar &lt;jar file&gt; &lt;port&gt; --javaClassLookupAllowlistFile=&lt;path to the allowlist file&gt;</code></pre> </div> </div> -<p>Starting with Beam 2.35.0, Beam ``JavaExternalTransform<code>API will automatically startup an expansion service with a given set of</code>jar` file dependencies -if an expansion service address was not provided.</p> -<h5 id="step-3">Step 3</h5> -<p>You can directly use the Java class from your Python pipeline using a stub transform created using the <code>JavaExternalTransform</code> API. This API allows you to construct the transform -using the Java class name and allows you to invoke builder methods to configure the class.</p> -<p>Constructor and method parameter types are mapped between Python and Java using a Beam Schema. The Schema is auto-generated using the object types -provided on the Python side. If the Java class constructor method or builder method accepts any complex object types, make sure that the Beam Schema +<p>Starting with Beam 2.36.0, the <code>JavaExternalTransform</code> API will automatically start up an expansion service with a given <code>jar</code> file dependency if an expansion service address was not provided.</p> +<p><strong>Step 3</strong></p> +<p>You can use the Java class directly from your Python pipeline using a stub transform created from the <code>JavaExternalTransform</code> API. 
This API allows you to construct the transform using the Java class name and allows you to invoke builder methods to configure the class.</p> +<p>Constructor and method parameter types are mapped between Python and Java using a Beam schema. The schema is auto-generated using the object types +provided on the Python side. If the Java class constructor method or builder method accepts any complex object types, make sure that the Beam schema for these objects is registered and available for the Java expansion service. If a schema has not been registered, the Java expansion service will -try to register a schema using <a href="https://beam.apache.org/documentation/programming-guide/#creating-schemas">JavaFieldSchema</a>. In Python arbitrary objects -can be represented using <code>NamedTuple</code>s which will be represented as Beam Rows in the Schema. See below for a Python stub transform that represents the above -mentioned Java transform.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>JavaDataGeneratorConfig = typing.NamedTuple( -&#39;JavaDataGeneratorConfig&#39;, [(&#39;prefix&#39;, str), (&#39;length&#39;, int), (&#39;suffix&#39;, str)]) -data_config = JavaDataGeneratorConfig(prefix=&#39;start&#39;, length=20, suffix=&#39;end&#39;) -java_transform = JavaExternalTransform( -&#39;my.beam.transforms.JavaDataGenerator&#39;, expansion_service=&#39;localhost:&lt;port&gt;&#39;).create(numpy.int32(100)).withJavaDataGeneratorConfig(data_config)</code></pre> -</div> -</div> -<p>This transform can be used in a Python pipeline along with other Python transforms.</p> -<h5 id="13112-full-api-for-making-existing-java-transforms-available-to-other-sdks">13.1.1.2 Full API for Making Existing Java Transforms Available to Other SDKs</h5> -<p>To make your Apache Beam Java SDK transform 
portable across SDK languages, you must implement two interfaces: <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java">ExternalTransformBuilder</a> and <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java">ExternalTransformRegistrar</a>. The <code [...] +try to register a schema using <a href="https://beam.apache.org/documentation/programming-guide/#creating-schemas">JavaFieldSchema</a>. In Python, arbitrary objects +can be represented using <code>NamedTuple</code>s, which will be represented as Beam rows in the schema. Here is a Python stub transform that represents the above +mentioned Java transform:</p> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="n">JavaDataGeneratorConfig</span> <span class="o">=</span> <span class="n">typing</span><span class="o">.</span><span class="n">NamedTuple</span><span class="p">(</span> +<span class="s1">&#39;JavaDataGeneratorConfig&#39;</span><span class="p">,</span> <span class="p">[(</span><span class="s1">&#39;prefix&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">),</span> <span class="p">(</span><span class="s1">&#39;length&#39;</span><span class="p">,</span> <span class="nb">int</span><span class="p">),</span> <span class="p">(</span [...] +<span class="n">data_config</span> <span class="o">=</span> <span class="n">JavaDataGeneratorConfig</span><span class="p">(</span><span class="n">prefix</span><span class="o">=</span><span class="s1">&#39;start&#39;</span><span class="p">,</span> <span class="n">length</span><span class="o">=</span><span class="mi">20</span><span class="p">,</span> <span class="n">suffix</span><span class="o">=</s [...] 
+<span class="n">java_transform</span> <span class="o">=</span> <span class="n">JavaExternalTransform</span><span class="p">(</span> +<span class="s1">&#39;my.beam.transforms.JavaDataGenerator&#39;</span><span class="p">,</span> <span class="n">expansion_service</span><span class="o">=</span><span class="s1">&#39;localhost:&lt;port&gt;&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">numpy</span><span class="o">.</span><span class="n">int32</span> [...] +</code></pre></div><p>You can use this transform in a Python pipeline along with other Python transforms. For a complete example, see <a href="https://github.com/apache/beam/blob/master/examples/multi-language/python/javadatagenerator.py">javadatagenerator.py</a>.</p> +<h5 id="13112-using-the-api-to-make-existing-java-transforms-available-to-other-sdks">13.1.1.2 Using the API to make existing Java transforms available to other SDKs</h5> +<p>To make your Beam Java SDK transform portable across SDK languages, you must implement two interfaces: <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java">ExternalTransformBuilder</a> and <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java">ExternalTransformRegistrar</a>. The <code>Extern [...] <p><strong>Implementing the interfaces</strong></p> <ol> <li> -<p>Define a Builder class for your transform that implements the <code>ExternalTransformBuilder</code> interface and overrides the <code>buildExternal</code> method that will be used to build your transform object. Initial configuration values for your transform should be defined in the <code>buildExternal</code> method. 
In most cases, it is convenient to make the Java transform builder class implement <code>ExternalTransformBuilder</code>.</p> +<p>Define a Builder class for your transform that implements the <code>ExternalTransformBuilder</code> interface and overrides the <code>buildExternal</code> method that will be used to build your transform object. Initial configuration values for your transform should be defined in the <code>buildExternal</code> method. In most cases, it&rsquo;s convenient to make the Java transform builder class implement <code>ExternalTransformBuilder</code>.</p> <blockquote> <p><strong>Note:</strong> <code>ExternalTransformBuilder</code> requires you to define a configuration object (a simple POJO) to capture a set of parameters sent by external SDKs to initiate the Java transform. Usually these parameters directly map to constructor parameters of the Java transform.</p> </blockquote> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>@AutoValue.Builder -abstract static class Builder&lt;K, V&gt; -implements ExternalTransformBuilder&lt;External.Configuration, PBegin, PCollection&lt;KV&lt;K, V&gt;&gt;&gt; { -abstract Builder&lt;K, V&gt; setConsumerConfig(Map&lt;String, Object&gt; config); -abstract Builder&lt;K, V&gt; setTopics(List&lt;String&gt; topics); -/** Remaining property declarations omitted for clarity. */ -abstract Read&lt;K, V&gt; build(); -@Override -public PTransform&lt;PBegin, PCollection&lt;KV&lt;K, V&gt;&gt;&gt; buildExternal( -External.Configuration config) { -setTopics(ImmutableList.copyOf(config.topics)); -/** Remaining property defaults omitted for clarity. 
*/ -} -} -</code></pre> -</div> -</div> -<p>Note that <code>buildExternal</code> method may choose to perform additional operations before setting properties received from external SDKs in the transform. For example, <code>buildExternal</code> method may validates properties available in the configuration object before setting them in the transform.</p> +<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java"><span class="nd">@AutoValue.Builder</span> +<span class="kd">abstract</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Builder</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> +<span class="kd">implements</span> <span class="n">ExternalTransformBuilder</span><span class="o">&lt;</span><span class="n">External</span><span class="o">.</span><span class="na">Configuration</span><span class="o">,</span> <span class="n">PBegin</span><span class="o">,</span> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span [...] +<span class="kd">abstract</span> <span class="n">Builder</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="nf">setConsumerConfig</span><span class="o">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span>< [...] +<span class="kd">abstract</span> <span class="n">Builder</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="nf">setTopics</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">topics</span>< [...] 
+<span class="cm">/** Remaining property declarations omitted for clarity. */</span> +<span class="kd">abstract</span> <span class="n">Read</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;</span> <span class="nf">build</span><span class="o">();</span> +<span class="nd">@Override</span> +<span class="kd">public</span> <span class="n">PTransform</span><span class="o">&lt;</span><span class="n">PBegin</span><span class="o">,</span> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;&gt;&gt;</span> <span class="nf">bui [...] +<span class="n">External</span><span class="o">.</span><span class="na">Configuration</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span> +<span class="n">setTopics</span><span class="o">(</span><span class="n">ImmutableList</span><span class="o">.</span><span class="na">copyOf</span><span class="o">(</span><span class="n">config</span><span class="o">.</span><span class="na">topics</span><span class="o">));</span> +<span class="cm">/** Remaining property defaults omitted for clarity. */</span> +<span class="o">}</span> +<span class="o">}</span> +</code></pre></div><p>For complete examples, see <a href="https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaCountBuilder.java">JavaCountBuilder</a> and <a href="https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixBuilder.java">JavaPrefixBuilder</a>.</p> +<p>Note that the <code>buildExternal</code> method can perform additional operations before setting properties received from external SDKs in the transform. 
For example, <code>buildExternal</code> can validate properties available in the configuration object before setting them in the transform.</p> </li> <li> <p>Register the transform as an external cross-language transform by defining a class that implements <code>ExternalTransformRegistrar</code>. You must annotate your class with the <code>AutoService</code> annotation to ensure that your transform is registered and instantiated properly by the expansion service.</p> @@ -11744,42 +11721,35 @@ setTopics(ImmutableList.copyOf(config.topics)); <li> <p>From within your registrar class, define a configuration class for the parameters used during the initialization of your transform by the external SDK.</p> <p>The following example from the KafkaIO transform shows how to implement steps two through four:</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>@AutoService(ExternalTransformRegistrar.class) -public static class External implements ExternalTransformRegistrar { -public static final String URN = &#34;beam:external:java:kafka:read:v1&#34;; -@Override -public Map&lt;String, Class&lt;? extends ExternalTransformBuilder&lt;?, ?, ?&gt;&gt;&gt; knownBuilders() { -return ImmutableMap.of( -URN, -(Class&lt;? extends ExternalTransformBuilder&lt;?, ?, ?&gt;&gt;) -(Class&lt;?&gt;) AutoValue_KafkaIO_Read.Builder.class); -} -/** Parameters class to expose the Read transform to an external SDK. */ -public static class Configuration { -private Map&lt;String, String&gt; consumerConfig; -private List&lt;String&gt; topics; -public void setConsumerConfig(Map&lt;String, String&gt; consumerConfig) { -this.consumerConfig = consumerConfig; -} -public void setTopics(List&lt;String&gt; topics) { -this.topics = topics; -} -/** Remaining properties omitted for clarity. 
*/ -} -} -</code></pre> -</div> -</div> +<div class="highlight"><pre class="chroma"><code class="language-java" data-lang="java"><span class="nd">@AutoService</span><span class="o">(</span><span class="n">ExternalTransformRegistrar</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">External</span> <span class="kd">implements</span> <span class="n">ExternalTransformRegistrar</span> <span class="o">{</span> +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">String</span> <span class="n">URN</span> <span class="o">=</span> <span class="s">&#34;beam:external:java:kafka:read:v1&#34;</span><span class="o">;</span> +<span class="nd">@Override</span> +<span class="kd">public</span> <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Class</span><span class="o">&lt;?</span> <span class="kd">extends</span> <span class="n">ExternalTransformBuilder</span><span class="o">&lt;?,</span> <span class="o">?,</span> <span class="o">?&gt;&gt;&gt;</span> <span class="n">knownBuild [...] 
+<span class="k">return</span> <span class="n">ImmutableMap</span><span class="o">.</span><span class="na">of</span><span class="o">(</span> +<span class="n">URN</span><span class="o">,</span> +<span class="o">(</span><span class="n">Class</span><span class="o">&lt;?</span> <span class="kd">extends</span> <span class="n">ExternalTransformBuilder</span><span class="o">&lt;?,</span> <span class="o">?,</span> <span class="o">?&gt;&gt;)</span> +<span class="o">(</span><span class="n">Class</span><span class="o">&lt;?&gt;)</span> <span class="n">AutoValue_KafkaIO_Read</span><span class="o">.</span><span class="na">Builder</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> +<span class="o">}</span> +<span class="cm">/** Parameters class to expose the Read transform to an external SDK. */</span> +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Configuration</span> <span class="o">{</span> +<span class="kd">private</span> <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">consumerConfig</span><span class="o">;</span> +<span class="kd">private</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">topics</span><span class="o">;</span> +<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setConsumerConfig</span><span class="o">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">consumerConfig</span><span class="o">)</span> <span class="o">{</span> +<span class="k">this</span><span class="o">.</span><span class="na">consumerConfig</span> <span class="o">=</span> <span class="n">consumerConfig</span><span 
class="o">;</span> +<span class="o">}</span> +<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setTopics</span><span class="o">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">topics</span><span class="o">)</span> <span class="o">{</span> +<span class="k">this</span><span class="o">.</span><span class="na">topics</span> <span class="o">=</span> <span class="n">topics</span><span class="o">;</span> +<span class="o">}</span> +<span class="cm">/** Remaining properties omitted for clarity. */</span> +<span class="o">}</span> +<span class="o">}</span> +</code></pre></div><p>For additional examples, see <a href="https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaCountRegistrar.java">JavaCountRegistrar</a> and <a href="https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixRegistrar.java">JavaPrefixRegistrar</a>.</p> </li> </ol> <p>After you have implemented the <code>ExternalTransformBuilder</code> and <code>ExternalTransformRegistrar</code> interfaces, your transform can be registered and created successfully by the default Java expansion service.</p> <p><strong>Starting the expansion service</strong></p> -<p>An expansion service can be used with multiple transforms in the same pipeline. Java has a default expansion service included and available in the Apache Beam Java SDK for you to use with your Java transforms. You can write your own expansion service, but that is generally not needed, so it is not covered in this section.</p> +<p>You can use an expansion service with multiple transforms in the same pipeline. The Beam Java SDK provides a default expansion service for Java transforms. 
You can also write your own expansion service, but that&rsquo;s generally not needed, so it&rsquo;s not covered in this section.</p> <p>Perform the following to start up a Java expansion service directly:</p> <div class="snippet"> <div class="notebook-skip code-snippet without_switcher"> @@ -11792,53 +11762,37 @@ $ jar -jar /path/to/expansion_service.jar &lt;PORT_NUMBER&gt;</code>& </div> </div> <p>The expansion service is now ready to serve transforms on the specified port.</p> -<p>When creating SDK-specific wrappers for your transform, SDKs may provide utilities that are readily available for easily starting up an expansion service. For example, the Python SDK provides the utilities <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService"><code>JavaJarExpansionService</code></a> and <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.tr [...] +<p>When creating SDK-specific wrappers for your transform, you may be able to use SDK-provided utilities to start up an expansion service. For example, the Python SDK provides the utilities <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService"><code>JavaJarExpansionService</code></a> and <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.h [...] <p><strong>Including dependencies</strong></p> <p>If your transform requires external libraries, you can include them by adding them to the classpath of the expansion service. 
After they are included in the classpath, they will be staged when your transform is expanded by the expansion service.</p> <p><strong>Writing SDK-specific wrappers</strong></p> -<p>Your cross-language Java transform can be called through the lower-level <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform"><code>ExternalTransform</code></a> class in a multi-language pipeline (as described in the next section); however, if possible, you should create a SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the trans [...] +<p>Your cross-language Java transform can be called through the lower-level <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform"><code>ExternalTransform</code></a> class in a multi-language pipeline (as described in the next section); however, if possible, you should write an SDK-specific wrapper in the language of the pipeline (such as Python) to access the transform instead. This h [...] 
<p>To create an SDK wrapper for use in a Python pipeline, do the following:</p> <ol> <li> <p>Create a Python module for your cross-language transform(s).</p> </li> <li> -<p>In the module, build the payload that should be used to initiate the cross-language transform expansion request using one of the available <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder"><code>PayloadBuilder</code></a> classes.</p> +<p>In the module, use one of the <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder"><code>PayloadBuilder</code></a> classes to build the payload for the initial cross-language transform expansion request.</p> <p>The parameter names and types of the payload should map to parameter names and types of the configuration POJO provided to the Java <code>ExternalTransformBuilder</code>. Parameter types are mapped across SDKs using a <a href="https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto">Beam schema</a>. Parameter names are mapped by simply converting Python underscore-separated variable names to camel-case (Java standard).</p> -<p>In the following example, <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py">kafka.py</a> uses <code>NamedTupleBasedPayloadBuilder</code> to build the payload. The parameters map to the Java <a href="https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java">KafkaIO.External.Configuration</a> config object defined previously in the <strong>Implementing the interfac [...] 
-<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>class ReadFromKafkaSchema(typing.NamedTuple): -consumer_config: typing.Mapping[str, str] -topics: typing.List[str] -# Other properties omitted for clarity. -payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...)) -</code></pre> -</div> -</div> -</li> +<p>In the following example, <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py">kafka.py</a> uses <code>NamedTupleBasedPayloadBuilder</code> to build the payload. The parameters map to the Java <a href="https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java">KafkaIO.External.Configuration</a> config object defined in the previous section.</p> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="k">class</span> <span class="nc">ReadFromKafkaSchema</span><span class="p">(</span><span class="n">typing</span><span class="o">.</span><span class="n">NamedTuple</span><span class="p">):</span> +<span class="n">consumer_config</span><span class="p">:</span> <span class="n">typing</span><span class="o">.</span><span class="n">Mapping</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="nb">str</span><span class="p">]</span> +<span class="n">topics</span><span class="p">:</span> <span class="n">typing</span><span class="o">.</span><span class="n">List</span><span class="p">[</span><span class="nb">str</span><span class="p">]</span> +<span class="c1"># Other properties omitted for clarity.</span> +<span class="n">payload</span> <span class="o">=</span> <span class="n">NamedTupleBasedPayloadBuilder</span><span class="p">(</span><span class="n">ReadFromKafkaSchema</span><span 
class="p">(</span><span class="o">...</span><span class="p">))</span> +</code></pre></div></li> <li> -<p>Start an expansion service unless one is specified by the pipeline creator. The Apache Beam Python SDK provides utilities <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService"><code>JavaJarExpansionService</code></a> and <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService">< [...] -<p>For transforms released with Beam do the following:</p> +<p>Start an expansion service, unless one is specified by the pipeline creator. The Beam Python SDK provides the utilities <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService"><code>JavaJarExpansionService</code></a> and <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService"><co [...] +<p>For transforms released with Beam, do the following:</p> <ol> <li> -<p>Add a Gradle target to Beam that can be used to build a shaded expansion service JAR for the target Java transform. This target should produce a Beam JAR that contains all dependencies needed for expanding the Java transform and the JAR should be released with Beam. Note that you might be able to use one of the existing Gradle target that offer an aggregated version of an expansion service jar (for example, for all GCP IO).</p> +<p>Add a Gradle target to Beam that can be used to build a shaded expansion service JAR for the target Java transform. This target should produce a Beam JAR that contains all dependencies needed for expanding the Java transform, and the JAR should be released with Beam. 
You might be able to use an existing Gradle target that offers an aggregated version of an expansion service JAR (for example, for all GCP IO).</p> </li> <li> <p>In your Python module, instantiate <code>BeamJarExpansionService</code> with the Gradle target.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code> expansion_service = BeamJarExpansionService(&#39;sdks:java:io:expansion-service:shadowJar&#39;) -</code></pre> -</div> -</div> -</li> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="n">expansion_service</span> <span class="o">=</span> <span class="n">BeamJarExpansionService</span><span class="p">(</span><span class="s1">&#39;sdks:java:io:expansion-service:shadowJar&#39;</span><span class="p">)</span> +</code></pre></div></li> </ol> </li> <li> @@ -11851,75 +11805,35 @@ payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...)) <ol> <li> <p>Define a Uniform Resource Name (URN) for your transform. 
The URN must be a unique string that identifies your transform with the expansion service.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>TEST_COMPK_URN = &#34;beam:transforms:xlang:test:compk&#34; -</code></pre> -</div> -</div> -</li> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="n">TEST_COMPK_URN</span> <span class="o">=</span> <span class="s2">&#34;beam:transforms:xlang:test:compk&#34;</span> +</code></pre></div></li> <li> <p>For an existing Python transform, create a new class to register the URN with the Python expansion service.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>@ptransform.PTransform.register_urn(TEST_COMPK_URN, None) -class CombinePerKeyTransform(ptransform.PTransform): -</code></pre> -</div> -</div> -</li> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="nd">@ptransform.PTransform.register_urn</span><span class="p">(</span><span class="n">TEST_COMPK_URN</span><span class="p">,</span> <span class="bp">None</span><span class="p">)</span> +<span class="k">class</span> <span class="nc">CombinePerKeyTransform</span><span class="p">(</span><span class="n">ptransform</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span> +</code></pre></div></li> <li> <p>From within the class, define an expand method that takes an input PCollection, runs the Python transform, and then returns the output PCollection.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" 
data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>def expand(self, pcoll): -return pcoll \ -| beam.CombinePerKey(sum).with_output_types( -typing.Tuple[unicode, int]) -</code></pre> -</div> -</div> -</li> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span> +<span class="k">return</span> <span class="n">pcoll</span> \ +<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombinePerKey</span><span class="p">(</span><span class="nb">sum</span><span class="p">)</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span> +<span class="n">typing</span><span class="o">.</span><span class="n">Tuple</span><span class="p">[</span><span class="nb">unicode</span><span class="p">,</span> <span class="nb">int</span><span class="p">])</span> +</code></pre></div></li> <li> <p>As with other Python transforms, define a <code>to_runner_api_parameter</code> method that returns the URN.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>def to_runner_api_parameter(self, unused_context): -return TEST_COMPK_URN, None -</code></pre> -</div> -</div> -</li> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="k">def</span> <span class="nf">to_runner_api_parameter</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">unused_context</span><span class="p">):</span> +<span class="k">return</span> <span class="n">TEST_COMPK_URN</span><span 
class="p">,</span> <span class="bp">None</span> +</code></pre></div></li> <li> <p>Define a static <code>from_runner_api_parameter</code> method that returns an instantiation of the cross-language Python transform.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>@staticmethod -def from_runner_api_parameter( -unused_ptransform, unused_parameter, unused_context): -return CombinePerKeyTransform() -</code></pre> -</div> -</div> -</li> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="nd">@staticmethod</span> +<span class="k">def</span> <span class="nf">from_runner_api_parameter</span><span class="p">(</span> +<span class="n">unused_ptransform</span><span class="p">,</span> <span class="n">unused_parameter</span><span class="p">,</span> <span class="n">unused_context</span><span class="p">):</span> +<span class="k">return</span> <span class="n">CombinePerKeyTransform</span><span class="p">()</span> +</code></pre></div></li> </ol> <p><strong>Starting the expansion service</strong></p> -<p>An expansion service can be used with multiple transforms in the same pipeline. Python has a default expansion service included and available in the Apache Beam Python SDK for you to use with your Python transforms. You are free to write your own expansion service, but that is generally not needed, so it is not covered in this section.</p> +<p>An expansion service can be used with multiple transforms in the same pipeline. The Beam Python SDK provides a default expansion service for you to use with your Python transforms. 
You are free to write your own expansion service, but that is generally not needed, so it is not covered in this section.</p> <p>Perform the following steps to start up the default Python expansion service directly:</p> <ol> <li> @@ -11950,25 +11864,25 @@ return CombinePerKeyTransform() </div> </li> <li> -<p>This expansion service is not ready to serve up transforms on the address <code>localhost:$PORT_FOR_EXPANSION_SERVICE</code>.</p> +<p>This expansion service is now ready to serve up transforms on the address <code>localhost:$PORT_FOR_EXPANSION_SERVICE</code>.</p> </li> </ol> <p><strong>Including dependencies</strong></p> -<p>Currently Python external transforms are limited to dependencies available in core Beam SDK Harness.</p> +<p>Currently Python external transforms are limited to dependencies available in the core Beam SDK harness.</p> <h4 id="1313-creating-cross-language-go-transforms">13.1.3. Creating cross-language Go transforms</h4> <p>Go currently does not support creating cross-language transforms, only using cross-language transforms from other languages; see more at <a href="https://issues.apache.org/jira/browse/BEAM-9923">BEAM-9923</a>.</p> -<h4 id="1314-selecting-a-urn-for-cross-language-transforms">13.1.4. Selecting a URN for Cross-language Transforms</h4> +<h4 id="1314-defining-a-urn">13.1.4. Defining a URN</h4> <p>Developing a cross-language transform involves defining a URN for registering the transform with an expansion service. In this section we provide a convention for defining such URNs. Following this convention is optional but it will ensure that your transform will not run into conflicts when registering in an expansion service along with transforms developed by other developers.</p> -<h5 id="schema">Schema</h5> +<h5 id="13141-schema">13.1.4.1. Schema</h5> <p>A URN should consist of the following components:</p> <ul> -<li>ns-id: A namespace identifier. 
Default recommendation is <code>beam:transform</code>.</li> -<li>org-identifier: Identifies the organization where the transform was defined. Transforms defined in Apache Beam use <code>org.apache.beam</code> for this.</li> -<li>functionality-identifier - Identifies the functionality of the cross-language transform.</li> -<li>version - a version number for the transform</li> +<li><strong>ns-id</strong>: A namespace identifier. Default recommendation is <code>beam:transform</code>.</li> +<li><strong>org-identifier</strong>: Identifies the organization where the transform was defined. Transforms defined in Apache Beam use <code>org.apache.beam</code> for this.</li> +<li><strong>functionality-identifier</strong>: Identifies the functionality of the cross-language transform.</li> +<li><strong>version</strong>: a version number for the transform.</li> </ul> <p>We provide the schema from the URN convention in <a href="https://en.wikipedia.org/wiki/Augmented_Backus%E2%80%93Naur_form">augmented Backus–Naur</a> form. Keywords in upper case are from the <a href="https://datatracker.ietf.org/doc/html/rfc8141">URN spec</a>.</p> @@ -11985,7 +11899,7 @@ functionality-identifier = 1*id-char version = “v” 1*(DIGIT / “.”) ; For example, ‘v1.2’</code></pre> </div> </div> -<h5 id="examples">Examples</h5> +<h5 id="13142-examples">13.1.4.2. Examples</h5> <p>Below we’ve given some example transform classes and corresponding URNs to be used.</p> <ul> <li>A transform offered with Apache Beam that writes Parquet files. @@ -12011,7 +11925,7 @@ version = “v” 1*(DIGIT / “.”) ; For example, ‘v1.2’</code></pr <p><strong>Using the External class</strong></p> <ol> <li> -<p>Make sure you have any runtime environment dependencies (like JRE) installed on your local machine (either directly on the local machine or available through a container). 
See the expansion service section for more details.</p> +<p>Make sure you have any runtime environment dependencies (like the JRE) installed on your local machine (either directly on the local machine or available through a container). See the expansion service section for more details.</p> <blockquote> <p><strong>Note:</strong> When including Python transforms from within a Java pipeline, all Python dependencies have to be included in the SDK harness container.</p> </blockquote> @@ -12028,64 +11942,49 @@ version = “v” 1*(DIGIT / “.”) ; For example, ‘v1.2’</code></pr </li> </ol> <h4 id="1322-using-cross-language-transforms-in-a-python-pipeline">13.2.2. Using cross-language transforms in a Python pipeline</h4> -<p>If a Python-specific wrapper for a cross-language transform is available, use that; otherwise, you have to use the lower-level <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform"><code>ExternalTransform</code></a> class to access the transform.</p> +<p>If a Python-specific wrapper for a cross-language transform is available, use that. 
Otherwise, you have to use the lower-level <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform"><code>ExternalTransform</code></a> class to access the transform.</p> <p><strong>Using an SDK wrapper</strong></p> -<p>To use a cross-language transform through an SDK wrapper, import the module for the SDK wrapper and call it from your pipeline as shown in the example:</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>from apache_beam.io.kafka import ReadFromKafka -kafka_records = ( -pipeline -| &#39;ReadFromKafka&#39; &gt;&gt; ReadFromKafka( -consumer_config={ -&#39;bootstrap.servers&#39;: self.bootstrap_servers, -&#39;auto.offset.reset&#39;: &#39;earliest&#39; -}, -topics=[self.topic], -max_num_records=max_num_records, -expansion_service=&lt;Address of expansion service&gt;)) -</code></pre> -</div> -</div> -<p><strong>Using the ExternalTransform class</strong></p> +<p>To use a cross-language transform through an SDK wrapper, import the module for the SDK wrapper and call it from your pipeline, as shown in the example:</p> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="kn">from</span> <span class="nn">apache_beam.io.kafka</span> <span class="kn">import</span> <span class="n">ReadFromKafka</span> +<span class="n">kafka_records</span> <span class="o">=</span> <span class="p">(</span> +<span class="n">pipeline</span> +<span class="o">|</span> <span class="s1">&#39;ReadFromKafka&#39;</span> <span class="o">&gt;&gt;</span> <span class="n">ReadFromKafka</span><span class="p">(</span> +<span class="n">consumer_config</span><span class="o">=</span><span class="p">{</span> +<span class="s1">&#39;bootstrap.servers&#39;</span><span 
class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">bootstrap_servers</span><span class="p">,</span> +<span class="s1">&#39;auto.offset.reset&#39;</span><span class="p">:</span> <span class="s1">&#39;earliest&#39;</span> +<span class="p">},</span> +<span class="n">topics</span><span class="o">=</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">topic</span><span class="p">],</span> +<span class="n">max_num_records</span><span class="o">=</span><span class="n">max_num_records</span><span class="p">,</span> +<span class="n">expansion_service</span><span class="o">=&lt;</span><span class="n">Address</span> <span class="n">of</span> <span class="n">expansion</span> <span class="n">service</span><span class="o">&gt;</span><span class="p">))</span> +</code></pre></div><p><strong>Using the ExternalTransform class</strong></p> <p>When an SDK-specific wrapper isn&rsquo;t available, you will have to access the cross-language transform through the <code>ExternalTransform</code> class.</p> <ol> <li> -<p>Make sure you have any runtime environment dependencies (like JRE) installed on your local machine. See the expansion service section for more details.</p> +<p>Make sure you have any runtime environment dependencies (like the JRE) installed on your local machine. See the expansion service section for more details.</p> </li> <li> <p>Start up the expansion service for the SDK that is in the language of the transform you&rsquo;re trying to consume, if not available.</p> <p>Make sure the transform you&rsquo;re trying to use is available and can be used by the expansion service. For Java, make sure the builder and registrar for the transform are available in the classpath of the expansion service.</p> </li> <li> -<p>Include <code>ExternalTransform</code> when instantiating your pipeline. Reference the URN, Payload, and expansion service. 
You can use one of the available <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder"><code>PayloadBuilder</code></a> classes to build the payload for <code>ExternalTransform</code>.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>with pipeline as p: -res = ( -p -| beam.Create([&#39;a&#39;, &#39;b&#39;]).with_output_types(unicode) -| beam.ExternalTransform( -TEST_PREFIX_URN, -ImplicitSchemaPayloadBuilder({&#39;data&#39;: u&#39;0&#39;}), -&lt;Address of expansion service&gt;)) -assert_that(res, equal_to([&#39;0a&#39;, &#39;0b&#39;])) -</code></pre> -</div> -</div> +<p>Include <code>ExternalTransform</code> when instantiating your pipeline. Reference the URN, payload, and expansion service. You can use one of the available <a href="https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder"><code>PayloadBuilder</code></a> classes to build the payload for <code>ExternalTransform</code>.</p> +<div class="highlight"><pre class="chroma"><code class="language-py" data-lang="py"><span class="k">with</span> <span class="n">pipeline</span> <span class="k">as</span> <span class="n">p</span><span class="p">:</span> +<span class="n">res</span> <span class="o">=</span> <span class="p">(</span> +<span class="n">p</span> +<span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span><span class="s1">&#39;a&#39;</span><span class="p">,</span> <span class="s1">&#39;b&#39;</span><span class="p">])</span><span class="o">.</span><span class="n">with_output_types</span><span class="p">(</span><span class="nb">unicode</span><span class="p">)</span> +<span 
class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ExternalTransform</span><span class="p">(</span> +<span class="n">TEST_PREFIX_URN</span><span class="p">,</span> +<span class="n">ImplicitSchemaPayloadBuilder</span><span class="p">({</span><span class="s1">&#39;data&#39;</span><span class="p">:</span> <span class="sa">u</span><span class="s1">&#39;0&#39;</span><span class="p">}),</span> +<span class="o">&lt;</span><span class="n">Address</span> <span class="n">of</span> <span class="n">expansion</span> <span class="n">service</span><span class="o">&gt;</span><span class="p">))</span> +<span class="n">assert_that</span><span class="p">(</span><span class="n">res</span><span class="p">,</span> <span class="n">equal_to</span><span class="p">([</span><span class="s1">&#39;0a&#39;</span><span class="p">,</span> <span class="s1">&#39;0b&#39;</span><span class="p">]))</span> +</code></pre></div><p>For additional examples, see <a href="https://github.com/apache/beam/blob/master/examples/multi-language/python/addprefix.py">addprefix.py</a> and <a href="https://github.com/apache/beam/blob/master/examples/multi-language/python/javacount.py">javacount.py</a>.</p> </li> <li> -<p>After the job has been submitted to the Beam runner, shutdown the expansion service by terminating the expansion service process.</p> +<p>After the job has been submitted to the Beam runner, shut down the expansion service by terminating the expansion service process.</p> </li> </ol> <h4 id="1323-using-cross-language-transforms-in-a-go-pipeline">13.2.3. Using cross-language transforms in a Go pipeline</h4> -<p>If a Go-specific wrapper for a cross-language transform is available, use that; otherwise, you have to use the +<p>If a Go-specific wrapper for a cross-language transform is available, use that. 
Otherwise, you have to use the lower-level <a href="https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#CrossLanguage">CrossLanguage</a> function to access the transform.</p> <p><strong>Expansion Services</strong></p> @@ -12096,25 +11995,18 @@ machine and ensure they are accessible to your code during pipeline construction <p><strong>Using an SDK wrapper</strong></p> <p>To use a cross-language transform through an SDK wrapper, import the package for the SDK wrapper and call it from your pipeline as shown in the example:</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>import ( -&#34;github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio&#34; -) -// Kafka Read using previously defined values. -kafkaRecords := kafkaio.Read( -s, -expansionAddr, // Address of expansion service. -bootstrapAddr, -[]string{topicName}, -kafkaio.MaxNumRecords(numRecords), -kafkaio.ConsumerConfigs(map[string]string{&#34;auto.offset.reset&#34;: &#34;earliest&#34;}))</code></pre> -</div> -</div> -<p><strong>Using the CrossLanguage function</strong></p> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="kn">import</span> <span class="p">(</span> +<span class="s">&#34;github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio&#34;</span> +<span class="p">)</span> +<span class="c1">// Kafka Read using previously defined values. +</span><span class="c1"></span><span class="nx">kafkaRecords</span> <span class="o">:=</span> <span class="nx">kafkaio</span><span class="p">.</span><span class="nf">Read</span><span class="p">(</span> +<span class="nx">s</span><span class="p">,</span> +<span class="nx">expansionAddr</span><span class="p">,</span> <span class="c1">// Address of expansion service. 
+</span><span class="c1"></span> <span class="nx">bootstrapAddr</span><span class="p">,</span> +<span class="p">[]</span><span class="kt">string</span><span class="p">{</span><span class="nx">topicName</span><span class="p">},</span> +<span class="nx">kafkaio</span><span class="p">.</span><span class="nf">MaxNumRecords</span><span class="p">(</span><span class="nx">numRecords</span><span class="p">),</span> +<span class="nx">kafkaio</span><span class="p">.</span><span class="nf">ConsumerConfigs</span><span class="p">(</span><span class="kd">map</span><span class="p">[</span><span class="kt">string</span><span class="p">]</span><span class="kt">string</span><span class="p">{</span><span class="s">&#34;auto.offset.reset&#34;</span><span class="p">:</span> <span class="s">&#34;earliest&#34;</span><s [...] +</code></pre></div><p><strong>Using the CrossLanguage function</strong></p> <p>When an SDK-specific wrapper isn&rsquo;t available, you will have to access the cross-language transform through the <code>beam.CrossLanguage</code> function.</p> <ol> <li> @@ -12125,38 +12017,30 @@ kafkaio.ConsumerConfigs(map[string]string{&#34;auto.offset.reset&#34;: & Refer to <a href="#create-x-lang-transforms">Creating cross-language transforms</a> for details.</p> </li> <li> -<p>Use the <code>beam.CrossLanguage</code> function in your pipeline as appropriate. Reference the URN, Payload, +<p>Use the <code>beam.CrossLanguage</code> function in your pipeline as appropriate. Reference the URN, payload, expansion service address, and define inputs and outputs. You can use the <a href="https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#CrossLanguagePayload">beam.CrossLanguagePayload</a> function as a helper for encoding a payload. 
You can use the <a href="https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#UnnamedInput">beam.UnnamedInput</a> and <a href="https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#UnnamedOutput">beam.UnnamedOutput</a> functions as shortcuts for single, unnamed inputs/outputs or define a map for named ones.</p> -<div class="snippet"> -<div class="notebook-skip code-snippet without_switcher"> -<a class="copy" type="button" data-bs-toggle="tooltip" data-bs-placement="bottom" title="Copy to clipboard"> -<img src="/images/copy-icon.svg"/> -</a> -<pre><code>type prefixPayload struct { -Data string `beam:&#34;data&#34;` -} -urn := &#34;beam:transforms:xlang:test:prefix&#34; -payload := beam.CrossLanguagePayload(prefixPayload{Data: prefix}) -expansionAddr := &#34;localhost:8097&#34; -outT := beam.UnnamedOutput(typex.New(reflectx.String)) -res := beam.CrossLanguage(s, urn, payload, expansionAddr, beam.UnnamedInput(inputPCol), outT) -</code></pre> -</div> -</div> -</li> +<div class="highlight"><pre class="chroma"><code class="language-go" data-lang="go"><span class="kd">type</span> <span class="nx">prefixPayload</span> <span class="kd">struct</span> <span class="p">{</span> +<span class="nx">Data</span> <span class="kt">string</span> <span class="s">`beam:&#34;data&#34;`</span> +<span class="p">}</span> +<span class="nx">urn</span> <span class="o">:=</span> <span class="s">&#34;beam:transforms:xlang:test:prefix&#34;</span> +<span class="nx">payload</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">CrossLanguagePayload</span><span class="p">(</span><span class="nx">prefixPayload</span><span class="p">{</span><span class="nx">Data</span><span class="p">:</span> <span class="nx">prefix</span><span class="p">})</span> +<span class="nx">expansionAddr</span> <span class="o">:=</span> <span class="s">&#34;localhost:8097&#34;</span> +<span class="nx">outT</span> <span class="o">:=</span> <span 
class="nx">beam</span><span class="p">.</span><span class="nf">UnnamedOutput</span><span class="p">(</span><span class="nx">typex</span><span class="p">.</span><span class="nf">New</span><span class="p">(</span><span class="nx">reflectx</span><span class="p">.</span><span class="nx">String</span><span class="p">))</span> +<span class="nx">res</span> <span class="o">:=</span> <span class="nx">beam</span><span class="p">.</span><span class="nf">CrossLanguage</span><span class="p">(</span><span class="nx">s</span><span class="p">,</span> <span class="nx">urn</span><span class="p">,</span> <span class="nx">payload</span><span class="p">,</span> <span class="nx">expansionAddr</span><span class="p">,</span> <span class="nx">b [...] +</code></pre></div></li> <li> <p>After the job has been submitted to the Beam runner, shutdown the expansion service by terminating the expansion service process.</p> </li> </ol> <h3 id="x-lang-transform-runner-support">13.3. Runner Support</h3> -<p>Currently, portable runners such as Flink, Spark, and the Direct runner can be used with multi-language pipelines.</p> -<p>Google Cloud Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.</p></description></item><item><title>Documentation: BigQuery ML integration</title><link>/documentation/patterns/bqml/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>/documentation/patterns/bqml/</guid><description> +<p>Currently, portable runners such as Flink, Spark, and the direct runner can be used with multi-language pipelines.</p> +<p>Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.</p></description></item><item><title>Documentation: BigQuery ML integration</title><link>/documentation/patterns/bqml/</link><pubDate>Mon, 01 Jan 0001 00:00:00 +0000</pubDate><guid>/documentation/patterns/bqml/</guid><description> <!-- Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file 
except in compliance with the License. diff --git a/website/generated-content/documentation/programming-guide/index.html b/website/generated-content/documentation/programming-guide/index.html index 641d360..4d18f5c 100644 --- a/website/generated-content/documentation/programming-guide/index.html +++ b/website/generated-content/documentation/programming-guide/index.html @@ -18,7 +18,7 @@ function addPlaceholder(){$('input:text').attr('placeholder',"What are you looking for?");} function endSearch(){var search=document.querySelector(".searchBar");search.classList.add("disappear");var icons=document.querySelector("#iconsBar");icons.classList.remove("disappear");} function blockScroll(){$("body").toggleClass("fixedPosition");} -function openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-lis [...] +function openMenu(){addPlaceholder();blockScroll();}</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Documentation</span></li><li><a href=/documentation>Using the Documentation</a></li><li class=section-nav-item--collapsible><span class=section-nav-lis [...] Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. 
The programming guide is not intended as an exhaustive reference, but as a language-agnostic, high-level @@ -4132,187 +4132,186 @@ use case.</p><div class="language-java snippet"><div class="notebook-skip code-s <span class=c1># Register callback function for this bundle that performs the side</span> <span class=c1># effect.</span> - <span class=n>bundle_finalizer</span><span class=o>.</span><span class=n>register</span><span class=p>(</span><span class=n>my_callback_func</span><span class=p>)</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-go data-lang=go><spa [...] - . . . - - // Following method satisfies the Requirement 1. - // Note that you may also use a class constructor instead of a static method. - public static JavaDataGenerator create(Integer size) { - return new JavaDataGenerator(size); - } - - static class JavaDataGeneratorConfig implements Serializable { - public String prefix; - public long length; - public String suffix; - . . . - } - - // Following method conforms to the Requirement 2 - public JavaDataGenerator withJavaDataGeneratorConfig(JavaDataGeneratorConfig dataConfig) { - return new JavaDataGenerator(this.size, javaDataGeneratorConfig); - } - - . . . -}</code></pre></div></div><p>To use a Java class that conforms to the above requirement from a Python SDK pipeline you may do the following.</p><ul><li>Step 1: create an allowlist file in the <em>yaml</em> format that describes the Java transform classes and methods that will be directly accessed from Python.</li><li>Step 2: start an Expansion Service with the <code>javaClassLookupAllowlistFile</code> option passing path to the allowlist defined in Step 1 as the value.</li><li>Step 3: Us [...] 
-access Java transforms defined in the allowlist from the Python side.</li></ul><p>Starting with Beam 2.35.0, Step 1 and 2 may be skipped as described in corresponding sections below.</p><h5 id=step-1>Step 1</h5><p>To use this Java transform from Python, you may define an allowlist file in the <em>yaml</em> format. This allowlist lists the class names, -constructor methods, and builder methods that are directly available to be used from the Python side.</p><p>Starting with Beam 2.35.0, you have the option to specify <code>*</code> to the <code>javaClassLookupAllowlistFile</code> option instead of defining an actual allowlist which -denotes that all supported transforms in the classpath of the expansion service may be accessed through the API.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>version: v1 + <span class=n>bundle_finalizer</span><span class=o>.</span><span class=n>register</span><span class=p>(</span><span class=n>my_callback_func</span><span class=p>)</span></code></pre></div></div></div><div class="language-go snippet"><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre class=chroma><code class=language-go data-lang=go><spa [...] + <span class=o>.</span> <span class=o>.</span> <span class=o>.</span> + + <span class=c1>// The following method satisfies requirement 1. +</span><span class=c1></span> <span class=c1>// Note that you could use a class constructor instead of a static method. 
+</span><span class=c1></span> <span class=kd>public</span> <span class=kd>static</span> <span class=n>JavaDataGenerator</span> <span class=nf>create</span><span class=o>(</span><span class=n>Integer</span> <span class=n>size</span><span class=o>)</span> <span class=o>{</span> + <span class=k>return</span> <span class=k>new</span> <span class=n>JavaDataGenerator</span><span class=o>(</span><span class=n>size</span><span class=o>);</span> + <span class=o>}</span> + + <span class=kd>static</span> <span class=kd>class</span> <span class=nc>JavaDataGeneratorConfig</span> <span class=kd>implements</span> <span class=n>Serializable</span> <span class=o>{</span> + <span class=kd>public</span> <span class=n>String</span> <span class=n>prefix</span><span class=o>;</span> + <span class=kd>public</span> <span class=kt>long</span> <span class=n>length</span><span class=o>;</span> + <span class=kd>public</span> <span class=n>String</span> <span class=n>suffix</span><span class=o>;</span> + <span class=o>.</span> <span class=o>.</span> <span class=o>.</span> + <span class=o>}</span> + + <span class=c1>// The following method conforms to requirement 2. 
+</span><span class=c1></span> <span class=kd>public</span> <span class=n>JavaDataGenerator</span> <span class=nf>withJavaDataGeneratorConfig</span><span class=o>(</span><span class=n>JavaDataGeneratorConfig</span> <span class=n>dataConfig</span><span class=o>)</span> <span class=o>{</span> + <span class=k>return</span> <span class=k>new</span> <span class=n>JavaDataGenerator</span><span class=o>(</span><span class=k>this</span><span class=o>.</span><span class=na>size</span><span class=o>,</span> <span class=n>javaDataGeneratorConfig</span><span class=o>);</span> + <span class=o>}</span> + + <span class=o>.</span> <span class=o>.</span> <span class=o>.</span> +<span class=o>}</span> +</code></pre></div><p>For a complete example, see <a href=https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaDataGenerator.java>JavaDataGenerator</a>.</p><p>To use a Java class that conforms to the above requirements from a Python SDK pipeline, follow these steps:</p><ol><li>Create a <em>yaml</em> allowlist that describes the Java transform classes and methods that will be directly accessed from Python.</li><li>Star [...] +constructor methods, and builder methods that are directly available to be used from the Python side.</p><p>Starting with Beam 2.35.0, you have the option to pass <code>*</code> to the <code>javaClassLookupAllowlistFile</code> option instead of defining an actual allowlist. The <code>*</code> specifies that all supported transforms in the classpath of the expansion service can be accessed through the API. We encourage using an actual allowlist for production, because allowing clients to [...] allowedClasses: - className: my.beam.transforms.JavaDataGenerator allowedConstructorMethods: - create allowedBuilderMethods: - - withJavaDataGeneratorConfig</code></pre></div></div><h5 id=step-2>Step 2</h5><p>The allowlist is provided as an argument when starting up the Java expansion service. 
For example, the expansion service can be started -as a local Java process using the following command.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>java -jar <jar file> <port> --javaClassLookupAllowlistFile=<path to the allowlist file></code></pre></div></div><p>Starting with Beam 2.35.0, Beam ``JavaExternalTransform<code>API will automatical [...] -if an expansion service address was not provided.</p><h5 id=step-3>Step 3</h5><p>You can directly use the Java class from your Python pipeline using a stub transform created using the <code>JavaExternalTransform</code> API. This API allows you to construct the transform -using the Java class name and allows you to invoke builder methods to configure the class.</p><p>Constructor and method parameter types are mapped between Python and Java using a Beam Schema. The Schema is auto-generated using the object types -provided on the Python side. If the Java class constructor method or builder method accepts any complex object types, make sure that the Beam Schema + - withJavaDataGeneratorConfig</code></pre></div></div><p><strong>Step 2</strong></p><p>Provide the allowlist as an argument when starting up the Java expansion service. For example, you can start the expansion service +as a local Java process using the following command:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>java -jar <jar file> <port> --javaClassLookupAllowlistFile=<path to the allowlist file></code></pre></div></div><p>Starting with Beam 2.36.0, the <code>JavaExternalTransform</code> API will automa [...] +provided on the Python side. 
If the Java class constructor method or builder method accepts any complex object types, make sure that the Beam schema for these objects is registered and available for the Java expansion service. If a schema has not been registered, the Java expansion service will -try to register a schema using <a href=https://beam.apache.org/documentation/programming-guide/#creating-schemas>JavaFieldSchema</a>. In Python arbitrary objects -can be represented using <code>NamedTuple</code>s which will be represented as Beam Rows in the Schema. See below for a Python stub transform that represents the above -mentioned Java transform.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>JavaDataGeneratorConfig = typing.NamedTuple( -'JavaDataGeneratorConfig', [('prefix', str), ('length', int), ('suffix', str)]) -data_config = JavaDataGeneratorConfig(prefix='start', length=20, suffix='end') - -java_transform = JavaExternalTransform( -'my.beam.transforms.JavaDataGenerator', expansion_service='localhost:<port>').create(numpy.int32(100)).withJavaDataGeneratorConfig(data_config)</code></pre></div></div><p>This transform can be used in a Python pipeline along with other Python transforms.</p><h5 id=13112-full-api-for-making-existing-java-transforms-available-to-other-sdks>13.1.1.2 Full API for Making Existing Java Transforms Available to Other SDKs</h5><p>To make your Apache Beam Java SDK transform p [...] -abstract static class Builder<K, V> - implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> { - abstract Builder<K, V> setConsumerConfig(Map<String, Object> config); - - abstract Builder<K, V> setTopics(List<String> topics); - - /** Remaining property declarations omitted for clarity. 
*/ - - abstract Read<K, V> build(); - - @Override - public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal( - External.Configuration config) { - setTopics(ImmutableList.copyOf(config.topics)); - - /** Remaining property defaults omitted for clarity. */ - } -} - </code></pre></div></div><p>Note that <code>buildExternal</code> method may choose to perform additional operations before setting properties received from external SDKs in the transform. For example, <code>buildExternal</code> method may validates properties available in the configuration object before setting them in the transform.</p></li><li><p>Register the transform as an external cross-language transform by defining a class that implements <code>ExternalTransformRegistrar</code [...] -public static class External implements ExternalTransformRegistrar { - - public static final String URN = "beam:external:java:kafka:read:v1"; - - @Override - public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() { - return ImmutableMap.of( - URN, - (Class<? extends ExternalTransformBuilder<?, ?, ?>>) - (Class<?>) AutoValue_KafkaIO_Read.Builder.class); - } - - /** Parameters class to expose the Read transform to an external SDK. */ - public static class Configuration { - private Map<String, String> consumerConfig; - private List<String> topics; - - public void setConsumerConfig(Map<String, String> consumerConfig) { - this.consumerConfig = consumerConfig; - } - - public void setTopics(List<String> topics) { - this.topics = topics; - } - - /** Remaining properties omitted for clarity. 
*/ - } -} - </code></pre></div></div></li></ol><p>After you have implemented the <code>ExternalTransformBuilder</code> and <code>ExternalTransformRegistrar</code> interfaces, your transform can be registered and created successfully by the default Java expansion service.</p><p><strong>Starting the expansion service</strong></p><p>An expansion service can be used with multiple transforms in the same pipeline. Java has a default expansion service included and available in the Apache Beam Java SDK [...] +try to register a schema using <a href=https://beam.apache.org/documentation/programming-guide/#creating-schemas>JavaFieldSchema</a>. In Python, arbitrary objects +can be represented using <code>NamedTuple</code>s, which will be represented as Beam rows in the schema. Here is a Python stub transform that represents the above +mentioned Java transform:</p><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=n>JavaDataGeneratorConfig</span> <span class=o>=</span> <span class=n>typing</span><span class=o>.</span><span class=n>NamedTuple</span><span class=p>(</span> +<span class=s1>'JavaDataGeneratorConfig'</span><span class=p>,</span> <span class=p>[(</span><span class=s1>'prefix'</span><span class=p>,</span> <span class=nb>str</span><span class=p>),</span> <span class=p>(</span><span class=s1>'length'</span><span class=p>,</span> <span class=nb>int</span><span class=p>),</span> <span class=p>(</span><span class=s1>'suffix'</span><span class=p>,</span> <span class=nb>str</span><span class=p>)])</span> +<span class=n>data_config</span> <span class=o>=</span> <span class=n>JavaDataGeneratorConfig</span><span class=p>(</span><span class=n>prefix</span><span class=o>=</span><span class=s1>'start'</span><span class=p>,</span> <span class=n>length</span><span class=o>=</span><span class=mi>20</span><span class=p>,</span> <span class=n>suffix</span><span class=o>=</span><span class=s1>'end'</span><span class=p>)</span> + +<span 
class=n>java_transform</span> <span class=o>=</span> <span class=n>JavaExternalTransform</span><span class=p>(</span> +<span class=s1>'my.beam.transforms.JavaDataGenerator'</span><span class=p>,</span> <span class=n>expansion_service</span><span class=o>=</span><span class=s1>'localhost:<port>'</span><span class=p>)</span><span class=o>.</span><span class=n>create</span><span class=p>(</span><span class=n>numpy</span><span class=o>.</span><span class=n>int32</span><span class=p>(</span><span class=mi>100</span><span class=p>))</span><span class=o>.</span><span class=n>withJavaDataGe [...] +</code></pre></div><p>You can use this transform in a Python pipeline along with other Python transforms. For a complete example, see <a href=https://github.com/apache/beam/blob/master/examples/multi-language/python/javadatagenerator.py>javadatagenerator.py</a>.</p><h5 id=13112-using-the-api-to-make-existing-java-transforms-available-to-other-sdks>13.1.1.2 Using the API to make existing Java transforms available to other SDKs</h5><p>To make your Beam Java SDK transform portable across SD [...] 
+<span class=kd>abstract</span> <span class=kd>static</span> <span class=kd>class</span> <span class=nc>Builder</span><span class=o><</span><span class=n>K</span><span class=o>,</span> <span class=n>V</span><span class=o>></span> + <span class=kd>implements</span> <span class=n>ExternalTransformBuilder</span><span class=o><</span><span class=n>External</span><span class=o>.</span><span class=na>Configuration</span><span class=o>,</span> <span class=n>PBegin</span><span class=o>,</span> <span class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>K</span><span class=o>,</span> <span class=n>V</span><span class=o>>>></span> <span class=o>{</span> + <span class=kd>abstract</span> <span class=n>Builder</span><span class=o><</span><span class=n>K</span><span class=o>,</span> <span class=n>V</span><span class=o>></span> <span class=nf>setConsumerConfig</span><span class=o>(</span><span class=n>Map</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Object</span><span class=o>></span> <span class=n>config</span><span class=o>);</span> + + <span class=kd>abstract</span> <span class=n>Builder</span><span class=o><</span><span class=n>K</span><span class=o>,</span> <span class=n>V</span><span class=o>></span> <span class=nf>setTopics</span><span class=o>(</span><span class=n>List</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>topics</span><span class=o>);</span> + + <span class=cm>/** Remaining property declarations omitted for clarity. 
*/</span> + + <span class=kd>abstract</span> <span class=n>Read</span><span class=o><</span><span class=n>K</span><span class=o>,</span> <span class=n>V</span><span class=o>></span> <span class=nf>build</span><span class=o>();</span> + + <span class=nd>@Override</span> + <span class=kd>public</span> <span class=n>PTransform</span><span class=o><</span><span class=n>PBegin</span><span class=o>,</span> <span class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span class=o><</span><span class=n>K</span><span class=o>,</span> <span class=n>V</span><span class=o>>>></span> <span class=nf>buildExternal</span><span class=o>(</span> + <span class=n>External</span><span class=o>.</span><span class=na>Configuration</span> <span class=n>config</span><span class=o>)</span> <span class=o>{</span> + <span class=n>setTopics</span><span class=o>(</span><span class=n>ImmutableList</span><span class=o>.</span><span class=na>copyOf</span><span class=o>(</span><span class=n>config</span><span class=o>.</span><span class=na>topics</span><span class=o>));</span> + + <span class=cm>/** Remaining property defaults omitted for clarity. */</span> + <span class=o>}</span> +<span class=o>}</span> +</code></pre></div><p>For complete examples, see <a href=https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaCountBuilder.java>JavaCountBuilder</a> and <a href=https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixBuilder.java>JavaPrefixBuilder</a>.</p><p>Note that the <code>buildExternal</code> method can perform additional operations before [...] 
+<span class=kd>public</span> <span class=kd>static</span> <span class=kd>class</span> <span class=nc>External</span> <span class=kd>implements</span> <span class=n>ExternalTransformRegistrar</span> <span class=o>{</span> + + <span class=kd>public</span> <span class=kd>static</span> <span class=kd>final</span> <span class=n>String</span> <span class=n>URN</span> <span class=o>=</span> <span class=s>"beam:external:java:kafka:read:v1"</span><span class=o>;</span> + + <span class=nd>@Override</span> + <span class=kd>public</span> <span class=n>Map</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>Class</span><span class=o><?</span> <span class=kd>extends</span> <span class=n>ExternalTransformBuilder</span><span class=o><?,</span> <span class=o>?,</span> <span class=o>?>>></span> <span class=n>knownBuilders</span><span class=o>()</span> <span class=o>{</span> + <span class=k>return</span> <span class=n>ImmutableMap</span><span class=o>.</span><span class=na>of</span><span class=o>(</span> + <span class=n>URN</span><span class=o>,</span> + <span class=o>(</span><span class=n>Class</span><span class=o><?</span> <span class=kd>extends</span> <span class=n>ExternalTransformBuilder</span><span class=o><?,</span> <span class=o>?,</span> <span class=o>?>>)</span> + <span class=o>(</span><span class=n>Class</span><span class=o><?>)</span> <span class=n>AutoValue_KafkaIO_Read</span><span class=o>.</span><span class=na>Builder</span><span class=o>.</span><span class=na>class</span><span class=o>);</span> + <span class=o>}</span> + + <span class=cm>/** Parameters class to expose the Read transform to an external SDK. 
*/</span> + <span class=kd>public</span> <span class=kd>static</span> <span class=kd>class</span> <span class=nc>Configuration</span> <span class=o>{</span> + <span class=kd>private</span> <span class=n>Map</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>></span> <span class=n>consumerConfig</span><span class=o>;</span> + <span class=kd>private</span> <span class=n>List</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>topics</span><span class=o>;</span> + + <span class=kd>public</span> <span class=kt>void</span> <span class=nf>setConsumerConfig</span><span class=o>(</span><span class=n>Map</span><span class=o><</span><span class=n>String</span><span class=o>,</span> <span class=n>String</span><span class=o>></span> <span class=n>consumerConfig</span><span class=o>)</span> <span class=o>{</span> + <span class=k>this</span><span class=o>.</span><span class=na>consumerConfig</span> <span class=o>=</span> <span class=n>consumerConfig</span><span class=o>;</span> + <span class=o>}</span> + + <span class=kd>public</span> <span class=kt>void</span> <span class=nf>setTopics</span><span class=o>(</span><span class=n>List</span><span class=o><</span><span class=n>String</span><span class=o>></span> <span class=n>topics</span><span class=o>)</span> <span class=o>{</span> + <span class=k>this</span><span class=o>.</span><span class=na>topics</span> <span class=o>=</span> <span class=n>topics</span><span class=o>;</span> + <span class=o>}</span> + + <span class=cm>/** Remaining properties omitted for clarity. 
*/</span> + <span class=o>}</span> +<span class=o>}</span> +</code></pre></div><p>For additional examples, see <a href=https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaCountRegistrar.java>JavaCountRegistrar</a> and <a href=https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixRegistrar.java>JavaPrefixRegistrar</a>.</p></li></ol><p>After you have implemented the <code>ExternalTransformBuilder</code> [...] # Start the expansion service at the specified port. -$ jar -jar /path/to/expansion_service.jar <PORT_NUMBER></code></pre></div></div><p>The expansion service is now ready to serve transforms on the specified port.</p><p>When creating SDK-specific wrappers for your transform, SDKs may provide utilities that are readily available for easily starting up an expansion service. For example, the Python SDK provides the utilities <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transform [...] - consumer_config: typing.Mapping[str, str] - topics: typing.List[str] - # Other properties omitted for clarity. - -payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...)) - </code></pre></div></div></li><li><p>Start an expansion service unless one is specified by the pipeline creator. The Apache Beam Python SDK provides utilities <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService><code>JavaJarExpansionService</code></a> and <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpan [...] 
- </code></pre></div></div></li></ol></li><li><p>Add a Python wrapper transform class that extends <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform><code>ExternalTransform</code></a>. Pass the payload and expansion service defined above as parameters to the constructor of the <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.e [...] - </code></pre></div></div></li><li><p>For an existing Python transform, create a new class to register the URN with the Python expansion service.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>@ptransform.PTransform.register_urn(TEST_COMPK_URN, None) -class CombinePerKeyTransform(ptransform.PTransform): - </code></pre></div></div></li><li><p>From within the class, define an expand method that takes an input PCollection, runs the Python transform, and then returns the output PCollection.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>def expand(self, pcoll): - return pcoll \ - | beam.CombinePerKey(sum).with_output_types( - typing.Tuple[unicode, int]) - </code></pre></div></div></li><li><p>As with other Python transforms, define a <code>to_runner_api_parameter</code> method that returns the URN.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>def to_runner_api_parameter(self, unused_context): - return TEST_COMPK_URN, None - 
</code></pre></div></div></li><li><p>Define a static <code>from_runner_api_parameter</code> method that returns an instantiation of the cross-language Python transform.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>@staticmethod -def from_runner_api_parameter( - unused_ptransform, unused_parameter, unused_context): - return CombinePerKeyTransform() - </code></pre></div></div></li></ol><p><strong>Starting the expansion service</strong></p><p>An expansion service can be used with multiple transforms in the same pipeline. Python has a default expansion service included and available in the Apache Beam Python SDK for you to use with your Python transforms. You are free to write your own expansion service, but that is generally not needed, so it is not covered in this section.</p><p>Perform the following steps to start up the default [...] +$ jar -jar /path/to/expansion_service.jar <PORT_NUMBER></code></pre></div></div><p>The expansion service is now ready to serve transforms on the specified port.</p><p>When creating SDK-specific wrappers for your transform, you may be able to use SDK-provided utilities to start up an expansion service. For example, the Python SDK provides the utilities <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarE [...] 
+ <span class=n>consumer_config</span><span class=p>:</span> <span class=n>typing</span><span class=o>.</span><span class=n>Mapping</span><span class=p>[</span><span class=nb>str</span><span class=p>,</span> <span class=nb>str</span><span class=p>]</span> + <span class=n>topics</span><span class=p>:</span> <span class=n>typing</span><span class=o>.</span><span class=n>List</span><span class=p>[</span><span class=nb>str</span><span class=p>]</span> + <span class=c1># Other properties omitted for clarity.</span> + +<span class=n>payload</span> <span class=o>=</span> <span class=n>NamedTupleBasedPayloadBuilder</span><span class=p>(</span><span class=n>ReadFromKafkaSchema</span><span class=p>(</span><span class=o>...</span><span class=p>))</span> +</code></pre></div></li><li><p>Start an expansion service, unless one is specified by the pipeline creator. The Beam Python SDK provides the utilities <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService><code>JavaJarExpansionService</code></a> and <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService> [...] +</code></pre></div></li></ol></li><li><p>Add a Python wrapper transform class that extends <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform><code>ExternalTransform</code></a>. Pass the payload and expansion service defined above as parameters to the constructor of the <a href=https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.Extern [...] 
+</code></pre></div></li><li><p>For an existing Python transform, create a new class to register the URN with the Python expansion service.</p><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=nd>@ptransform.PTransform.register_urn</span><span class=p>(</span><span class=n>TEST_COMPK_URN</span><span class=p>,</span> <span class=bp>None</span><span class=p>)</span> +<span class=k>class</span> <span class=nc>CombinePerKeyTransform</span><span class=p>(</span><span class=n>ptransform</span><span class=o>.</span><span class=n>PTransform</span><span class=p>):</span> +</code></pre></div></li><li><p>From within the class, define an expand method that takes an input PCollection, runs the Python transform, and then returns the output PCollection.</p><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>def</span> <span class=nf>expand</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>pcoll</span><span class=p>):</span> + <span class=k>return</span> <span class=n>pcoll</span> \ + <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>CombinePerKey</span><span class=p>(</span><span class=nb>sum</span><span class=p>)</span><span class=o>.</span><span class=n>with_output_types</span><span class=p>(</span> + <span class=n>typing</span><span class=o>.</span><span class=n>Tuple</span><span class=p>[</span><span class=nb>unicode</span><span class=p>,</span> <span class=nb>int</span><span class=p>])</span> +</code></pre></div></li><li><p>As with other Python transforms, define a <code>to_runner_api_parameter</code> method that returns the URN.</p><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=k>def</span> <span class=nf>to_runner_api_parameter</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>unused_context</span><span class=p>):</span> + <span 
class=k>return</span> <span class=n>TEST_COMPK_URN</span><span class=p>,</span> <span class=bp>None</span> +</code></pre></div></li><li><p>Define a static <code>from_runner_api_parameter</code> method that returns an instantiation of the cross-language Python transform.</p><div class=highlight><pre class=chroma><code class=language-py data-lang=py><span class=nd>@staticmethod</span> +<span class=k>def</span> <span class=nf>from_runner_api_parameter</span><span class=p>(</span> + <span class=n>unused_ptransform</span><span class=p>,</span> <span class=n>unused_parameter</span><span class=p>,</span> <span class=n>unused_context</span><span class=p>):</span> + <span class=k>return</span> <span class=n>CombinePerKeyTransform</span><span class=p>()</span> +</code></pre></div></li></ol><p><strong>Starting the expansion service</strong></p><p>An expansion service can be used with multiple transforms in the same pipeline. The Beam Python SDK provides a default expansion service for you to use with your Python transforms. You are free to write your own expansion service, but that is generally not needed, so it is not covered in this section.</p><p>Perform the following steps to start up the default Python expansion service directly:</p><ol><li [...] 
</code></pre></div></div></li><li><p>Import any modules that contain transforms to be made available using the expansion service.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>$ python -m apache_beam.runners.portability.expansion_service_test -p $PORT_FOR_EXPANSION_SERVICE - </code></pre></div></div></li><li><p>This expansion service is not ready to serve up transforms on the address <code>localhost:$PORT_FOR_EXPANSION_SERVICE</code>.</p></li></ol><p><strong>Including dependencies</strong></p><p>Currently Python external transforms are limited to dependencies available in core Beam SDK Harness.</p><h4 id=1313-creating-cross-language-go-transforms>13.1.3. Creating cross-language Go transforms</h4><p>Go currently does not support creating cross-language tr [...] -transforms from other languages; see more at <a href=https://issues.apache.org/jira/browse/BEAM-9923>BEAM-9923</a>.</p><h4 id=1314-selecting-a-urn-for-cross-language-transforms>13.1.4. Selecting a URN for Cross-language Transforms</h4><p>Developing a cross-language transform involves defining a URN for registering the transform with an expansion service. In this section + </code></pre></div></div></li><li><p>This expansion service is now ready to serve up transforms on the address <code>localhost:$PORT_FOR_EXPANSION_SERVICE</code>.</p></li></ol><p><strong>Including dependencies</strong></p><p>Currently Python external transforms are limited to dependencies available in the core Beam SDK harness.</p><h4 id=1313-creating-cross-language-go-transforms>13.1.3. Creating cross-language Go transforms</h4><p>Go currently does not support creating cross-languag [...] +transforms from other languages; see more at <a href=https://issues.apache.org/jira/browse/BEAM-9923>BEAM-9923</a>.</p><h4 id=1314-defining-a-urn>13.1.4. 
Defining a URN</h4><p>Developing a cross-language transform involves defining a URN for registering the transform with an expansion service. In this section we provide a convention for defining such URNs. Following this convention is optional but it will ensure that your transform -will not run into conflicts when registering in an expansion service along with transforms developed by other developers.</p><h5 id=schema>Schema</h5><p>A URN should consist of the following components:</p><ul><li>ns-id: A namespace identifier. Default recommendation is <code>beam:transform</code>.</li><li>org-identifier: Identifies the organization where the transform was defined. Transforms defined in Apache Beam use <code>org.apache.beam</code> for this.</li><li>functionality-identifi [...] +will not run into conflicts when registering in an expansion service along with transforms developed by other developers.</p><h5 id=13141-schema>13.1.4.1. Schema</h5><p>A URN should consist of the following components:</p><ul><li><strong>ns-id</strong>: A namespace identifier. Default recommendation is <code>beam:transform</code>.</li><li><strong>org-identifier</strong>: Identifies the organization where the transform was defined. Transforms defined in Apache Beam use <code>org.apache.be [...] Keywords in upper case are from the <a href=https://datatracker.ietf.org/doc/html/rfc8141>URN spec</a>.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>transform-urn = ns-id “:” org-identifier “:” functionality-identifier “:” version ns-id = (“beam” / NID) “:” “transform” id-char = ALPHA / DIGIT / "-" / "." 
/ "_" / "~" ; A subset of characters allowed in a URN org-identifier = 1*id-char functionality-identifier = 1*id-char -version = “v” 1*(DIGIT / “.”) ; For example, ‘v1.2’</code></pre></div></div><h5 id=examples>Examples</h5><p>Below we’ve given some example transform classes and corresponding URNs to be used.</p><ul><li>A transform offered with Apache Beam that writes Parquet files.<ul><li><code>beam:transform:org.apache.beam:parquet_write:v1</code></li></ul></li><li>A transform offered with Apache Beam that reads from Kafka with metadata.<ul><li><code>beam:transform:org.apache.beam:kafka_read_with_meta [...] - -kafka_records = ( - pipeline - | 'ReadFromKafka' >> ReadFromKafka( - consumer_config={ - 'bootstrap.servers': self.bootstrap_servers, - 'auto.offset.reset': 'earliest' - }, - topics=[self.topic], - max_num_records=max_num_records, - expansion_service=<Address of expansion service>)) - </code></pre></div></div><p><strong>Using the ExternalTransform class</strong></p><p>When an SDK-specific wrapper isn’t available, you will have to access the cross-language transform through the <code>ExternalTransform</code> class.</p><ol><li><p>Make sure you have any runtime environment dependencies (like JRE) installed on your local machine. See the expansion service section for more details.</p></li><li><p>Start up the expansion service for the SDK that is in the language of [...] - res = ( - p - | beam.Create(['a', 'b']).with_output_types(unicode) - | beam.ExternalTransform( - TEST_PREFIX_URN, - ImplicitSchemaPayloadBuilder({'data': u'0'}), - <Address of expansion service>)) - assert_that(res, equal_to(['0a', '0b'])) - </code></pre></div></div></li><li><p>After the job has been submitted to the Beam runner, shutdown the expansion service by terminating the expansion service process.</p></li></ol><h4 id=1323-using-cross-language-transforms-in-a-go-pipeline>13.2.3. 
Using cross-language transforms in a Go pipeline</h4><p>If a Go-specific wrapper for a cross-language is available, use that; otherwise, you have to use the +version = “v” 1*(DIGIT / “.”) ; For example, ‘v1.2’</code></pre></div></div><h5 id=13142-examples>13.1.4.2. Examples</h5><p>Below we’ve given some example transform classes and corresponding URNs to be used.</p><ul><li>A transform offered with Apache Beam that writes Parquet files.<ul><li><code>beam:transform:org.apache.beam:parquet_write:v1</code></li></ul></li><li>A transform offered with Apache Beam that reads from Kafka with metadata.<ul><li><code>beam:transform:org.apache.beam:kafk [...] + +<span class=n>kafka_records</span> <span class=o>=</span> <span class=p>(</span> + <span class=n>pipeline</span> + <span class=o>|</span> <span class=s1>'ReadFromKafka'</span> <span class=o>>></span> <span class=n>ReadFromKafka</span><span class=p>(</span> + <span class=n>consumer_config</span><span class=o>=</span><span class=p>{</span> + <span class=s1>'bootstrap.servers'</span><span class=p>:</span> <span class=bp>self</span><span class=o>.</span><span class=n>bootstrap_servers</span><span class=p>,</span> + <span class=s1>'auto.offset.reset'</span><span class=p>:</span> <span class=s1>'earliest'</span> + <span class=p>},</span> + <span class=n>topics</span><span class=o>=</span><span class=p>[</span><span class=bp>self</span><span class=o>.</span><span class=n>topic</span><span class=p>],</span> + <span class=n>max_num_records</span><span class=o>=</span><span class=n>max_num_records</span><span class=p>,</span> + <span class=n>expansion_service</span><span class=o>=<</span><span class=n>Address</span> <span class=n>of</span> <span class=n>expansion</span> <span class=n>service</span><span class=o>></span><span class=p>))</span> +</code></pre></div><p><strong>Using the ExternalTransform class</strong></p><p>When an SDK-specific wrapper isn’t available, you will have to access the cross-language transform through 
the <code>ExternalTransform</code> class.</p><ol><li><p>Make sure you have any runtime environment dependencies (like the JRE) installed on your local machine. See the expansion service section for more details.</p></li><li><p>Start up the expansion service for the SDK that is in the language of the [...] + <span class=n>res</span> <span class=o>=</span> <span class=p>(</span> + <span class=n>p</span> + <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Create</span><span class=p>([</span><span class=s1>'a'</span><span class=p>,</span> <span class=s1>'b'</span><span class=p>])</span><span class=o>.</span><span class=n>with_output_types</span><span class=p>(</span><span class=nb>unicode</span><span class=p>)</span> + <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>ExternalTransform</span><span class=p>(</span> + <span class=n>TEST_PREFIX_URN</span><span class=p>,</span> + <span class=n>ImplicitSchemaPayloadBuilder</span><span class=p>({</span><span class=s1>'data'</span><span class=p>:</span> <span class=sa>u</span><span class=s1>'0'</span><span class=p>}),</span> + <span class=o><</span><span class=n>Address</span> <span class=n>of</span> <span class=n>expansion</span> <span class=n>service</span><span class=o>></span><span class=p>))</span> + <span class=n>assert_that</span><span class=p>(</span><span class=n>res</span><span class=p>,</span> <span class=n>equal_to</span><span class=p>([</span><span class=s1>'0a'</span><span class=p>,</span> <span class=s1>'0b'</span><span class=p>]))</span> +</code></pre></div><p>For additional examples, see <a href=https://github.com/apache/beam/blob/master/examples/multi-language/python/addprefix.py>addprefix.py</a> and <a href=https://github.com/apache/beam/blob/master/examples/multi-language/python/javacount.py>javacount.py</a>.</p></li><li><p>After the job has been submitted to the Beam runner, shut down the expansion service by terminating the expansion 
service process.</p></li></ol><h4 id=1323-using-cross-language-transforms-in-a-go-p [...] lower-level <a href=https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#CrossLanguage>CrossLanguage</a> function to access the transform.</p><p><strong>Expansion Services</strong></p><p>The Go SDK does not yet support automatically starting an expansion service. In order to use cross-language transforms, you must manually start any necessary expansion services on your local machine and ensure they are accessible to your code during pipeline construction; see more at <a href=https://issues.apache.org/jira/browse/BEAM-12862>BEAM-12862</a>.</p><p><strong>Using an SDK wrapper</strong></p><p>To use a cross-language transform through an SDK wrapper, import the package for the SDK wrapper -and call it from your pipeline as shown in the example:</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>import ( - "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio" -) - -// Kafka Read using previously defined values. -kafkaRecords := kafkaio.Read( - s, - expansionAddr, // Address of expansion service. - bootstrapAddr, - []string{topicName}, - kafkaio.MaxNumRecords(numRecords), - kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}))</code></pre></div></div><p><strong>Using the CrossLanguage function</strong></p><p>When an SDK-specific wrapper isn’t available, you will have to access the cross-language transform through the <code>beam.CrossLanguage</code> function.</p><ol><li><p>Make sure you have the appropriate expansion service running. See the expansion service section for details.</p></li><li><p>Make sure the t [...] 
-Refer to <a href=#create-x-lang-transforms>Creating cross-language transforms</a> for details.</p></li><li><p>Use the <code>beam.CrossLanguage</code> function in your pipeline as appropriate. Reference the URN, Payload, +and call it from your pipeline as shown in the example:</p><div class=highlight><pre class=chroma><code class=language-go data-lang=go><span class=kn>import</span> <span class=p>(</span> + <span class=s>"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"</span> +<span class=p>)</span> + +<span class=c1>// Kafka Read using previously defined values. +</span><span class=c1></span><span class=nx>kafkaRecords</span> <span class=o>:=</span> <span class=nx>kafkaio</span><span class=p>.</span><span class=nf>Read</span><span class=p>(</span> + <span class=nx>s</span><span class=p>,</span> + <span class=nx>expansionAddr</span><span class=p>,</span> <span class=c1>// Address of expansion service. +</span><span class=c1></span> <span class=nx>bootstrapAddr</span><span class=p>,</span> + <span class=p>[]</span><span class=kt>string</span><span class=p>{</span><span class=nx>topicName</span><span class=p>},</span> + <span class=nx>kafkaio</span><span class=p>.</span><span class=nf>MaxNumRecords</span><span class=p>(</span><span class=nx>numRecords</span><span class=p>),</span> + <span class=nx>kafkaio</span><span class=p>.</span><span class=nf>ConsumerConfigs</span><span class=p>(</span><span class=kd>map</span><span class=p>[</span><span class=kt>string</span><span class=p>]</span><span class=kt>string</span><span class=p>{</span><span class=s>"auto.offset.reset"</span><span class=p>:</span> <span class=s>"earliest"</span><span class=p>}))</span> +</code></pre></div><p><strong>Using the CrossLanguage function</strong></p><p>When an SDK-specific wrapper isn’t available, you will have to access the cross-language transform through the <code>beam.CrossLanguage</code> function.</p><ol><li><p>Make sure you have the appropriate expansion service running. 
See the expansion service section for details.</p></li><li><p>Make sure the transform you’re trying to use is available and can be used by the expansion service. +Refer to <a href=#create-x-lang-transforms>Creating cross-language transforms</a> for details.</p></li><li><p>Use the <code>beam.CrossLanguage</code> function in your pipeline as appropriate. Reference the URN, payload, expansion service address, and define inputs and outputs. You can use the <a href=https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#CrossLanguagePayload>beam.CrossLanguagePayload</a> function as a helper for encoding a payload. You can use the <a href=https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#UnnamedInput>beam.UnnamedInput</a> and <a href=https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#UnnamedOutput>beam.UnnamedOutput</a> -functions as shortcuts for single, unnamed inputs/outputs or define a map for named ones.</p><div class=snippet><div class="notebook-skip code-snippet without_switcher"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><pre><code>type prefixPayload struct { - Data string `beam:"data"` -} -urn := "beam:transforms:xlang:test:prefix" -payload := beam.CrossLanguagePayload(prefixPayload{Data: prefix}) -expansionAddr := "localhost:8097" -outT := beam.UnnamedOutput(typex.New(reflectx.String)) -res := beam.CrossLanguage(s, urn, payload, expansionAddr, beam.UnnamedInput(inputPCol), outT) - </code></pre></div></div></li><li><p>After the job has been submitted to the Beam runner, shutdown the expansion service by -terminating the expansion service process.</p></li></ol><h3 id=x-lang-transform-runner-support>13.3. 
Runner Support</h3><p>Currently, portable runners such as Flink, Spark, and the Direct runner can be used with multi-language pipelines.</p><p>Google Cloud Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.</p><div class=feedback><p class=update>Last updated on 2022/01/20</p><h3>Have you found everything you were looking for?</h3><p class=descr [...] +functions as shortcuts for single, unnamed inputs/outputs or define a map for named ones.</p><div class=highlight><pre class=chroma><code class=language-go data-lang=go><span class=kd>type</span> <span class=nx>prefixPayload</span> <span class=kd>struct</span> <span class=p>{</span> + <span class=nx>Data</span> <span class=kt>string</span> <span class=s>`beam:"data"`</span> +<span class=p>}</span> +<span class=nx>urn</span> <span class=o>:=</span> <span class=s>"beam:transforms:xlang:test:prefix"</span> +<span class=nx>payload</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>CrossLanguagePayload</span><span class=p>(</span><span class=nx>prefixPayload</span><span class=p>{</span><span class=nx>Data</span><span class=p>:</span> <span class=nx>prefix</span><span class=p>})</span> +<span class=nx>expansionAddr</span> <span class=o>:=</span> <span class=s>"localhost:8097"</span> +<span class=nx>outT</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>UnnamedOutput</span><span class=p>(</span><span class=nx>typex</span><span class=p>.</span><span class=nf>New</span><span class=p>(</span><span class=nx>reflectx</span><span class=p>.</span><span class=nx>String</span><span class=p>))</span> +<span class=nx>res</span> <span class=o>:=</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>CrossLanguage</span><span class=p>(</span><span class=nx>s</span><span class=p>,</span> <span class=nx>urn</span><span class=p>,</span> <span class=nx>payload</span><span class=p>,</span> <span 
class=nx>expansionAddr</span><span class=p>,</span> <span class=nx>beam</span><span class=p>.</span><span class=nf>UnnamedInput</span><span class=p>(</span><span class=nx>inputPCol</spa [...] +</code></pre></div></li><li><p>After the job has been submitted to the Beam runner, shut down the expansion service by +terminating the expansion service process.</p></li></ol><h3 id=x-lang-transform-runner-support>13.3. Runner Support</h3><p>Currently, portable runners such as Flink, Spark, and the direct runner can be used with multi-language pipelines.</p><p>Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.</p><div class=feedback><p class=update>Last updated on 2022/01/24</p><h3>Have you found everything you were looking for?</h3><p class=description>Was it [...] <a href=http://www.apache.org>The Apache Software Foundation</a> | <a href=/privacy_policy>Privacy Policy</a> | <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. 
All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div></div></div></footer></body></html> \ No newline at end of file diff --git a/website/generated-content/sitemap.xml b/website/generated-content/sitemap.xml index 3789575..ff690e8 100644 --- a/website/generated-content/sitemap.xml +++ b/website/generated-content/sitemap.xml @@ -1 +1 @@ -<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/blog/beam-2.35.0/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/categories/blog/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/blog/b [...] \ No newline at end of file +<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/blog/beam-2.35.0/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/categories/blog/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-12-29T17:49:39-08:00</lastmod></url><url><loc>/blog/b [...] \ No newline at end of file