Polber commented on code in PR #32956:
URL: https://github.com/apache/beam/pull/32956#discussion_r1826329038
##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
convert = row_to_json(
schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+ if not weak_schema:
+ return
+ if weak_schema['type'] != strong_schema['type']:
+ raise ValueError(
+ 'Incompatible types: %r vs %r' %
+ (weak_schema['type'] != strong_schema['type']))
+ if weak_schema['type'] == 'array':
+ _validate_compatible(weak_schema['items'], strong_schema['items'])
+ elif weak_schema == 'object':
+ for required in strong_schema.get('required', []):
+ if required not in weak_schema['properties']:
+ raise ValueError('Missing or unkown property %r' % required)
+ for name, spec in weak_schema.get('properties', {}):
+ if name in strong_schema['properties']:
+ try:
+ _validate_compatible(spec, strong_schema['properties'][name])
+ except Exception as exn:
+ raise ValueError('Incompatible schema for %r' % name) from exn
+ elif not strong_schema.get('additionalProperties'):
+ raise ValueError(
+ 'Prohibited property: {property}; '
+ 'perhaps additionalProperties: False is missing?')
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+ json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+ """Returns a callable that will fail on elements not respecting json_schema.
+ """
+ if not json_schema:
+ return lambda x: None
Review Comment:
Ah ok I was unaware of the `{}` case - thanks for the explanation!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]