davidwrede commented on a change in pull request #13317:
URL: https://github.com/apache/beam/pull/13317#discussion_r523300408



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5431,4 +5431,280 @@ use case.
 
 {{< highlight py >}}
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" 
BundleFinalize >}}
-{{< /highlight >}}
\ No newline at end of file
+{{< /highlight >}}
+
+## 13. Multi-language pipelines {#mulit-language-pipelines}
+
+Beam allows you to combine transforms written in any supported SDK language 
(currently, Java and Python) and use them in one multi-language pipeline. This 
capability makes it easy to provide new functionality simultaneously in 
different Apache Beam SDKs through a single cross-language transform. For 
example, the [Apache Kafka 
connector](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py)
 and [SQL 
transform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql.py)
 from the Java SDK can be used in Python streaming pipelines.
+
+Pipelines that use transforms from more than one SDK-language are known as 
*multi-language pipelines*.
+
+### 13.1. Creating cross-language transforms {#create-x-lang-transforms}
+
+To make transforms written in one language available to pipelines written in 
another language, an *expansion service* for that transform is used to create 
and inject the appropriate language-specific pipeline fragments into your 
pipeline.
+
+In the following example, a Python pipeline written the Apache Beam SDK for 
Python starts up a local Java expansion service on your computer to create and 
inject the appropriate Java pipeline fragments for executing the Java Kafka 
cross-language transform into your Python pipeline. The SDK then downloads and 
stages the necessary Java dependencies needed to execute these transforms.
+
+![Diagram of multi-language pipeline execution 
flow.](/images/multi-language-pipelines-diagram.svg)
+
+At runtime, the Beam runner will execute both Python and Java transforms to 
execute your pipeline.
+
+In this section, we will use 
[KafkaIO.Read](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html)
 to illustrate how to create a cross-language transform for Java and a test 
example for Python.
+
+#### 13.1.1. Creating cross-language Java transforms
+
+To make your Apache Beam Java SDK transform portable across SDK languages, you 
must implement two interfaces: 
[ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java)
 and 
[ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java).
 The `ExternalTransformBuilder` interface constructs the cross-language 
transform using configuration values passed in from the pipeline and the 
`ExternalTransformRegistrar` interface registers the cross-language transform 
for use with the expansion service.
+
+**Implementing the interfaces**
+
+1. Define a Builder class for your transform that implements the 
`ExternalTransformBuilder` interface and overrides the `buildExternal` method 
that will be used to build your transform object. Initial configuration values 
for your transform should be defined in the `buildExternal` method. In most 
cases, it is convenient to make the Java transform builder class implement 
`ExternalTransformBuilder`.
+
+    > **Note:** `ExternalTransformBuilder` requires you to define a 
configuration object (a simple POJO) to capture a set of parameters sent by 
external SDKs to initiate the Java transform. Usually these parameters directly 
map to constructor parameters of the Java transform.
+
+    {{< highlight >}}
+@AutoValue.Builder
+abstract static class Builder<K, V>
+  implements ExternalTransformBuilder<External.Configuration, PBegin, 
PCollection<KV<K, V>>> {
+  abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+  abstract Builder<K, V> setTopics(List<String> topics);
+
+  /** Remaining property declarations omitted for clarity. */
+
+  abstract Read<K, V> build();
+
+  @Override
+  public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+      External.Configuration config) {
+    ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
+    for (String topic : config.topics) {
+      listBuilder.add(topic);
+    }
+    setTopics(listBuilder.build());
+
+    /** Remaining property defaults omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+2. Register the transform as an external cross-language transform by defining 
a class that implements `ExternalTransformRegistrar`. You must annotate your 
class with the `AutoService` annotation to ensure that your transform is 
registered and instantiated properly by the expansion service.
+3. In your registrar class, define a Uniform Resource Name (URN) for your 
transform. The URN must be a unique string that identifies your transform with 
the expansion service.
+4. From within your registrar class, define a configuration class for the 
parameters used during the initialization of your transform by the external SDK.
+
+    The following example from the KafkaIO transform shows how to implement 
steps two through four:
+
+    {{< highlight >}}
+@AutoService(ExternalTransformRegistrar.class)
+public static class External implements ExternalTransformRegistrar {
+
+  public static final String URN = "beam:external:java:kafka:read:v1";
+
+  @Override
+  public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> 
knownBuilders() {
+    return ImmutableMap.of(
+        URN,
+        (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+            (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
+  }
+
+  /** Parameters class to expose the Read transform to an external SDK. */
+  public static class Configuration {
+    private Map<String, String> consumerConfig;
+      private List<String> topics;
+
+
+      public void setConsumerConfig(Map<String, String> consumerConfig) {
+        this.consumerConfig = consumerConfig;
+      }
+
+      public void setTopics(List<String> topics) {
+        this.topics = topics;
+      }
+
+    /** Remaining properties omitted for clarity. */
+  }
+}
+    {{< /highlight >}}
+
+After you have implemented the `ExternalTransformBuilder` and 
`ExternalTransformRegistrar` interfaces, your transform can be registered and 
created successfully by the default Java expansion service.
+
+**Using the expansion service**
+
+Java has a default expansion service included and available in the Apache Beam 
Java SDK. You can write your own expansion service, but that is generally not 
needed, so it is not covered in this section.

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to