damccorm commented on code in PR #35952: URL: https://github.com/apache/beam/pull/35952#discussion_r2301377499
########## website/www/site/content/en/documentation/sdks/yaml-schema.md: ########## @@ -0,0 +1,105 @@ +--- +type: languages +title: "Apache Beam YAML Schema" +--- +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Beam YAML Schema + +As pipelines grow in size and complexity, it becomes more common to encounter +data that is malformed, doesn't meet preconditions, or otherwise causes issues +during processing. + +Beam YAML helps the user detect and capture these issues by using the optional +`output_schema` configuration, which is available for any transform in the YAML +SDK. For example, the following code creates a few "good" records and specifies +that the output schema from the `Create` transform should have records that +follow the expected schema: `sdk` as a string and `year` as an integer. + +```yaml +pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {sdk: MapReduce, year: 2004} + - {sdk: MillWheel, year: 2008} + output_schema: + type: object + properties: + sdk: + type: string + year: + type: integer + - type: AssertEqual + config: + elements: + - {sdk: MapReduce, year: 2004} + - {sdk: MillWheel, year: 2008} +``` + +However, a user will more likely want to detect and handle schema errors. This is where adding an `error_handling` configuration inside the `output_schema` comes into play. For example, the following code will Review Comment: If we stay with the current approach, we should call out the difference between normal error handling and schema error handling here. ########## sdks/python/apache_beam/yaml/json_utils.py: ########## @@ -287,8 +287,9 @@ def row_to_json(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: for field in beam_type.row_type.schema.fields } return lambda row: { - name: convert(getattr(row, name)) + name: converted for (name, convert) in converters.items() + if (converted := convert(getattr(row, name, None))) is not None Review Comment: When would this be None? What condition are we guarding against? In theory I'd expect the previous conversion to be a bit more correct. ########## sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml: ########## @@ -82,3 +82,118 @@ pipelines: config: elements: - {user: bob, timestamp: 3} + +# Assign timestamp to beam row element with error handling and output schema +# check. + - pipeline: + type: composite + transforms: + - type: Create + name: CreateVisits + config: + elements: + - {user: alice, timestamp: "not-valid"} + - {user: bob, timestamp: 3} + - type: AssignTimestamps + input: CreateVisits + config: + timestamp: timestamp + error_handling: + output: invalid_rows + output_schema: + type: object + properties: + user: + type: string + timestamp: + type: integer + error_handling: + output: invalid_schema_rows Review Comment: This could be done with a flatten step where you unify the error output from schema validation and normal exception handling. ########## sdks/python/apache_beam/yaml/tests/assign_timestamps.yaml: ########## @@ -82,3 +82,118 @@ pipelines: config: elements: - {user: bob, timestamp: 3} + +# Assign timestamp to beam row element with error handling and output schema +# check. + - pipeline: + type: composite + transforms: + - type: Create + name: CreateVisits + config: + elements: + - {user: alice, timestamp: "not-valid"} + - {user: bob, timestamp: 3} + - type: AssignTimestamps + input: CreateVisits + config: + timestamp: timestamp + error_handling: + output: invalid_rows + output_schema: + type: object + properties: + user: + type: string + timestamp: + type: integer + error_handling: + output: invalid_schema_rows Review Comment: Haven't gotten to the point where we actually implement this, but it seems to me like it might be more useful to just have a single error_handling output associated with a transform where we capture all problematic records. In most cases, a user is going to want to write these somewhere for manual inspection/reprocessing and I don't think it matters too much whether it is because the output is an error or the output doesn't match the expected schema. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
