alxp1982 commented on code in PR #24488:
URL: https://github.com/apache/beam/pull/24488#discussion_r1058641412


##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md:
##########
@@ -0,0 +1,88 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# CoGroup
+
+A transform that performs equijoins across multiple schema PCollections.
+
+This transform has similarities to `CoGroupByKey`, however it works on 
PCollections that have schemas. This allows users of the transform to simply 
specify the schema fields to join on. The output type of the transform is a `Row` 
that contains one row field for the key and an ITERABLE field for each input, 
containing the rows that joined on that key; by default the cross product is 
not expanded, but it can optionally be expanded. By default the 
key field is named "key" (the name can be overridden using `withKeyField`) and 
has index 0. The tags in the `PCollectionTuple` control the names of the value 
fields in the `Row`.
+
+For example, the following demonstrates joining three PCollections on the 
"user" and "country" fields:
+
+
+```
+PCollection<Row> joined =
+PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
+.apply(CoGroup.join(By.fieldNames("user", "country")));
+```
+
+### JOIN DIFFERENT FIELDS
+
+It's also possible to join on different fields in two inputs, as long as 
the types of those fields match. In this case, the join fields must be specified 
for every input PCollection.
+
+For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON 
input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+   .apply(CoGroup
+     .join("input1Tag", By.fieldNames("referringUser"))
+     .join("input2Tag", By.fieldNames("user")));
+```
+
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON 
input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN 
input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+                 .join("input2", By.fieldNames("user"))
+                 .crossProductJoin());
+```
+
+### RIGHT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN 
input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join("input1", By.fieldNames("user"))
+                 .join("input2", By.fieldNames("user").withOptionalParticipation())
+                 .crossProductJoin());
+```
+
+### FULL OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN 
input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+   .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+                 .join("input2", By.fieldNames("user").withOptionalParticipation())
+                 .crossProductJoin());
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window, which 
you can run and experiment with.

Review Comment:
   In the playground window, you can find examples of `CoGroup` usage. Running 
this example, you'll see a list of purchase transactions made by users and 
their locations. Can you change it so that the `Location` object is 
referenced from the `Purchase` object to represent where the transaction was 
made, and modify the grouping so that the final output contains location 
information for every user purchase as well as all the locations where no 
purchases were made? 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,116 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+

Review Comment:
   > Note that coders are unrelated to parsing or formatting data when 
interacting with external data sources or sinks. You need to do such parsing or 
formatting explicitly, using transforms such as ParDo or MapElements.



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,116 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before 
use. This will generally be performed by serializing the object via Java 
Serialization.
+
+Coder classes for compound types are often composed of coder classes for the 
types contained therein. The composition of Coder instances into a coder for the 
compound class is the subject of the CoderProvider type, which enables 
automatic generic composition of Coder classes within the CoderRegistry. See 
CoderProvider and CoderRegistry for more information about how coders are 
inferred.

Review Comment:
   The Beam SDK requires a coder for every `PCollection` in your pipeline. In 
many cases, Beam can automatically infer the `Coder` for the type in a 
`PCollection` and use predefined coders to perform encoding and decoding. 
However, in some cases, you will need to specify the `Coder` explicitly or 
create a `Coder` for custom types. 
   
   To set the `Coder` for a `PCollection`, you need to call 
`PCollection.setCoder`. You can also get the `Coder` associated with a 
`PCollection` using the `PCollection.getCoder` method. 
   
   ### CoderRegistry
   When Beam tries to infer the `Coder` for a `PCollection`, it uses the 
mappings stored in the `CoderRegistry` object associated with the pipeline. You 
can access the `CoderRegistry` for a given pipeline using the method 
`Pipeline.getCoderRegistry` or get a coder for a particular type using 
`CoderRegistry.getCoder`.
   
   Please note that since a `Coder` is associated with each `PCollection`, 
you can encode/decode the same type differently in different `PCollection`s. 
   
   The following example demonstrates how to register a coder for a type using 
CoderRegistry:
   
   ```
   PipelineOptions options = PipelineOptionsFactory.create();
   Pipeline p = Pipeline.create(options);
   
   CoderRegistry cr = p.getCoderRegistry();
   cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
   ```
   
   ### Specifying default coder for a type
   You can specify the default coder for your custom type by annotating it with 
@DefaultCoder annotation. For example: 
   
   ```
   @DefaultCoder(AvroCoder.class)
   public class MyCustomDataType {
     ...
   }
   ```
   
   ### Creating a custom coder 
   



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,116 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before 
use. This will generally be performed by serializing the object via Java 
Serialization.
+
+Coder classes for compound types are often composed of coder classes for the 
types contained therein. The composition of Coder instances into a coder for the 
compound class is the subject of the CoderProvider type, which enables 
automatic generic composition of Coder classes within the CoderRegistry. See 
CoderProvider and CoderRegistry for more information about how coders are 
inferred.
+
+When you create custom objects and schemas, you need to create a subclass of 
Coder for your object and implement the following methods:

Review Comment:
   To create a custom coder for your type, you need to create a subclass of 
`Coder` for your object and implement the following methods:
   * `encode` - converts objects to bytes
   * `decode` - converts bytes to objects
   * `getCoderArguments` - if it is a `Coder` for a parameterized type, returns 
a list of the `Coder`s used for each of the parameters, in the same order in 
which they appear in the type signature of the parameterized type.
   * `verifyDeterministic` - throws `Coder.NonDeterministicException` if the 
encoding is not deterministic.



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/convert/description.md:
##########
@@ -0,0 +1,27 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+### Converting between types
+
+As mentioned, Beam can automatically convert between different Java types, as 
long as those types have equivalent schemas. One way to do this is by using the 
`Convert` transform, as follows.

Review Comment:
   The Beam SDK provides a `Convert` class that allows conversion between types, 
as long as those types have compatible schemas. Schemas are compatible if they 
recursively have fields with the same names, but possibly in a different order. 
For example:
    
   Please come up with a snippet. 
   
   `Convert` also provides methods to convert from `Row` to custom type and 
vice versa. Following is an example of converting to and from `Row`:
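   A possible sketch of such a conversion, using `Convert.toRows` and 
`Convert.fromRows` from the Beam SDK (the `MyPojo` type and the `readPojos` 
source are hypothetical; `MyPojo` is assumed to have a registered schema, e.g. 
via `@DefaultSchema`):
   
   ```
   // MyPojo has a schema registered, so Beam can infer the Row conversion.
   PCollection<MyPojo> pojos = readPojos();
   
   // Convert to Row using the inferred schema.
   PCollection<Row> rows = pojos.apply(Convert.toRows());
   
   // Convert back from Row to the custom type.
   PCollection<MyPojo> roundTripped = rows.apply(Convert.fromRows(MyPojo.class));
   ```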



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.

Review Comment:
   `Filter` is a `PTransform` you can use on a `PCollection` with a schema to 
filter elements. 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the 
result is allowed to pass if all predicates return true. The output type is the 
same as the input type.
+
+### Single fields filter
+
+For example, consider the following schema type:
+
+```
+public class Location {
+   public double latitude;
+   public double longitude;
+}
+```
+
+In order to examine only locations in south Manhattan, you would write:

Review Comment:
   To filter out all the locations except the ones in South Manhattan, you can 
write: 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the 
result is allowed to pass if all predicates return true. The output type is the 
same as the input type.
+
+### Single fields filter
+
+For example, consider the following schema type:
+
+```
+public class Location {
+   public double latitude;
+   public double longitude;
+}
+```
+
+In order to examine only locations in south Manhattan, you would write:
+
+```
+PCollection<Location> locations = readLocations();
+locations.apply(Filter
+   .whereFieldName("latitude", latitude -> latitude < 40.720 && latitude > 
40.699)
+   .whereFieldName("longitude", longitude -> longitude < -73.969 && longitude 
> -74.747));
+```
+
+### Multiple fields filter
+
+Predicates that require examining multiple fields at once are also supported. 
For example, consider the following class representing a user account:
+
+```
+class UserAccount {
+   public double spendOnBooks;
+   public double spendOnMovies;
+   ...
+}
+```
+
+Say you want to examine only users whose total spend is above $100. You could 
write:

Review Comment:
   Let's say you'd like to process only users whose total spend is over $100. 
You could write: 
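   A sketch of what that could look like, using the `whereFieldNames` method of 
Beam's schema `Filter` (the `readUsers` source is hypothetical; `Lists` is 
Guava's; depending on the SDK version the entry point may be `Filter.create()`):
   
   ```
   PCollection<UserAccount> users = readUsers();  // hypothetical source of UserAccount elements
   users.apply(Filter.<UserAccount>create()
      // The predicate receives a Row containing the selected fields.
      .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
          row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
   ```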



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/example/Task.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: schema-filter
+//   description: Schema filter example.
+//   multifile: false
+//   context_line: 46
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Filter;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    // UserPurchase schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class UserPurchase {
+        public Long userId;
+        public String country;
+        public long cost;
+        public double transactionDuration;
+
+        @SchemaCreate
+        public UserPurchase(Long userId, String country, long cost, double 
transactionDuration) {
+            this.userId = userId;
+            this.country = country;
+            this.cost = cost;
+            this.transactionDuration = transactionDuration;
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        UserPurchase user1 = new UserPurchase(1L, "America", 123, 22);

Review Comment:
   UserPurchase user1 = new UserPurchase(1L, "USA", 123, 22);



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,116 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before 
use. This will generally be performed by serializing the object via Java 
Serialization.

Review Comment:
   Please remove this sentence. 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/example/Task.java:
##########
@@ -0,0 +1,98 @@
+/*

Review Comment:
   This is a grouping example. How does it relate to `Coder`? Please put a 
runnable example that:
   - Shows a custom coder implementation
   - Uses a type annotation for setting the default Coder
   
   Also, is it possible to add a different Coder for a custom type through the 
CoderRegistry? Would it take precedence over the coder specified in the 
annotation? 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,116 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before 
use. This will generally be performed by serializing the object via Java 
Serialization.
+
+Coder classes for compound types are often composed of coder classes for the 
types contained therein. The composition of Coder instances into a coder for the 
compound class is the subject of the CoderProvider type, which enables 
automatic generic composition of Coder classes within the CoderRegistry. See 
CoderProvider and CoderRegistry for more information about how coders are 
inferred.
+
+When you create custom objects and schemas, you need to create a subclass of 
Coder for your object and implement the following methods:
+* `encode` - converting objects to bytes
+* `decode` - converting bytes to objects
+* `getCoderArguments` - If it is a `Coder` for a parameterized type, returns a 
list of `Coders` used for each of the parameters, in the same order in which 
they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throw the `Coder.NonDeterministicException`, if the 
encoding is not deterministic.
+
+For example, consider the following schema type:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+class VendorToPassengerDTO {
+    @JsonProperty(value = "PassengerCount")
+    Integer PassengerCount;
+    @JsonProperty(value = "VendorID")
+    Integer VendorID;
+
+    @SchemaCreate
+    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
+        this.PassengerCount = passengerCount;
+        this.VendorID = vendorID;
+    }
+
+    public static VendorToPassengerDTO of(final Integer passengerCount, final 
Integer vendorID) {
+        return new VendorToPassengerDTO(passengerCount, vendorID);
+    }
+
+    public void setPassengerCount(final Integer passengerCount) {
+        this.PassengerCount = passengerCount;
+    }
+
+    public void setVendorID(final Integer vendorID) {
+        this.VendorID = vendorID;
+    }
+
+    public Integer getVendorID() {
+        return this.VendorID;
+    }
+
+    public Integer getPassengerCount() {
+        return this.PassengerCount;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", 
PassengerCount, VendorID);
+    }
+}
+```
+
+An explicit `Schema` for converting to `Row`:
+```
+final Schema schema = Schema.builder()
+                .addInt32Field("PassengerCount")
+                .addInt32Field("VendorID")
+                .build();
+```
+
+`Coder` for `VendorToPassengerDTO`:
+```
+class CustomCoderSecond extends Coder<VendorToPassengerDTO> {
+    final ObjectMapper om = new ObjectMapper();
+
+    private static final CustomCoderSecond INSTANCE = new CustomCoderSecond();
+
+    public static CustomCoderSecond of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public void encode(VendorToPassengerDTO dto, OutputStream outStream) 
throws IOException {
+        final String result = dto.toString();
+        outStream.write(result.getBytes());
+    }
+
+    @Override
+    public VendorToPassengerDTO decode(InputStream inStream) throws 
IOException {
+        final String serializedDTOs = new 
String(StreamUtils.getBytesWithoutClosing(inStream));
+        return om.readValue(serializedDTOs, VendorToPassengerDTO.class);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() {
+    }
+}
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window, which 
you can run and experiment with.

Review Comment:
   Can you modify it to display the `CoderRegistry` mappings for the input 
`PCollection`? How would you change the integer coder to use big-endian 
encoding? 
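   For the big-endian part, a minimal sketch (assuming a `Pipeline` object `p` 
as in the earlier CoderRegistry example):
   
   ```
   CoderRegistry cr = p.getCoderRegistry();
   // Encode Integer values big-endian instead of with the default VarIntCoder.
   cr.registerCoderForClass(Integer.class, BigEndianIntegerCoder.of());
   ```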



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,116 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before 
use. This will generally be performed by serializing the object via Java 
Serialization.
+
+Coder classes for compound types are often composed of coder classes for the 
types contained therein. The composition of Coder instances into a coder for the 
compound class is the subject of the CoderProvider type, which enables 
automatic generic composition of Coder classes within the CoderRegistry. See 
CoderProvider and CoderRegistry for more information about how coders are 
inferred.
+
+When you create custom objects and schemas, you need to create a subclass of 
Coder for your object and implement the following methods:
+* `encode` - converting objects to bytes
+* `decode` - converting bytes to objects
+* `getCoderArguments` - If it is a `Coder` for a parameterized type, returns a 
list of `Coders` used for each of the parameters, in the same order in which 
they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throw the `Coder.NonDeterministicException`, if the 
encoding is not deterministic.
+
+For example, consider the following schema type:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+class VendorToPassengerDTO {
+    @JsonProperty(value = "PassengerCount")
+    Integer PassengerCount;
+    @JsonProperty(value = "VendorID")
+    Integer VendorID;
+
+    @SchemaCreate
+    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
+        this.PassengerCount = passengerCount;
+        this.VendorID = vendorID;
+    }
+
+    public static VendorToPassengerDTO of(final Integer passengerCount, final 
Integer vendorID) {
+        return new VendorToPassengerDTO(passengerCount, vendorID);
+    }
+
+    public void setPassengerCount(final Integer passengerCount) {
+        this.PassengerCount = passengerCount;
+    }
+
+    public void setVendorID(final Integer vendorID) {
+        this.VendorID = vendorID;
+    }
+
+    public Integer getVendorID() {
+        return this.VendorID;
+    }
+
+    public Integer getPassengerCount() {
+        return this.PassengerCount;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", 
PassengerCount, VendorID);
+    }
+}
+```
+
+An explicit `Schema` for converting to `Row`:
+```
+final Schema schema = Schema.builder()
+                .addInt32Field("PassengerCount")
+                .addInt32Field("VendorID")
+                .build();
+```
+
+`Coder` for `VendorToPassengerDTO`:

Review Comment:
   Custom `Coder` for the `VendorToPassengerDTO` object: 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,116 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before 
use. This will generally be performed by serializing the object via Java 
Serialization.
+
+Coder classes for compound types are often composed of coder classes for the 
types contained therein. The composition of Coder instances into a coder for the 
compound class is the subject of the CoderProvider type, which enables 
automatic generic composition of Coder classes within the CoderRegistry. See 
CoderProvider and CoderRegistry for more information about how coders are 
inferred.
+
+When you create custom objects and schemas, you need to create a subclass of 
Coder for your object and implement the following methods:
+* `encode` - converting objects to bytes
+* `decode` - converting bytes to objects
+* `getCoderArguments` - If it is a `Coder` for a parameterized type, returns a 
list of `Coders` used for each of the parameters, in the same order in which 
they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throw the `Coder.NonDeterministicException`, if the 
encoding is not deterministic.
+
+For example, consider the following schema type:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+class VendorToPassengerDTO {
+    @JsonProperty(value = "PassengerCount")
+    Integer PassengerCount;
+    @JsonProperty(value = "VendorID")
+    Integer VendorID;
+
+    @SchemaCreate
+    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
+        this.PassengerCount = passengerCount;
+        this.VendorID = vendorID;
+    }
+
+    public static VendorToPassengerDTO of(final Integer passengerCount, final 
Integer vendorID) {
+        return new VendorToPassengerDTO(passengerCount, vendorID);
+    }
+
+    public void setPassengerCount(final Integer passengerCount) {
+        this.PassengerCount = passengerCount;
+    }
+
+    public void setVendorID(final Integer vendorID) {
+        this.VendorID = vendorID;
+    }
+
+    public Integer getVendorID() {
+        return this.VendorID;
+    }
+
+    public Integer getPassengerCount() {
+        return this.PassengerCount;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", 
PassengerCount, VendorID);
+    }
+}
+```
+
+Explicit `scheme` for converting to `Row`:

Review Comment:
   Why do we need the part related to explicit schema creation for converting to 
`Row`? 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/convert/example/Task.java:
##########
@@ -0,0 +1,99 @@
+/*

Review Comment:
   Let's do two types with compatible schemas, such as a POJO and a Bean (or 
AutoValue), then do a POJO -> Bean -> Row -> POJO conversion. 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the 
result is allowed to pass if all predicates return true. The output type is the 
same as the input type.
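
The all-predicates-must-pass semantics can be sketched in plain Java (this is not the Beam `Filter` API; the `Location` type and the latitude/longitude thresholds are taken from the example later in this doc, combined here with `java.util.function.Predicate`):

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class AllPredicatesSketch {
    static class Location {
        final double latitude;
        final double longitude;
        Location(double latitude, double longitude) {
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    // one predicate per field; an element passes only if ALL of them hold
    static boolean passesAll(Location l) {
        Predicate<Location> latOk = loc -> loc.latitude < 40.720 && loc.latitude > 40.699;
        Predicate<Location> lonOk = loc -> loc.longitude < -73.969 && loc.longitude > -74.747;
        return latOk.and(lonOk).test(l);
    }

    public static void main(String[] args) {
        List<Location> input = List.of(
            new Location(40.710, -74.000),   // passes both predicates
            new Location(40.800, -74.000));  // fails the latitude predicate
        List<Location> kept = input.stream()
            .filter(AllPredicatesSketch::passesAll)
            .collect(Collectors.toList());
        System.out.println(kept.size()); // prints 1
    }
}
```

The output collection holds the same element type as the input, which is exactly the behavior the schema `Filter` transform provides.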

Review Comment:
   You can use a predicate that references a schema field; if it returns true, the processed element is passed to the output `PCollection`. Output elements have the same schema.



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the 
result is allowed to pass if all predicates return true. The output type is the 
same as the input type.
+
+### Single fields filter

Review Comment:
   ### Single field filter



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/example/Task.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// beam-playground:
+//   name: schema-filter
+//   description: Schema filter example.
+//   multifile: false
+//   context_line: 46
+//   categories:
+//     - Quickstart
+//   complexity: ADVANCED
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.schemas.transforms.Filter;
+import org.apache.beam.sdk.schemas.transforms.Select;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Task {
+    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+    // UserPurchase schema
+    @DefaultSchema(JavaFieldSchema.class)
+    public static class UserPurchase {
+        public Long userId;
+        public String country;
+        public long cost;
+        public double transactionDuration;
+
+        @SchemaCreate
+        public UserPurchase(Long userId, String country, long cost, double transactionDuration) {
+            this.userId = userId;
+            this.country = country;
+            this.cost = cost;
+            this.transactionDuration = transactionDuration;
+        }
+    }
+
+    public static void main(String[] args) {
+        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+        Pipeline pipeline = Pipeline.create(options);
+
+        UserPurchase user1 = new UserPurchase(1L, "America", 123, 22);
+        UserPurchase user2 = new UserPurchase(1L, "Brazilian", 645, 86);

Review Comment:
   UserPurchase user2 = new UserPurchase(1L, "Brazil", 645, 86);



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the 
result is allowed to pass if all predicates return true. The output type is the 
same as the input type.
+
+### Single fields filter
+
+For example, consider the following schema type:
+
+```
+public class Location {
+   public double latitude;
+   public double longitude;
+}
+```
+
+In order to examine only locations in south Manhattan, you would write:
+
+```
+PCollection<Location> locations = readLocations();
+locations.apply(Filter
+   .whereFieldName("latitude", latitude -> latitude < 40.720 && latitude > 40.699)
+   .whereFieldName("longitude", longitude -> longitude < -73.969 && longitude > -74.747));
+```
+
+### Multiple fields filter
+
+Predicates that require examining multiple fields at once are also supported. 
For example, consider the following class representing a user account:

Review Comment:
   You can also use multiple fields inside the filtering predicate. For 
example, consider the following schema type representing user account: 



##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the 
result is allowed to pass if all predicates return true. The output type is the 
same as the input type.
+
+### Single fields filter
+
+For example, consider the following schema type:
+
+```
+public class Location {
+   public double latitude;
+   public double longitude;
+}
+```
+
+In order to examine only locations in south Manhattan, you would write:
+
+```
+PCollection<Location> locations = readLocations();
+locations.apply(Filter
+   .whereFieldName("latitude", latitude -> latitude < 40.720 && latitude > 40.699)
+   .whereFieldName("longitude", longitude -> longitude < -73.969 && longitude > -74.747));
+```
+
+### Multiple fields filter
+
+Predicates that require examining multiple fields at once are also supported. 
For example, consider the following class representing a user account:
+
+```
+class UserAccount {
+   public double spendOnBooks;
+   public double spendOnMovies;
+   ...
+}
+```
+
+Say you want to examine only users whose total spend is above $100. You could write:
+
+```
+PCollection<UserAccount> users = readUsers();
+users.apply(Filter
+    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+        row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+```
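+
The multi-field predicate above can be sketched in plain Java as well (not the Beam API; the `UserAccount` field names mirror the snippet above, and the sample amounts are hypothetical):

```java
import java.util.List;
import java.util.stream.Collectors;

public class TotalSpendFilterSketch {
    static class UserAccount {
        final double spendOnBooks;
        final double spendOnMovies;
        UserAccount(double spendOnBooks, double spendOnMovies) {
            this.spendOnBooks = spendOnBooks;
            this.spendOnMovies = spendOnMovies;
        }
    }

    // keep only users whose combined spend exceeds $100, mirroring the Beam predicate
    static List<UserAccount> filterBigSpenders(List<UserAccount> users) {
        return users.stream()
            .filter(u -> u.spendOnBooks + u.spendOnMovies > 100.00)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<UserAccount> users = List.of(
            new UserAccount(80.0, 30.0),  // total $110: kept
            new UserAccount(20.0, 10.0)); // total $30: dropped
        System.out.println(filterBigSpenders(users).size()); // prints 1
    }
}
```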
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window, which you can run and experiment with.

Review Comment:
   In the playground window, you can find an example of `Filter` usage to find 
all user purchase transactions that took more than 50 seconds to complete. You 
can run and experiment with it. Try to filter input by several fields to find, 
for example, all transactions over $100 that happened in France. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

