Polber commented on code in PR #32956:
URL: https://github.com/apache/beam/pull/32956#discussion_r1826237987


##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
   convert = row_to_json(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
   return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+  if not weak_schema:
+    return

Review Comment:
   Similar to my other comment: wouldn't this pass silently? I'm not sure how
it would reach this state in the first place, but if the incoming PCollection
does not have a schema (perhaps because it is preceded by a transform that does
not output Rows?), this would pass validation even when given a JSON schema to
validate against.



##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
   convert = row_to_json(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
   return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+  if not weak_schema:
+    return
+  if weak_schema['type'] != strong_schema['type']:
+    raise ValueError(
+        'Incompatible types: %r vs %r' %
+        (weak_schema['type'] != strong_schema['type']))
+  if weak_schema['type'] == 'array':
+    _validate_compatible(weak_schema['items'], strong_schema['items'])
+  elif weak_schema == 'object':
+    for required in strong_schema.get('required', []):
+      if required not in weak_schema['properties']:
+        raise ValueError('Missing or unkown property %r' % required)
+    for name, spec in weak_schema.get('properties', {}):
+      if name in strong_schema['properties']:
+        try:
+          _validate_compatible(spec, strong_schema['properties'][name])
+        except Exception as exn:
+          raise ValueError('Incompatible schema for %r' % name) from exn
+      elif not strong_schema.get('additionalProperties'):
+        raise ValueError(
+            'Prohibited property: {property}; '
+            'perhaps additionalProperties: False is missing?')
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+                  json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+  """Returns a callable that will fail on elements not respecting json_schema.
+  """
+  if not json_schema:
+    return lambda x: None
+
+  # Validate that this compiles, but avoid pickling the validator itself.
+  _ = jsonschema.validators.validator_for(json_schema)(json_schema)
+  _validate_compatible(beam_schema_to_json_schema(beam_schema), json_schema)
+  validator_ptr = [None]
+
+  convert = row_to_json(
+      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
+
+  def validate(row):
+    if validator_ptr[0] is None:
+      validator_ptr[0] = jsonschema.validators.validator_for(json_schema)(
+          json_schema)
+    validator_ptr[0].validate(convert(row))

Review Comment:
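   I was recently doing something similar in Python and was informed about
[PEP 3104](http://www.python.org/dev/peps/pep-3104/), which added the `nonlocal`
statement, so the single-element-list workaround isn't needed. Note that this
also requires initializing `validator_ptr = None` instead of `[None]`;
otherwise the list is always truthy, `if not validator_ptr` never builds the
validator, and `.validate` ends up being called on a list.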
   ```suggestion
       nonlocal validator_ptr
       if not validator_ptr:
         validator_ptr = jsonschema.validators.validator_for(json_schema)(
             json_schema)
       validator_ptr.validate(convert(row))
   ```



##########
sdks/python/apache_beam/yaml/json_utils.py:
##########
@@ -267,3 +279,52 @@ def json_formater(
   convert = row_to_json(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
   return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+  if not weak_schema:
+    return
+  if weak_schema['type'] != strong_schema['type']:
+    raise ValueError(
+        'Incompatible types: %r vs %r' %
+        (weak_schema['type'] != strong_schema['type']))
+  if weak_schema['type'] == 'array':
+    _validate_compatible(weak_schema['items'], strong_schema['items'])
+  elif weak_schema == 'object':
+    for required in strong_schema.get('required', []):
+      if required not in weak_schema['properties']:
+        raise ValueError('Missing or unkown property %r' % required)
+    for name, spec in weak_schema.get('properties', {}):
+      if name in strong_schema['properties']:
+        try:
+          _validate_compatible(spec, strong_schema['properties'][name])
+        except Exception as exn:
+          raise ValueError('Incompatible schema for %r' % name) from exn
+      elif not strong_schema.get('additionalProperties'):
+        raise ValueError(
+            'Prohibited property: {property}; '
+            'perhaps additionalProperties: False is missing?')
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+                  json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+  """Returns a callable that will fail on elements not respecting json_schema.
+  """
+  if not json_schema:
+    return lambda x: None

Review Comment:
   Presumably this never happens, since json_schema is a required parameter of
the Validate transform, but doesn't this also mean that if no json_schema is
given, every element will pass validation silently?
   
   Nit: perhaps raising an error, or at least logging a warning, when the
validator is constructed would make more sense here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to