alxp1982 commented on code in PR #24488: URL: https://github.com/apache/beam/pull/24488#discussion_r1058641412
########## learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md: ########## @@ -0,0 +1,88 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# CoGroup + +A transform that performs equijoins across multiple schema PCollections. + +This transform is similar to `CoGroupByKey`; however, it works on PCollections that have schemas, which allows users of the transform to simply specify the schema fields to join on. The output type of the transform is a `Row` that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but it can optionally be expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`. + +For example, the following demonstrates joining three PCollections on the "user" and "country" fields: + + +``` +PCollection<Row> joined = +  PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3) +    .apply(CoGroup.join(By.fieldNames("user", "country"))); +``` + +### JOIN DIFFERENT FIELDS + +It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection.
For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user` + +``` +PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2) + .apply(CoGroup + .join("input1Tag", By.fieldNames("referringUser")) + .join("input2Tag", By.fieldNames("user"))); +``` + + +### INNER JOIN + +For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user` + +``` +PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) + .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin()); +``` + +### LEFT OUTER JOIN + +For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`. Here `withOptionalParticipation` marks `input2`, the input whose rows may be missing for a given key: + +``` +PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) + .apply(CoGroup.join("input1", By.fieldNames("user")) + .join("input2", By.fieldNames("user").withOptionalParticipation()) + .crossProductJoin()); +``` + +### RIGHT OUTER JOIN + +For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`. This time `input1` is the optional side: + +``` +PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) + .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation()) + .join("input2", By.fieldNames("user")) + .crossProductJoin()); +``` + +### FULL OUTER JOIN + +For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user` + +``` +PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) + .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation()) + .join("input2", By.fieldNames("user").withOptionalParticipation()) + .crossProductJoin()); +``` + +### Playground exercise + +You can find the complete code of this example in the playground window, which you can run and experiment with.
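To build intuition for `crossProductJoin` and `withOptionalParticipation`, the join semantics above can be sketched in plain Java with maps and lists (no Beam dependency; the data and names are purely illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {
    // Expands the cross product of rows that share a key, like crossProductJoin().
    // If rightOptional is true, left rows without a match are still emitted with a
    // null right side, mimicking withOptionalParticipation() on the right input.
    static List<String[]> join(Map<String, List<String>> left,
                               Map<String, List<String>> right,
                               boolean rightOptional) {
        List<String[]> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : left.entrySet()) {
            List<String> matches = right.getOrDefault(e.getKey(), Collections.emptyList());
            if (matches.isEmpty() && rightOptional) {
                for (String l : e.getValue()) {
                    out.add(new String[] {e.getKey(), l, null}); // unmatched left row survives
                }
            }
            for (String l : e.getValue()) {
                for (String r : matches) {
                    out.add(new String[] {e.getKey(), l, r}); // cross product of matches
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> purchases = new LinkedHashMap<>();
        purchases.put("alice", Arrays.asList("book", "movie"));
        purchases.put("bob", Arrays.asList("game"));
        Map<String, List<String>> locations = new LinkedHashMap<>();
        locations.put("alice", Arrays.asList("NY"));

        // Inner join: bob has no location, so his rows are dropped.
        System.out.println(join(purchases, locations, false).size()); // 2
        // "Left outer" join: bob's row survives with a null location.
        System.out.println(join(purchases, locations, true).size());  // 3
    }
}
```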
Review Comment: In the playground window, you can find examples of `CoGroup` usage. Running this example, you'll see a list of purchase transactions made by users and their locations. Can you change it in such a way that `Location` object is referenced from the `Purchase` object to represent where the transaction was made and modify grouping in such a way that the final output contains location information for every user purchase as well as all the locations where no purchases were made? ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ########## @@ -0,0 +1,116 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Coder + +A Coder<T> defines how to encode and decode values of type T into byte streams. + Review Comment: > Note that coders are unrelated to parsing or formatting data when interacting with external data sources or sinks. You need to do such parsing or formatting explicitly, using transforms such as ParDo or MapElements. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ########## @@ -0,0 +1,116 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Coder + +A Coder<T> defines how to encode and decode values of type T into byte streams. + +Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization. + +Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the CoderRegistry. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred. Review Comment: The Beam SDK requires a coder for every `PCollection` in your pipeline. In many cases, Beam can automatically infer the `Coder` for the type of elements in a `PCollection` and use predefined coders to perform encoding and decoding. However, in some cases, you will need to specify the `Coder` explicitly or create a `Coder` for custom types. To set the `Coder` for a `PCollection`, you need to call `PCollection.setCoder`. You can also get the Coder associated with a `PCollection` using the `PCollection.getCoder` method. ### CoderRegistry When Beam tries to infer the `Coder` for a `PCollection`, it uses mappings stored in the `CoderRegistry` object associated with the pipeline. You can access the CoderRegistry for a given pipeline using the method `Pipeline.getCoderRegistry` or get a coder for a particular type using `CoderRegistry.getCoder`.
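Conceptually, the registry consulted during inference is a mapping from element types to coders. A minimal plain-Java sketch of that lookup (illustrative names only, not Beam's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class RegistrySketch {
    // A toy "coder": a name standing in for an encode/decode strategy.
    static final Map<Class<?>, String> REGISTRY = new HashMap<>();

    // Explicit registration of a coder for a type.
    static void registerCoderForClass(Class<?> type, String coderName) {
        REGISTRY.put(type, coderName);
    }

    // Inference: explicit registrations win; otherwise fall back to a default.
    static String getCoder(Class<?> type) {
        return REGISTRY.getOrDefault(type, "SerializableCoder");
    }

    public static void main(String[] args) {
        registerCoderForClass(Integer.class, "BigEndianIntegerCoder");
        System.out.println(getCoder(Integer.class)); // BigEndianIntegerCoder
        System.out.println(getCoder(String.class));  // SerializableCoder
    }
}
```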
Please note that you can also override the inferred coder for an individual `PCollection` with `PCollection.setCoder`, so the same type can be encoded and decoded differently in different `PCollection`s. The following example demonstrates how to register a coder for a type using CoderRegistry: ``` PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); CoderRegistry cr = p.getCoderRegistry(); cr.registerCoderForClass(Integer.class, BigEndianIntegerCoder.of()); ``` ### Specifying default coder for a type You can specify the default coder for your custom type by annotating it with the `@DefaultCoder` annotation. For example: ``` @DefaultCoder(AvroCoder.class) public class MyCustomDataType { ... } ``` ### Creating a custom coder ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ########## @@ -0,0 +1,116 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Coder + +A Coder<T> defines how to encode and decode values of type T into byte streams. + +Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization. + +Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the CoderRegistry.
See Coder Provider and CoderRegistry for more information about how coders are inferred. + +When you create custom objects and schemas, you need to create a subclass of Coder for your object and implement the following methods: Review Comment: To create a custom coder for your type, you need to create a subclass of Coder for your object and implement the following methods: * `encode` - converting objects to bytes * `decode` - converting bytes to objects * `getCoderArguments` - If it is a `Coder` for a parameterized type, it returns a list of `Coders` used for each of the parameters in the same order in which they appear in the type signature of the parameterized type. * `verifyDeterministic` - throw the `Coder.NonDeterministicException` if the encoding is not deterministic. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/convert/description.md: ########## @@ -0,0 +1,27 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +### Converting between types + +As mentioned, Beam can automatically convert between different Java types, as long as those types have equivalent schemas. One way to do this is by using the ```Convert``` transform, as follows. Review Comment: Beam SDK provides a class `Convert` that allows conversion between types as long as those types have compatible schemas. Schemas are compatible if they recursively have fields with the same names but possibly different orders. For example: Please come up with a snippet. 
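As a rough, non-Beam sketch of the idea (hypothetical types; a name-keyed map stands in for `Row`), conversion between two classes whose fields have the same names, possibly in a different order, could look like:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConvertSketch {
    static class PurchasePojo {      // field order: userId, cost
        public long userId;
        public double cost;
    }

    static class PurchaseBean {      // same field names, different order
        public double cost;
        public long userId;
    }

    // Stand-in for "to Row": capture fields by name.
    static Map<String, Object> toRow(PurchasePojo p) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("userId", p.userId);
        row.put("cost", p.cost);
        return row;
    }

    // Stand-in for "from Row": match fields by name, order-independent.
    static PurchaseBean fromRow(Map<String, Object> row) {
        PurchaseBean b = new PurchaseBean();
        b.userId = (Long) row.get("userId");
        b.cost = (Double) row.get("cost");
        return b;
    }

    public static void main(String[] args) {
        PurchasePojo p = new PurchasePojo();
        p.userId = 42L;
        p.cost = 9.99;
        PurchaseBean b = fromRow(toRow(p));
        System.out.println(b.userId + " " + b.cost); // 42 9.99
    }
}
```

Matching by field name rather than position is what makes reordered-but-identically-named schemas compatible.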
`Convert` also provides methods to convert from `Row` to custom type and vice versa. Following is an example of converting to and from `Row`: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md: ########## @@ -0,0 +1,64 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Filter + +A `PTransform` for filtering a collection of schema types. Review Comment: `Filter` is a `PTransform` you can use on `PCollection` with the schema to filter elements. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md: ########## @@ -0,0 +1,64 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Filter + +A `PTransform` for filtering a collection of schema types. + +Separate Predicates can be registered for different schema fields, and the result is allowed to pass if all predicates return true. The output type is the same as the input type. 
+ +### Single fields filter + +For example, consider the following schema type: + +``` +public class Location { + public double latitude; + public double longitude; +} +``` + +In order to examine only locations in south Manhattan, you would write: Review Comment: To filter out all the locations except the ones in South Manhattan, you can write: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md: ########## @@ -0,0 +1,64 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Filter + +A `PTransform` for filtering a collection of schema types. + +Separate Predicates can be registered for different schema fields, and the result is allowed to pass if all predicates return true. The output type is the same as the input type. + +### Single fields filter + +For example, consider the following schema type: + +``` +public class Location { + public double latitude; + public double longitude; +} +``` + +In order to examine only locations in south Manhattan, you would write: + +``` +PCollection<Location> locations = readLocations(); +locations.apply(Filter + .whereFieldName("latitude", latitude -> latitude < 40.720 && latitude > 40.699) + .whereFieldName("longitude", longitude -> longitude < -73.969 && longitude > -74.747)); +``` + +### Multiple fields filter + +Predicates that require examining multiple fields at once are also supported. 
For example, consider the following class representing a user account: + +``` +class UserAccount { + public double spendOnBooks; + public double spendOnMovies; + ... +} +``` + +Say you want to examine only users whose total spend is above $100. You could write: Review Comment: Let's say you'd like to process only users whose total spend is over $100. You could write: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/example/Task.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: schema-filter +// description: Schema filter example.
+// multifile: false +// context_line: 46 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.JavaFieldSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; +import org.apache.beam.sdk.schemas.transforms.Filter; +import org.apache.beam.sdk.schemas.transforms.Select; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + // UserPurchase schema + @DefaultSchema(JavaFieldSchema.class) + public static class UserPurchase { + public Long userId; + public String country; + public long cost; + public double transactionDuration; + + @SchemaCreate + public UserPurchase(Long userId, String country, long cost, double transactionDuration) { + this.userId = userId; + this.country = country; + this.cost = cost; + this.transactionDuration = transactionDuration; + } + } + + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + Pipeline pipeline = Pipeline.create(options); + + UserPurchase user1 = new UserPurchase(1L, "America", 123, 22); Review Comment: UserPurchase user1 = new UserPurchase(1L, "USA", 123, 22); ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ########## @@ -0,0 +1,116 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Coder + +A Coder<T> defines how to encode and decode values of type T into byte streams. + +Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization. Review Comment: Please remove this sentence. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/example/Task.java: ########## @@ -0,0 +1,98 @@ +/* Review Comment: This is a grouping example. How does it relate to `Coder`? Please put a runnable example that: - Shows custom coder implementation - Type annotation for setting the default Coder Also, is it possible to add a different Coder for a custom type through CoderRegistry? Would it take precedence over the coder specified in annotation? ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ########## @@ -0,0 +1,116 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Coder + +A Coder<T> defines how to encode and decode values of type T into byte streams. 
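For intuition about the encode/decode contract, here is a self-contained plain-JDK sketch (this is not Beam's `Coder` API; the wire format chosen here is illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CoderSketch {
    // Encode: write the value to an output stream in a fixed, deterministic format.
    static void encode(long value, OutputStream out) throws IOException {
        new DataOutputStream(out).writeLong(value); // big-endian per the DataOutput spec
    }

    // Decode: read the same format back from an input stream.
    static long decode(InputStream in) throws IOException {
        return new DataInputStream(in).readLong();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        encode(123456789L, buf);
        long roundTripped = decode(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(roundTripped); // 123456789
    }
}
```

The essential property is that decode is the exact inverse of encode, so a value survives a round trip through bytes unchanged.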
+ +Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization. + +Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the CoderRegistry. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred. + +When you create custom objects and schemas, you need to create a subclass of Coder for your object and implement the following methods: +* `encode` - converting objects to bytes +* `decode` - converting bytes to objects +* `getCoderArguments` - If it is a `Coder` for a parameterized type, returns a list of `Coders` used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type. +* `verifyDeterministic` - throw the `Coder.NonDeterministicException` if the encoding is not deterministic.
+ +For example, consider the following schema type: + +``` +@DefaultSchema(JavaFieldSchema.class) +class VendorToPassengerDTO { + @JsonProperty(value = "PassengerCount") + Integer PassengerCount; + @JsonProperty(value = "VendorID") + Integer VendorID; + + @SchemaCreate + public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) { + this.PassengerCount = passengerCount; + this.VendorID = vendorID; + } + + public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) { + return new VendorToPassengerDTO(passengerCount, vendorID); + } + + public void setPassengerCount(final Integer passengerCount) { + this.PassengerCount = passengerCount; + } + + public void setVendorID(final Integer vendorID) { + this.VendorID = vendorID; + } + + public Integer getVendorID() { + return this.VendorID; + } + + public Integer getPassengerCount() { + return this.PassengerCount; + } + + @Override + public String toString() { + return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", PassengerCount, VendorID); + } +} +``` + +Explicit `scheme` for converting to `Row`: +``` +final Schema schema = Schema.builder() + .addInt32Field("PassengerCount") + .addInt32Field("VendorID") + .build(); +``` + +`Coder` for `VendorToPassengerDTO`: +``` +class CustomCoderSecond extends Coder<VendorToPassengerDTO> { + final ObjectMapper om = new ObjectMapper(); + + private static final CustomCoderSecond INSTANCE = new CustomCoderSecond(); + + public static CustomCoderSecond of() { + return INSTANCE; + } + + @Override + public void encode(VendorToPassengerDTO dto, OutputStream outStream) throws IOException { + final String result = dto.toString(); + outStream.write(result.getBytes()); + } + + @Override + public VendorToPassengerDTO decode(InputStream inStream) throws IOException { + final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream)); + return om.readValue(serializedDTOs, VendorToPassengerDTO.class); + } + + @Override + public 
List<? extends Coder<?>> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() { + } +} +``` + +### Playground exercise + +You can find the complete code of this example in the playground window, which you can run and experiment with. Review Comment: Can you modify it to display CoderRegistry mappings for the input `PCollection`? How would you change integer type coding to use big-endian notation? ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ########## @@ -0,0 +1,116 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Coder + +A Coder<T> defines how to encode and decode values of type T into byte streams. + +Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization. + +Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the CoderRegistry.
+ +When you create custom objects and schemas, you need to create a subclass of Coder for your object and implement the following methods: +* `encode` - converting objects to bytes +* `decode` - converting bytes to objects +* `getCoderArguments` - If it is a `Coder` for a parameterized type, returns a list of `Coders` used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type. +* `verifyDeterministic` - throw the `Coder.NonDeterministicException`, if the encoding is not deterministic. + +For example, consider the following schema type: + +``` +@DefaultSchema(JavaFieldSchema.class) +class VendorToPassengerDTO { + @JsonProperty(value = "PassengerCount") + Integer PassengerCount; + @JsonProperty(value = "VendorID") + Integer VendorID; + + @SchemaCreate + public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) { + this.PassengerCount = passengerCount; + this.VendorID = vendorID; + } + + public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) { + return new VendorToPassengerDTO(passengerCount, vendorID); + } + + public void setPassengerCount(final Integer passengerCount) { + this.PassengerCount = passengerCount; + } + + public void setVendorID(final Integer vendorID) { + this.VendorID = vendorID; + } + + public Integer getVendorID() { + return this.VendorID; + } + + public Integer getPassengerCount() { + return this.PassengerCount; + } + + @Override + public String toString() { + return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", PassengerCount, VendorID); + } +} +``` + +Explicit `scheme` for converting to `Row`: +``` +final Schema schema = Schema.builder() + .addInt32Field("PassengerCount") + .addInt32Field("VendorID") + .build(); +``` + +`Coder` for `VendorToPassengerDTO`: Review Comment: Custom `Coder` for VendorToPassengerDTO object: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md: ########## 
@@ -0,0 +1,116 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Coder + +A Coder<T> defines how to encode and decode values of type T into byte streams. + +Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization. + +Coder classes for compound types are often composed of coder classes for types contains therein. The composition of Coder instances into a coder for the compound class is the subject of the Coder Provider type, which enables automatic generic composition of Coder classes within the CoderRegistry. See Coder Provider and CoderRegistry for more information about how coders are inferred. + +When you create custom objects and schemas, you need to create a subclass of Coder for your object and implement the following methods: +* `encode` - converting objects to bytes +* `decode` - converting bytes to objects +* `getCoderArguments` - If it is a `Coder` for a parameterized type, returns a list of `Coders` used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type. +* `verifyDeterministic` - throw the `Coder.NonDeterministicException`, if the encoding is not deterministic. 
+ +For example, consider the following schema type: + +``` +@DefaultSchema(JavaFieldSchema.class) +class VendorToPassengerDTO { + @JsonProperty(value = "PassengerCount") + Integer PassengerCount; + @JsonProperty(value = "VendorID") + Integer VendorID; + + @SchemaCreate + public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) { + this.PassengerCount = passengerCount; + this.VendorID = vendorID; + } + + public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) { + return new VendorToPassengerDTO(passengerCount, vendorID); + } + + public void setPassengerCount(final Integer passengerCount) { + this.PassengerCount = passengerCount; + } + + public void setVendorID(final Integer vendorID) { + this.VendorID = vendorID; + } + + public Integer getVendorID() { + return this.VendorID; + } + + public Integer getPassengerCount() { + return this.PassengerCount; + } + + @Override + public String toString() { + return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", PassengerCount, VendorID); + } +} +``` + +Explicit `scheme` for converting to `Row`: Review Comment: Why we need the part related to explicit schema creation for converting to `Row`? ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/convert/example/Task.java: ########## @@ -0,0 +1,99 @@ +/* Review Comment: Let's do two types with compatible schemas as Poco and Bean or AutoValue, then do Poco->Bean -> Row-> Poco conversion. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md: ########## @@ -0,0 +1,64 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Filter + +A `PTransform` for filtering a collection of schema types. + +Separate Predicates can be registered for different schema fields, and the result is allowed to pass if all predicates return true. The output type is the same as the input type. Review Comment: You can use a predicate that can reference the schema field, and if it returns true, the processed element is passed to output `PCollection`. Output elements have the same schema. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md: ########## @@ -0,0 +1,64 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Filter + +A `PTransform` for filtering a collection of schema types. + +Separate Predicates can be registered for different schema fields, and the result is allowed to pass if all predicates return true. The output type is the same as the input type. 
+ +### Single fields filter Review Comment: ### Single field filter ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/example/Task.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: schema-filter +// description: Schema filter example. 
+// multifile: false +// context_line: 46 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.JavaFieldSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; +import org.apache.beam.sdk.schemas.transforms.Filter; +import org.apache.beam.sdk.schemas.transforms.Select; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + // UserPurchase schema + @DefaultSchema(JavaFieldSchema.class) + public static class UserPurchase { + public Long userId; + public String country; + public long cost; + public double transactionDuration; + + @SchemaCreate + public UserPurchase(Long userId, String country, long cost, double transactionDuration) { + this.userId = userId; + this.country = country; + this.cost = cost; + this.transactionDuration = transactionDuration; + } + } + + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + Pipeline pipeline = Pipeline.create(options); + + UserPurchase user1 = new UserPurchase(1L, "America", 123, 22); + UserPurchase user2 = new UserPurchase(1L, "Brazilian", 645, 86); Review Comment: UserPurchase user2 = new UserPurchase(1L, "Brazil", 645, 86); ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md: ########## @@ -0,0 +1,64 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the 
License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the result is allowed to pass if all predicates return true. The output type is the same as the input type.
+
+### Single fields filter
+
+For example, consider the following schema type:
+
+```
+public class Location {
+  public double latitude;
+  public double longitude;
+}
+```
+
+In order to examine only locations in south Manhattan, you would write:
+
+```
+PCollection<Location> locations = readLocations();
+locations.apply(Filter
+    .whereFieldName("latitude", latitude -> latitude < 40.720 && latitude > 40.699)
+    .whereFieldName("longitude", longitude -> longitude < -73.969 && longitude > -74.747));
+```
+
+### Multiple fields filter
+
+Predicates that require examining multiple fields at once are also supported. For example, consider the following class representing a user account:

Review Comment:
   You can also use multiple fields inside the filtering predicate. For example, consider the following schema type representing a user account:

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/filter/description.md:
##########
@@ -0,0 +1,64 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Filter
+
+A `PTransform` for filtering a collection of schema types.
+
+Separate Predicates can be registered for different schema fields, and the result is allowed to pass if all predicates return true. The output type is the same as the input type.
+
+### Single fields filter
+
+For example, consider the following schema type:
+
+```
+public class Location {
+  public double latitude;
+  public double longitude;
+}
+```
+
+In order to examine only locations in south Manhattan, you would write:
+
+```
+PCollection<Location> locations = readLocations();
+locations.apply(Filter
+    .whereFieldName("latitude", latitude -> latitude < 40.720 && latitude > 40.699)
+    .whereFieldName("longitude", longitude -> longitude < -73.969 && longitude > -74.747));
+```
+
+### Multiple fields filter
+
+Predicates that require examining multiple fields at once are also supported. For example, consider the following class representing a user account:
+
+```
+class UserAccount {
+  public double spendOnBooks;
+  public double spendOnMovies;
+  ...
+}
+```
+
+Say you want to examine only users whose total spend is above $100. You could write:
+
+```
+PCollection<UserAccount> users = readUsers();
+users.apply(Filter
+    .whereFieldNames(Lists.newArrayList("spendOnBooks", "spendOnMovies"),
+        row -> row.getDouble("spendOnBooks") + row.getDouble("spendOnMovies") > 100.00));
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window you can run and experiment with.
Review Comment: In the playground window, you can find an example of `Filter` usage to find all user purchase transactions that took more than 50 seconds to complete. You can run and experiment with it. Try to filter input by several fields to find, for example, all transactions over $100 that happened in France. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
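The exercise suggested in the last review comment (filter by several fields at once, e.g. all transactions over $100 that happened in France) can be sketched without Beam as a single predicate reading two fields of the same element; in Beam this would be `Filter.whereFieldNames` over the `cost` and `country` schema fields. The `MultiFieldFilterSketch` class below is illustrative only, not part of the reviewed code:

```java
import java.util.List;
import java.util.stream.Collectors;

// Beam-free sketch of a multi-field predicate: the filter reads both the
// "country" and "cost" fields of each element and combines them.
public class MultiFieldFilterSketch {
    static class UserPurchase {
        final String country;
        final long cost;
        UserPurchase(String country, long cost) {
            this.country = country;
            this.cost = cost;
        }
    }

    // Keep only purchases over $100 made in France.
    static List<UserPurchase> bigFrenchPurchases(List<UserPurchase> purchases) {
        return purchases.stream()
                .filter(p -> "France".equals(p.country) && p.cost > 100)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<UserPurchase> purchases = List.of(
                new UserPurchase("France", 645),   // kept
                new UserPurchase("France", 40),    // too cheap
                new UserPurchase("Brazil", 645));  // wrong country
        System.out.println(bigFrenchPurchases(purchases).size()); // prints 1
    }
}
```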
