Polber commented on code in PR #32956:
URL: https://github.com/apache/beam/pull/32956#discussion_r1826237987
##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
convert = row_to_json(
schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+ if not weak_schema:
+ return
Review Comment:
Similar to my other comment: wouldn't this pass silently? I'm not sure
how it would reach this state in the first place, but if the incoming
PCollection does not have a schema (perhaps because it is preceded by a
transform that does not output Rows?), validation would succeed even
though a JSON schema was given to validate against.
##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
convert = row_to_json(
schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+ if not weak_schema:
+ return
+ if weak_schema['type'] != strong_schema['type']:
+ raise ValueError(
+ 'Incompatible types: %r vs %r' %
+ (weak_schema['type'] != strong_schema['type']))
+ if weak_schema['type'] == 'array':
+ _validate_compatible(weak_schema['items'], strong_schema['items'])
+ elif weak_schema == 'object':
+ for required in strong_schema.get('required', []):
+ if required not in weak_schema['properties']:
+ raise ValueError('Missing or unkown property %r' % required)
+ for name, spec in weak_schema.get('properties', {}):
+ if name in strong_schema['properties']:
+ try:
+ _validate_compatible(spec, strong_schema['properties'][name])
+ except Exception as exn:
+ raise ValueError('Incompatible schema for %r' % name) from exn
+ elif not strong_schema.get('additionalProperties'):
+ raise ValueError(
+ 'Prohibited property: {property}; '
+ 'perhaps additionalProperties: False is missing?')
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+ json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+ """Returns a callable that will fail on elements not respecting json_schema.
+ """
+ if not json_schema:
+ return lambda x: None
+
+ # Validate that this compiles, but avoid pickling the validator itself.
+ _ = jsonschema.validators.validator_for(json_schema)(json_schema)
+ _validate_compatible(beam_schema_to_json_schema(beam_schema), json_schema)
+ validator_ptr = [None]
+
+ convert = row_to_json(
+ schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
+
+ def validate(row):
+ if validator_ptr[0] is None:
+ validator_ptr[0] = jsonschema.validators.validator_for(json_schema)(
+ json_schema)
+ validator_ptr[0].validate(convert(row))
Review Comment:
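I was recently doing something similar in Python and was pointed to [PEP
3104](https://peps.python.org/pep-3104/), which added the `nonlocal`
keyword, so the single-element list workaround isn't needed. (Note that
`validator_ptr` would then need to be initialized to `None` rather than
`[None]`, since a one-element list is always truthy.)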
```suggestion
nonlocal validator_ptr
if not validator_ptr:
validator_ptr = jsonschema.validators.validator_for(json_schema)(
json_schema)
validator_ptr.validate(convert(row))
```
##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
convert = row_to_json(
schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+ if not weak_schema:
+ return
+ if weak_schema['type'] != strong_schema['type']:
+ raise ValueError(
+ 'Incompatible types: %r vs %r' %
+ (weak_schema['type'] != strong_schema['type']))
+ if weak_schema['type'] == 'array':
+ _validate_compatible(weak_schema['items'], strong_schema['items'])
+ elif weak_schema == 'object':
+ for required in strong_schema.get('required', []):
+ if required not in weak_schema['properties']:
+ raise ValueError('Missing or unkown property %r' % required)
+ for name, spec in weak_schema.get('properties', {}):
+ if name in strong_schema['properties']:
+ try:
+ _validate_compatible(spec, strong_schema['properties'][name])
+ except Exception as exn:
+ raise ValueError('Incompatible schema for %r' % name) from exn
+ elif not strong_schema.get('additionalProperties'):
+ raise ValueError(
+ 'Prohibited property: {property}; '
+ 'perhaps additionalProperties: False is missing?')
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+ json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+ """Returns a callable that will fail on elements not respecting json_schema.
+ """
+ if not json_schema:
+ return lambda x: None
Review Comment:
Presumably this never happens, since json_schema is a required parameter of
the Validate transform, but doesn't this also mean that if no json_schema is
given, validation will pass silently?
Nit, but perhaps raising an error (or at least logging a warning) here at
compilation time would make sense.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]