claudevdm commented on code in PR #37855:
URL: https://github.com/apache/beam/pull/37855#discussion_r2962731002


##########
sdks/python/apache_beam/typehints/native_type_compatibility.py:
##########
@@ -176,8 +176,50 @@ def match_is_named_tuple(user_type):
       hasattr(user_type, '__annotations__') and hasattr(user_type, '_fields'))
 
 
-def match_is_dataclass(user_type):
-  return dataclasses.is_dataclass(user_type) and isinstance(user_type, type)
+def match_dataclass_for_row(user_type):
+  """Match whether the type is a dataclass handled by row coder.
+
+  For frozen dataclasses, only true when explicitly registered with row coder:
+
+    beam.coders.typecoders.registry.register_coder(
+        MyDataClass, beam.coders.RowCoder)
+
+  (for backward-compatibility reason).
+
+  For non-frozen dataclasses, default to true otherwise explicitly registered
+  with a coder other than the row coder.
+  """
+
+  if not dataclasses.is_dataclass(user_type):
+    return False
+
+  # pylint: disable=wrong-import-position
+  try:
+    from apache_beam.options.pipeline_options_context import 
get_pipeline_options  # pylint: disable=line-too-long
+  except AttributeError:
+    pass
+  else:
+    opts = get_pipeline_options()
+    if opts and opts.is_compat_version_prior_to("2.73.0"):
+      return False
+
+  is_frozen = user_type.__dataclass_params__.frozen
+  # avoid circular import
+  try:
+    from apache_beam.coders.typecoders import registry as coders_registry
+    from apache_beam.coders import RowCoder
+  except AttributeError:
+    # coder registery not yet initialized so it must be absent
+    return not is_frozen
+
+  if is_frozen:
+    return (
+        user_type in coders_registry._coders and
+        coders_registry._coders[user_type] == RowCoder)

Review Comment:
   Is this still necessary if we are already doing the 
is_compat_version_prior_to check?
   
   I guess if we do not want to change the users type from dataclass -> named 
tuple (unless explicitly using row coder) then this check makes sense. But not 
necessarily for upgrade compatibility.



##########
sdks/python/apache_beam/typehints/native_type_compatibility.py:
##########
@@ -176,8 +176,30 @@ def match_is_named_tuple(user_type):
       hasattr(user_type, '__annotations__') and hasattr(user_type, '_fields'))
 
 
-def match_is_dataclass(user_type):
-  return dataclasses.is_dataclass(user_type) and isinstance(user_type, type)
+def match_dataclass_for_row(user_type):

Review Comment:
   It wasn't clear to me why default coders were affected at all.
   
   So to be clear, coders are only changed when schema_from_element_type is 
called e.g. for union types (if it can be normalized 
https://github.com/apache/beam/blob/153875eaf2439b7afc7635c690b5a56e4dadd4b5/sdks/python/apache_beam/typehints/typehints.py#L612
   
   



##########
sdks/python/apache_beam/typehints/native_type_compatibility.py:
##########
@@ -176,8 +176,50 @@ def match_is_named_tuple(user_type):
       hasattr(user_type, '__annotations__') and hasattr(user_type, '_fields'))
 
 
-def match_is_dataclass(user_type):
-  return dataclasses.is_dataclass(user_type) and isinstance(user_type, type)
+def match_dataclass_for_row(user_type):
+  """Match whether the type is a dataclass handled by row coder.
+
+  For frozen dataclasses, only true when explicitly registered with row coder:
+
+    beam.coders.typecoders.registry.register_coder(
+        MyDataClass, beam.coders.RowCoder)
+
+  (for backward-compatibility reason).
+
+  For non-frozen dataclasses, default to true otherwise explicitly registered
+  with a coder other than the row coder.
+  """
+
+  if not dataclasses.is_dataclass(user_type):
+    return False
+
+  # pylint: disable=wrong-import-position
+  try:
+    from apache_beam.options.pipeline_options_context import 
get_pipeline_options  # pylint: disable=line-too-long
+  except AttributeError:
+    pass
+  else:
+    opts = get_pipeline_options()
+    if opts and opts.is_compat_version_prior_to("2.73.0"):

Review Comment:
   Can we add a test for this? Something like 
https://github.com/apache/beam/blob/153875eaf2439b7afc7635c690b5a56e4dadd4b5/sdks/python/apache_beam/coders/coders_test_common.py#L276



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to