robertwb commented on code in PR #32956:
URL: https://github.com/apache/beam/pull/32956#discussion_r1826307057


##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
   convert = row_to_json(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
   return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+  if not weak_schema:
+    return
+  if weak_schema['type'] != strong_schema['type']:
+    raise ValueError(
+        'Incompatible types: %r vs %r' %
+        (weak_schema['type'] != strong_schema['type']))
+  if weak_schema['type'] == 'array':
+    _validate_compatible(weak_schema['items'], strong_schema['items'])
+  elif weak_schema == 'object':
+    for required in strong_schema.get('required', []):
+      if required not in weak_schema['properties']:
+        raise ValueError('Missing or unkown property %r' % required)
+    for name, spec in weak_schema.get('properties', {}):
+      if name in strong_schema['properties']:
+        try:
+          _validate_compatible(spec, strong_schema['properties'][name])
+        except Exception as exn:
+          raise ValueError('Incompatible schema for %r' % name) from exn
+      elif not strong_schema.get('additionalProperties'):
+        raise ValueError(
+            'Prohibited property: {property}; '
+            'perhaps additionalProperties: False is missing?')
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+                  json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+  """Returns a callable that will fail on elements not respecting json_schema.
+  """
+  if not json_schema:
+    return lambda x: None
+
+  # Validate that this compiles, but avoid pickling the validator itself.
+  _ = jsonschema.validators.validator_for(json_schema)(json_schema)
+  _validate_compatible(beam_schema_to_json_schema(beam_schema), json_schema)
+  validator_ptr = [None]
+
+  convert = row_to_json(
+      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
+
+  def validate(row):
+    if validator_ptr[0] is None:
+      validator_ptr[0] = jsonschema.validators.validator_for(json_schema)(
+          json_schema)
+    validator_ptr[0].validate(convert(row))

Review Comment:
   Ah, yes. Good call. 



##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
   convert = row_to_json(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
   return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+  if not weak_schema:
+    return
+  if weak_schema['type'] != strong_schema['type']:
+    raise ValueError(
+        'Incompatible types: %r vs %r' %
+        (weak_schema['type'] != strong_schema['type']))
+  if weak_schema['type'] == 'array':
+    _validate_compatible(weak_schema['items'], strong_schema['items'])
+  elif weak_schema == 'object':
+    for required in strong_schema.get('required', []):
+      if required not in weak_schema['properties']:
+        raise ValueError('Missing or unkown property %r' % required)
+    for name, spec in weak_schema.get('properties', {}):
+      if name in strong_schema['properties']:
+        try:
+          _validate_compatible(spec, strong_schema['properties'][name])
+        except Exception as exn:
+          raise ValueError('Incompatible schema for %r' % name) from exn
+      elif not strong_schema.get('additionalProperties'):
+        raise ValueError(
+            'Prohibited property: {property}; '
+            'perhaps additionalProperties: False is missing?')
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+                  json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+  """Returns a callable that will fail on elements not respecting json_schema.
+  """
+  if not json_schema:
+    return lambda x: None

Review Comment:
   This handles the degenerate case where the schema is `{}`, which in JSON 
Schema means "anything goes."
   
   Though one could ask why one would even have this transform at all, it's 
quite possible the schema is provided elsewhere and we want to handle this case 
gracefully (similar to how empty lists are handled despite being degenerate). 



##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
   convert = row_to_json(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
   return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+  if not weak_schema:
+    return

Review Comment:
   Again, `weak_schema` could be `{}`. This comes up in practice when we don't 
know anything about this part of the input (e.g. its type is `Any`), which is 
also the fallback for `beam_type_to_json_type`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to