alxp1982 commented on code in PR #24488: URL: https://github.com/apache/beam/pull/24488#discussion_r1055854387
########## learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md: ########## @@ -0,0 +1,122 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Select + +The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected. Review Comment: The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. You can choose both top-level and nested fields. The output of this transform is of type Row, which you can convert into any other type with matching schema using the `Convert` transform. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md: ########## @@ -0,0 +1,122 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> + +# Select + +The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected. + +The output of this transform is of type Row, though that can be converted into any other type with matching schema using the `Convert` transform. + +Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field. Review Comment: Let's describe how to handle names collision (with an example) later in ## Nested Fields subsection ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/select/example/Task.java: ########## @@ -0,0 +1,120 @@ +/* Review Comment: I'm getting output: Exception in thread "main" java.lang.RuntimeException: Creator parameter arg0 Doesn't correspond to a schema field ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md: ########## @@ -0,0 +1,122 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Select + +The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. 
Both top-level and nested fields can be selected. + +The output of this transform is of type Row, though that can be converted into any other type with matching schema using the `Convert` transform. + +Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field. + +### Top-level fields + +In order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a `PCollection` of purchases one would write (using the Select transform). + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames("userId", "shippingAddress.postCode")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +userId STRING +``` + +### Nested fields + +Individual nested fields can be specified using the dot operator. For example, to select just the postal code from the shipping address one would write. + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames("shippingAddress.postCode")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +postCode STRING +``` + +### Wildcards + +The `*` operator can be specified at any nesting level to represent all fields at that level. For example, to select all shipping-address fields one would write. + +The same is true for wildcard selections. 
The following: + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames("shippingAddress.*")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +streetAddress STRING +city STRING +state nullable STRING +country STRING +postCode STRING + +``` + +### Select array + +When selecting fields nested inside of an array, the same rule applies that each selected field appears separately as a top-level field in the resulting row. This means that if multiple fields are selected from the same nested row, each selected field will appear as its own array field. + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames( "transactions.bank", "transactions.purchaseAmount")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +bank ARRAY[STRING] +purchaseAmount ARRAY[DOUBLE] +``` + +### Flatten schema + +Another use of the `Select` transform is to flatten a nested schema into a single flat schema. + +``` +PCollection<Row> rows = purchases.apply(Select.flattenedSchema()); +``` + +Will result in the following schema: + +``` +Field Name Field Type +userId STRING +itemId STRING +shippingAddress_streetAddress STRING +shippingAddress_city nullable STRING +shippingAddress_state STRING +shippingAddress_country STRING +shippingAddress_postCode STRING +costCents INT64 +transactions_bank ARRAY[STRING] +transactions_purchaseAmount ARRAY[DOUBLE] + +``` + +### Playground exercise + +You can find the complete code of this example in the playground window you can run and experiment with. Review Comment: What is the challenge here? We should invite users to experiment by giving a challenge. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md: ########## @@ -0,0 +1,91 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# CoGroup + +A transform that performs equijoins across multiple schema PCollections. + +This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`. + +For example, the following demonstrates joining three PCollections on the "user" and "country" fields: + + +``` +PCollection<Row> joined = +PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3) +.apply(CoGroup.join(By.fieldNames("user", "country"))); +``` + +### JOIN DIFFERENT FIELDS + +It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. 
For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+    .apply(CoGroup
+        .join("input1Tag", By.fieldNames("referringUser"))
+        .join("input2Tag", By.fieldNames("user")));
+```
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user"))
+        .join("input2", By.fieldNames("user").withOptionalParticipation())
+        .crossProductJoin());
+```
+
+### RIGHT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+        .join("input2", By.fieldNames("user"))
+        .crossProductJoin());
+```
+
+### FULL OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user`

Review Comment: You can also do a full outer join in Beam. For example, consider the following SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user`.
In Beam, you can do a similar join as follows: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md: ########## @@ -0,0 +1,91 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# CoGroup + +A transform that performs equijoins across multiple schema PCollections. + +This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`. + +For example, the following demonstrates joining three PCollections on the "user" and "country" fields: + + +``` +PCollection<Row> joined = +PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3) +.apply(CoGroup.join(By.fieldNames("user", "country"))); +``` + +### JOIN DIFFERENT FIELDS + +It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. 
For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+    .apply(CoGroup
+        .join("input1Tag", By.fieldNames("referringUser"))
+        .join("input2Tag", By.fieldNames("user")));
+```
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user"))
+        .join("input2", By.fieldNames("user").withOptionalParticipation())
+        .crossProductJoin());
+```
+
+### RIGHT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+        .join("input2", By.fieldNames("user"))
+        .crossProductJoin());
+```
+
+### FULL OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+        .join("input2", By.fieldNames("user").withOptionalParticipation())
+        .crossProductJoin());
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window, which you can run and experiment with.
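The equijoin grouping that `CoGroup` performs can be sketched in plain Java with no Beam dependency: group rows (modeled here as maps) from two inputs by a shared key field. All names below (`ToyCoGroup`, `JoinedGroup`) are illustrative only, not Beam APIs, and this sketch covers only the grouping step, not `crossProductJoin` expansion or schema handling.

```java
import java.util.*;

/** Toy model of one join result: the rows from each input sharing a key. */
class JoinedGroup {
    final List<Map<String, ?>> left = new ArrayList<>();
    final List<Map<String, ?>> right = new ArrayList<>();
}

public class ToyCoGroup {
    // Loosely mimics joining two inputs on keyField: each distinct key maps
    // to the rows from input1 and input2 that carry that key.
    public static Map<Object, JoinedGroup> join(List<? extends Map<String, ?>> input1,
                                                List<? extends Map<String, ?>> input2,
                                                String keyField) {
        Map<Object, JoinedGroup> groups = new LinkedHashMap<>();
        for (Map<String, ?> row : input1) {
            groups.computeIfAbsent(row.get(keyField), k -> new JoinedGroup()).left.add(row);
        }
        for (Map<String, ?> row : input2) {
            groups.computeIfAbsent(row.get(keyField), k -> new JoinedGroup()).right.add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        var purchases = List.of(Map.of("user", "bob", "item", "book"));
        var reviews = List.of(Map.of("user", "bob", "stars", "5"),
                              Map.of("user", "alice", "stars", "3"));
        Map<Object, JoinedGroup> joined = join(purchases, reviews, "user");
        // "bob" matched in both inputs; "alice" has no purchase — whether her
        // group survives is exactly what the outer-join variants control.
        System.out.println(joined.get("bob").left + " / " + joined.get("alice").left);
    }
}
```

A key with an empty `left` or `right` list corresponds to a row that an inner join would drop and that `withOptionalParticipation` would keep.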
+
+One of the differences you will notice is that it also contains the part to output `PCollection` elements to the console.

Review Comment: Remove this as well

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,120 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A `Coder<T>` defines how to encode and decode values of type `T` into byte streams.
+
+Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization.
+
+Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the `CoderRegistry`. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred.
+
+When you write a custom coder, you subclass `Coder` and implement its methods.
Review Comment: When you create custom objects and schemas, you need to create a subclass of `Coder` for your object and implement the following methods:

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,120 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A `Coder<T>` defines how to encode and decode values of type `T` into byte streams.
+
+Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization.
+
+Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the `CoderRegistry`. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred.
+
+When you write a custom coder, you subclass `Coder` and implement its methods:
+* `encode` - converts objects to bytes
+* `decode` - converts bytes to objects
+* `getCoderArguments` - if this is a `Coder` for a parameterized type, returns a list of the `Coder`s used for each of the type parameters, in the same order in which they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throws `Coder.NonDeterministicException` if the encoding is not deterministic.
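The core of the `encode`/`decode` contract can be sketched without any Beam dependency: whatever `encode` writes to the byte stream, `decode` must read back to an equal value. The class below is a toy, not a subclass of Beam's `Coder`, and the length-prefixed UTF-8 layout is just one possible deterministic encoding chosen for illustration.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Toy illustration of a coder's round-trip contract for String values:
// decode(encode(s)) must equal s for every s.
public class ToyStringCoder {
    // Write a 4-byte length prefix, then the UTF-8 bytes of the value.
    public static byte[] encode(String value) {
        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytesOut)) {
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length); // the prefix tells decode where to stop
            out.write(utf8);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory stream: should not happen
        }
        return bytesOut.toByteArray();
    }

    // Read back exactly what encode wrote, in the same order.
    public static String decode(byte[] encoded) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            byte[] utf8 = new byte[in.readInt()];
            in.readFully(utf8);
            return new String(utf8, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String original = "schema-based transforms";
        System.out.println(original.equals(decode(encode(original)))); // prints true
    }
}
```

Because the same bytes are always produced for the same input, this toy encoding is also deterministic in the sense `verifyDeterministic` is meant to check.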
+ +Entity `VendorToPassengerDTO` to work with pipeline: Review Comment: For example, consider the following schema type: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md: ########## @@ -0,0 +1,122 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Select + +The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected. + +The output of this transform is of type Row, though that can be converted into any other type with matching schema using the `Convert` transform. + +Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field. + +### Top-level fields + +In order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a `PCollection` of purchases one would write (using the Select transform). 
+
+```
+PCollection<Row> rows = purchases.apply(Select.fieldNames("userId"));
+```
+
+Will result in the following schema:
+
+```
+Field Name     Field Type
+userId         STRING
+```
+
+### Nested fields
+
+Individual nested fields can be specified using the dot operator. For example, to select just the postal code from the shipping address one would write:
+
+```
+PCollection<Row> rows = purchases.apply(Select.fieldNames("shippingAddress.postCode"));
+```
+
+Will result in the following schema:
+
+```
+Field Name     Field Type
+postCode       STRING
+```
+

Review Comment: I think it would be a better place here to describe (with example) how to handle names collision

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md:
##########
@@ -0,0 +1,122 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Select
+
+The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected.
+
+The output of this transform is of type Row, though that can be converted into any other type with matching schema using the `Convert` transform.
+
+Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema.
When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field. + +### Top-level fields + +In order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a `PCollection` of purchases one would write (using the Select transform). + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames("userId", "shippingAddress.postCode")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +userId STRING +``` + +### Nested fields + +Individual nested fields can be specified using the dot operator. For example, to select just the postal code from the shipping address one would write. Review Comment: Individual nested fields can be specified using the dot operator. For example, you can select just the postal code from the shipping address using the following: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md: ########## @@ -0,0 +1,122 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Select + +The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected. + +The output of this transform is of type Row, though that can be converted into any other type with matching schema using the `Convert` transform. 
+ +Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field. + +### Top-level fields + +In order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a `PCollection` of purchases one would write (using the Select transform). + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames("userId", "shippingAddress.postCode")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +userId STRING +``` + +### Nested fields + +Individual nested fields can be specified using the dot operator. For example, to select just the postal code from the shipping address one would write. + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames("shippingAddress.postCode")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +postCode STRING +``` + +### Wildcards + +The `*` operator can be specified at any nesting level to represent all fields at that level. For example, to select all shipping-address fields one would write. + +The same is true for wildcard selections. The following: + +``` +PCollection<Row> rows = purchases.apply(Select.fieldNames("shippingAddress.*")); +``` + +Will result in the following schema: + +``` +Field Name Field Type +streetAddress STRING +city STRING +state nullable STRING +country STRING +postCode STRING + +``` + +### Select array + +When selecting fields nested inside of an array, the same rule applies that each selected field appears separately as a top-level field in the resulting row. This means that if multiple fields are selected from the same nested row, each selected field will appear as its own array field. 
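The projection semantics described above — each selected path becomes a top-level field named after its last segment — can be sketched in plain Java over nested maps. `ToySelect` and its dot-path walk are illustrative only (no Beam dependency), and this toy deliberately ignores arrays and the name-collision case:

```java
import java.util.*;

// Toy sketch of Select-style projection: a dot path like
// "shippingAddress.postCode" walks into nested rows (maps here), and the
// last path segment becomes a top-level field of the result.
public class ToySelect {
    public static Map<String, Object> select(Map<String, ?> row, String... fieldPaths) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String path : fieldPaths) {
            String[] parts = path.split("\\.");
            Object current = row;
            for (String part : parts) {
                current = ((Map<?, ?>) current).get(part); // descend one level
            }
            out.put(parts[parts.length - 1], current); // last segment names the field
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, ?> purchase = Map.of(
            "userId", "u1",
            "shippingAddress", Map.of("postCode", "94105", "city", "SF"));
        System.out.println(select(purchase, "userId", "shippingAddress.postCode"));
        // {userId=u1, postCode=94105}
    }
}
```

Note how selecting `"a.name"` and `"b.name"` with this toy would overwrite one field with the other — the same collision that `Select.withFieldNameAs` exists to resolve in Beam.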
Review Comment: When selecting fields nested inside an array, the same rule applies: each chosen field appears separately as a top-level field in the resulting row. This means that if you select multiple fields from the same nested row, each selected field will appear as its own array field. ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md: ########## @@ -0,0 +1,122 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Select + +The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected. + +The output of this transform is of type Row, though that can be converted into any other type with matching schema using the `Convert` transform. + +Sometimes different nested rows will have fields with the same name. Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field. + +### Top-level fields + +In order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a `PCollection` of purchases one would write (using the Select transform). 
Review Comment: To select a field at the top level of a schema, you need to specify their names. For example, using the following code, you can choose just user ids from a 'PCollection' of purchases: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md: ########## @@ -0,0 +1,91 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# CoGroup + +A transform that performs equijoins across multiple schema PCollections. + +This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`. 
+
+For example, the following demonstrates joining three PCollections on the "user" and "country" fields:
+
+```
+PCollection<Row> joined =
+    PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
+        .apply(CoGroup.join(By.fieldNames("user", "country")));
+```
+
+### JOIN DIFFERENT FIELDS
+
+It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+    .apply(CoGroup
+        .join("input1Tag", By.fieldNames("referringUser"))
+        .join("input2Tag", By.fieldNames("user")));
+```
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user"))
+        .join("input2", By.fieldNames("user").withOptionalParticipation())
+        .crossProductJoin());
+```
+
+### RIGHT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+        .join("input2", By.fieldNames("user"))
+        .crossProductJoin());
+```
+
+### FULL OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+        .join("input2", By.fieldNames("user").withOptionalParticipation())
+        .crossProductJoin());
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window, which you can run and experiment with.
+
+One of the differences you will notice is that it also contains the part to output `PCollection` elements to the console.
+Do you also notice in what order elements of PCollection appear in the console? Why is that? You can also run the example several times to see if the output stays the same or changes.

Review Comment: Remove this sentence

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/select/description.md:
##########
@@ -0,0 +1,122 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Select
+
+The `Select` transform allows one to easily project out only the fields of interest. The resulting `PCollection` has a schema containing each selected field as a top-level field. Both top-level and nested fields can be selected.
+
+The output of this transform is of type Row, though that can be converted into any other type with matching schema using the `Convert` transform.
+
+Sometimes different nested rows will have fields with the same name.
Selecting multiple of these fields would result in a name conflict, as all selected fields are put in the same row schema. When this situation arises, the Select.withFieldNameAs builder method can be used to provide an alternate name for the selected field.
+
+### Top-level fields
+
+In order to select a field at the top level of a schema, the name of the field is specified. For example, to select just the user ids from a `PCollection` of purchases, one would write:
+
+```
+PCollection<Row> rows = purchases.apply(Select.fieldNames("userId"));
+```
+
+Will result in the following schema:
+
+```
+Field Name Field Type
+userId STRING
+```
+
+### Nested fields
+
+Individual nested fields can be specified using the dot operator. For example, to select just the postal code from the shipping address, one would write:
+
+```
+PCollection<Row> rows = purchases.apply(Select.fieldNames("shippingAddress.postCode"));
+```
+
+Will result in the following schema:
+
+```
+Field Name Field Type
+postCode STRING
+```
+
+### Wildcards
+
+The `*` operator can be specified at any nesting level to represent all fields at that level. For example, to select all shipping-address fields, one would write:
+
+```
+PCollection<Row> rows = purchases.apply(Select.fieldNames("shippingAddress.*"));
+```
+
+Will result in the following schema:
+
+```
+Field Name Field Type
+streetAddress STRING
+city STRING
+state nullable STRING
+country STRING
+postCode STRING
+
+```
+
+### Select array
+
+When selecting fields nested inside of an array, the same rule applies: each selected field appears separately as a top-level field in the resulting row. This means that if multiple fields are selected from the same nested row, each selected field will appear as its own array field.
+
+```
+PCollection<Row> rows = purchases.apply(Select.fieldNames("transactions.bank", "transactions.purchaseAmount"));
+```
+
+Will result in the following schema:
+
+```
+Field Name Field Type
+bank ARRAY[STRING]
+purchaseAmount ARRAY[DOUBLE]
+```
+
+### Flatten schema
+
+Another use of the `Select` transform is to flatten a nested schema into a single flat schema.
+
+```
+PCollection<Row> rows = purchases.apply(Select.flattenedSchema());
+```
+
+Will result in the following schema:
+
+```
+Field Name Field Type
+userId STRING
+itemId STRING
+shippingAddress_streetAddress STRING
+shippingAddress_city STRING
+shippingAddress_state nullable STRING
+shippingAddress_country STRING
+shippingAddress_postCode STRING
+costCents INT64
+transactions_bank ARRAY[STRING]
+transactions_purchaseAmount ARRAY[DOUBLE]
+
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window you can run and experiment with.
+
+One of the differences you will notice is that it also contains the part to output `PCollection` elements to the console.
+
+Do you also notice in what order elements of PCollection appear in the console? Why is that? You can also run the example several times to see if the output stays the same or changes.

Review Comment: Remove this sentence. Not applicable here.

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md:
##########
@@ -0,0 +1,91 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# CoGroup
+
+A transform that performs equijoins across multiple schema PCollections.

Review Comment: CoGroup is a transform that performs equijoins across multiple schema PCollections. This transform is similar to `CoGroupByKey`; however, it works on PCollections with schemas. With `CoGroup`, you can specify schema fields to join on. The output type of the transform is Row, which contains a field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default, the cross product is not expanded, but you can expand it. By default, the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`.

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md:
##########
@@ -0,0 +1,91 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# CoGroup
+
+A transform that performs equijoins across multiple schema PCollections.
+
+This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. 
By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`. + +For example, the following demonstrates joining three PCollections on the "user" and "country" fields: + + +``` +PCollection<Row> joined = +PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3) +.apply(CoGroup.join(By.fieldNames("user", "country"))); +``` + +### JOIN DIFFERENT FIELDS + +It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. For example: Review Comment: It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`. You can perform a similar join in Beam using the following: ########## learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md: ########## @@ -0,0 +1,91 @@ +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# CoGroup + +A transform that performs equijoins across multiple schema PCollections. + +This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. 
This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`.
+
+For example, the following demonstrates joining three PCollections on the "user" and "country" fields:
+
+
+```
+PCollection<Row> joined =
+PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
+.apply(CoGroup.join(By.fieldNames("user", "country")));
+```
+
+### JOIN DIFFERENT FIELDS
+
+It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection.
+
+For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+    .apply(CoGroup
+        .join("input1Tag", By.fieldNames("referringUser"))
+        .join("input2Tag", By.fieldNames("user")));
+```
+
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`

Review Comment: You can also do inner joins in Beam. For example, consider the following SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`. In Beam, you can do a similar join as follows:

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md:
##########
@@ -0,0 +1,91 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# CoGroup + +A transform that performs equijoins across multiple schema PCollections. + +This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`. + +For example, the following demonstrates joining three PCollections on the "user" and "country" fields: + + +``` +PCollection<Row> joined = +PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3) +.apply(CoGroup.join(By.fieldNames("user", "country"))); +``` + +### JOIN DIFFERENT FIELDS + +It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. 
+
+For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+    .apply(CoGroup
+        .join("input1Tag", By.fieldNames("referringUser"))
+        .join("input2Tag", By.fieldNames("user")));
+```
+
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user"))
+    .join("input2", By.fieldNames("user").withOptionalParticipation())
+    .crossProductJoin());
+```
+
+### RIGHT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`

Review Comment: You can also do right outer joins in Beam. For example, consider the following SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`. In Beam, you can do a similar join as follows:

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md:
##########
@@ -0,0 +1,91 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +--> + +# CoGroup + +A transform that performs equijoins across multiple schema PCollections. + +This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`. + +For example, the following demonstrates joining three PCollections on the "user" and "country" fields: + + +``` +PCollection<Row> joined = +PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3) +.apply(CoGroup.join(By.fieldNames("user", "country"))); +``` + +### JOIN DIFFERENT FIELDS + +It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection. 
+
+For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+    .apply(CoGroup
+        .join("input1Tag", By.fieldNames("referringUser"))
+        .join("input2Tag", By.fieldNames("user")));
+```
+
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user"))
+    .join("input2", By.fieldNames("user").withOptionalParticipation())
+    .crossProductJoin());
+```
+
+### RIGHT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 RIGHT OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+    .join("input2", By.fieldNames("user"))
+    .crossProductJoin());
+```
+
+### FULL OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 FULL OUTER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
+    .join("input2", By.fieldNames("user").withOptionalParticipation())
+    .crossProductJoin());
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window you can run and experiment with.
Review Comment: In the playground window, you can find examples of CoGroup usage on a schema that consists of the User, UserPurchase, and Location objects. The latter two have a userId field to identify the User. Try to run the example and experiment with it. For example, can you change it to output only users who made purchases?

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/co-group/description.md:
##########
@@ -0,0 +1,91 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# CoGroup
+
+A transform that performs equijoins across multiple schema PCollections.
+
+This transform has similarities to `CoGroupByKey`, however works on PCollections that have schemas. This allows users of the transform to simply specify schema fields to join on. The output type of the transform is Row that contains one row field for the key and an ITERABLE field for each input containing the rows that joined on that key; by default the cross product is not expanded, but the cross product can be optionally expanded. By default the key field is named "key" (the name can be overridden using `withKeyField`) and has index 0. The tags in the `PCollectionTuple` control the names of the value fields in the `Row`.
+
+For example, the following demonstrates joining three PCollections on the "user" and "country" fields:
+
+
+```
+PCollection<Row> joined =
+PCollectionTuple.of("input1", input1, "input2", input2, "input3", input3)
+.apply(CoGroup.join(By.fieldNames("user", "country")));
+```
+
+### JOIN DIFFERENT FIELDS
+
+It's also possible to join between different fields in two inputs, as long as the types of those fields match. In this case, fields must be specified for every input PCollection.
+
+For example, consider the SQL join: `SELECT * FROM input1Tag JOIN input2Tag ON input1Tag.referringUser = input2Tag.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1Tag", input1, "input2Tag", input2)
+    .apply(CoGroup
+        .join("input1Tag", By.fieldNames("referringUser"))
+        .join("input2Tag", By.fieldNames("user")));
+```
+
+
+### INNER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 INNER JOIN input2 ON input1.user = input2.user`
+
+```
+PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
+    .apply(CoGroup.join(By.fieldNames("user")).crossProductJoin());
+```
+
+### LEFT OUTER JOIN
+
+For example, consider the SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`

Review Comment: You can also do left outer joins in Beam. For example, consider the following SQL join: `SELECT * FROM input1 LEFT OUTER JOIN input2 ON input1.user = input2.user`. In Beam, you can do a similar join as follows:

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,120 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization.
+
+Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the `CoderRegistry`. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred.
+
+When you write your custom coder, you inherit from Coder and have to implement its methods:
+* `encode` - converts objects to bytes
+* `decode` - converts bytes to objects
+* `getCoderArguments` - if it is a `Coder` for a parameterized type, returns a list of `Coders` used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throws `Coder.NonDeterministicException` if the encoding is not deterministic.
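Editor's aside: the encode/decode contract in the list above can be sketched without any Beam dependency, using only java.io streams. The class and method names below are illustrative, not part of the Beam API; a length prefix is one simple way to make a value self-delimiting on a shared stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative stand-in for a custom Coder<String>: a deterministic,
// length-prefixed UTF-8 encoding (plain java.io, no Beam dependency).
public class StringRoundTrip {
    // Mirrors Coder.encode: write the value onto the output stream.
    static void encode(String value, OutputStream outStream) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        DataOutputStream data = new DataOutputStream(outStream);
        data.writeInt(bytes.length); // length prefix makes the value self-delimiting
        data.write(bytes);
        data.flush();
    }

    // Mirrors Coder.decode: read the value back from the input stream.
    static String decode(InputStream inStream) throws IOException {
        DataInputStream data = new DataInputStream(inStream);
        byte[] bytes = new byte[data.readInt()];
        data.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode("hello beam", out);
        ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        System.out.println(decode(in)); // prints "hello beam"
    }
}
```

A real `Coder` subclass would additionally implement `getCoderArguments` and `verifyDeterministic`, as the full example in the file under review does.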
+
+Entity `VendorToPassengerDTO` to work with pipeline:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+class VendorToPassengerDTO {
+    @JsonProperty(value = "PassengerCount")
+    Integer PassengerCount;
+    @JsonProperty(value = "VendorID")
+    Integer VendorID;
+
+    @SchemaCreate
+    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
+        this.PassengerCount = passengerCount;
+        this.VendorID = vendorID;
+    }
+
+    public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) {
+        return new VendorToPassengerDTO(passengerCount, vendorID);
+    }
+
+    public void setPassengerCount(final Integer passengerCount) {
+        this.PassengerCount = passengerCount;
+    }
+
+    public void setVendorID(final Integer vendorID) {
+        this.VendorID = vendorID;
+    }
+
+    public Integer getVendorID() {
+        return this.VendorID;
+    }
+
+    public Integer getPassengerCount() {
+        return this.PassengerCount;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", PassengerCount, VendorID);
+    }
+}
+```
+
+Explicit `schema` for converting to `Row`:
+```
+final Schema schema = Schema.builder()
+    .addInt32Field("PassengerCount")
+    .addInt32Field("VendorID")
+    .build();
+```
+
+`Coder` for `VendorToPassengerDTO`:
+```
+class CustomCoderSecond extends Coder<VendorToPassengerDTO> {
+    final ObjectMapper om = new ObjectMapper();
+
+    private static final CustomCoderSecond INSTANCE = new CustomCoderSecond();
+
+    public static CustomCoderSecond of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public void encode(VendorToPassengerDTO dto, OutputStream outStream) throws IOException {
+        final String result = dto.toString();
+        outStream.write(result.getBytes());
+    }
+
+    @Override
+    public VendorToPassengerDTO decode(InputStream inStream) throws IOException {
+        final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+        return om.readValue(serializedDTOs, VendorToPassengerDTO.class);
+    }
+
+    @Override
+    
public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() {
+    }
+}
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window you can run and experiment with.
+
+One of the differences you will notice is that it also contains the part to output `PCollection` elements to the console.

Review Comment: Remove

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,120 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.

Review Comment: When Beam runners execute your pipeline, they often need to convert an object to and from a binary format. The Beam SDKs use objects called Coders to perform coding and decoding. In the Beam SDK for Java, the base type `Coder` contains the conversion methods. The SDK also contains Coder subclasses that work with various standard Java types, such as Integer, Long, Double, StringUtf8, and more. You can find all the available Coder subclasses in the [Coder package](https://github.com/apache/beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders).
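Editor's aside: to see why a per-type coder such as the VarInt coder mentioned in the comment above is worth having, here is a stdlib-only sketch of the base-128 varint idea — small non-negative values occupy fewer bytes than a fixed 4-byte encoding. This is illustrative only, not Beam's actual `VarIntCoder` implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Rough sketch of a variable-length int encoding: 7 payload bits per byte,
// the high bit flags that another byte follows (no Beam dependency).
public class VarIntSketch {
    static void encode(int value, OutputStream out) throws IOException {
        while ((value & ~0x7F) != 0) {         // more than 7 bits remain
            out.write((value & 0x7F) | 0x80);  // low 7 bits + continuation flag
            value >>>= 7;
        }
        out.write(value);                      // final byte, no continuation flag
    }

    static int decode(InputStream in) throws IOException {
        int result = 0, shift = 0, b;
        do {
            b = in.read();
            result |= (b & 0x7F) << shift;     // accumulate 7 bits at a time
            shift += 7;
        } while ((b & 0x80) != 0);             // stop when the flag bit is clear
        return result;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode(300, out);
        System.out.println(out.size()); // prints 2 (vs. 4 bytes for a plain int)
        System.out.println(decode(new ByteArrayInputStream(out.toByteArray()))); // prints 300
    }
}
```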
##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,120 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization.
+
+Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the `CoderRegistry`. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred.
+
+When you write your custom coder, you inherit from Coder and have to implement its methods:
+* `encode` - converts objects to bytes
+* `decode` - converts bytes to objects
+* `getCoderArguments` - if it is a `Coder` for a parameterized type, returns a list of `Coders` used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throws `Coder.NonDeterministicException` if the encoding is not deterministic.
+
+Entity `VendorToPassengerDTO` to work with pipeline:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+class VendorToPassengerDTO {
+    @JsonProperty(value = "PassengerCount")
+    Integer PassengerCount;
+    @JsonProperty(value = "VendorID")
+    Integer VendorID;
+
+    @SchemaCreate
+    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
+        this.PassengerCount = passengerCount;
+        this.VendorID = vendorID;
+    }
+
+    public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) {
+        return new VendorToPassengerDTO(passengerCount, vendorID);
+    }
+
+    public void setPassengerCount(final Integer passengerCount) {
+        this.PassengerCount = passengerCount;
+    }
+
+    public void setVendorID(final Integer vendorID) {
+        this.VendorID = vendorID;
+    }
+
+    public Integer getVendorID() {
+        return this.VendorID;
+    }
+
+    public Integer getPassengerCount() {
+        return this.PassengerCount;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", PassengerCount, VendorID);
+    }
+}
+```
+
+Explicit `schema` for converting to `Row`:
+```
+final Schema schema = Schema.builder()
+    .addInt32Field("PassengerCount")
+    .addInt32Field("VendorID")
+    .build();
+```
+
+`Coder` for `VendorToPassengerDTO`:
+```
+class CustomCoderSecond extends Coder<VendorToPassengerDTO> {
+    final ObjectMapper om = new ObjectMapper();
+
+    private static final CustomCoderSecond INSTANCE = new CustomCoderSecond();
+
+    public static CustomCoderSecond of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public void encode(VendorToPassengerDTO dto, OutputStream outStream) throws IOException {
+        final String result = dto.toString();
+        outStream.write(result.getBytes());
+    }
+
+    @Override
+    public VendorToPassengerDTO decode(InputStream inStream) throws IOException {
+        final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+        return om.readValue(serializedDTOs, VendorToPassengerDTO.class);
+    }
+
+    @Override
+    
public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() {
+    }
+}
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window you can run and experiment with.
+
+One of the differences you will notice is that it also contains the part to output `PCollection` elements to the console.
+
+Do you also notice in what order elements of PCollection appear in the console? Why is that? You can also run the example several times to see if the output stays the same or changes.

Review Comment: Remove

##########
learning/tour-of-beam/learning-content/java/schema-based-transforms/coder/description.md:
##########
@@ -0,0 +1,120 @@
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Coder
+
+A Coder<T> defines how to encode and decode values of type T into byte streams.
+
+Coder instances are serialized during job creation and deserialized before use. This will generally be performed by serializing the object via Java Serialization.
+
+Coder classes for compound types are often composed of coder classes for the types contained therein. The composition of Coder instances into a coder for the compound class is the subject of the `CoderProvider` type, which enables automatic generic composition of Coder classes within the `CoderRegistry`. See `CoderProvider` and `CoderRegistry` for more information about how coders are inferred.
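Editor's aside: the statement above that Coder instances themselves are serialized at job creation and deserialized before use (via Java Serialization) can be sketched with plain java.io. The `FakeCoder` class here is a hypothetical stand-in, not a Beam type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch of how a coder travels with a job: it is an ordinary Serializable
// object, written out at job creation and read back before use.
public class CoderShipping {
    // Hypothetical stand-in for a tiny coder-like object (not a Beam class).
    static class FakeCoder implements Serializable {
        final String name;
        FakeCoder(String name) { this.name = name; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o); // what "job creation" does conceptually
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject(); // what the worker does before using the coder
        }
    }

    public static void main(String[] args) throws Exception {
        FakeCoder original = new FakeCoder("utf8");
        FakeCoder restored = (FakeCoder) deserialize(serialize(original));
        System.out.println(restored.name); // prints "utf8"
    }
}
```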
+
+When you write your custom coder, you inherit from Coder and have to implement its methods:
+* `encode` - converts objects to bytes
+* `decode` - converts bytes to objects
+* `getCoderArguments` - if it is a `Coder` for a parameterized type, returns a list of `Coders` used for each of the parameters, in the same order in which they appear in the type signature of the parameterized type.
+* `verifyDeterministic` - throws `Coder.NonDeterministicException` if the encoding is not deterministic.
+
+Entity `VendorToPassengerDTO` to work with pipeline:
+
+```
+@DefaultSchema(JavaFieldSchema.class)
+class VendorToPassengerDTO {
+    @JsonProperty(value = "PassengerCount")
+    Integer PassengerCount;
+    @JsonProperty(value = "VendorID")
+    Integer VendorID;
+
+    @SchemaCreate
+    public VendorToPassengerDTO(Integer passengerCount, Integer vendorID) {
+        this.PassengerCount = passengerCount;
+        this.VendorID = vendorID;
+    }
+
+    public static VendorToPassengerDTO of(final Integer passengerCount, final Integer vendorID) {
+        return new VendorToPassengerDTO(passengerCount, vendorID);
+    }
+
+    public void setPassengerCount(final Integer passengerCount) {
+        this.PassengerCount = passengerCount;
+    }
+
+    public void setVendorID(final Integer vendorID) {
+        this.VendorID = vendorID;
+    }
+
+    public Integer getVendorID() {
+        return this.VendorID;
+    }
+
+    public Integer getPassengerCount() {
+        return this.PassengerCount;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{\"PassengerCount\":%d,\"VendorID\":%d}", PassengerCount, VendorID);
+    }
+}
+```
+
+Explicit `schema` for converting to `Row`:
+```
+final Schema schema = Schema.builder()
+    .addInt32Field("PassengerCount")
+    .addInt32Field("VendorID")
+    .build();
+```
+
+`Coder` for `VendorToPassengerDTO`:
+```
+class CustomCoderSecond extends Coder<VendorToPassengerDTO> {
+    final ObjectMapper om = new ObjectMapper();
+
+    private static final CustomCoderSecond INSTANCE = new CustomCoderSecond();
+
+    public static 
CustomCoderSecond of() {
+        return INSTANCE;
+    }
+
+    @Override
+    public void encode(VendorToPassengerDTO dto, OutputStream outStream) throws IOException {
+        final String result = dto.toString();
+        outStream.write(result.getBytes());
+    }
+
+    @Override
+    public VendorToPassengerDTO decode(InputStream inStream) throws IOException {
+        final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
+        return om.readValue(serializedDTOs, VendorToPassengerDTO.class);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() {
+    }
+}
+```
+
+### Playground exercise
+
+You can find the complete code of this example in the playground window you can run and experiment with.

Review Comment: Exercise description and a small challenge to invite the user to experiment.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
