claudevdm commented on code in PR #36271:
URL: https://github.com/apache/beam/pull/36271#discussion_r2392187581


##########
sdks/python/apache_beam/internal/cloudpickle_pickler.py:
##########
@@ -196,12 +196,35 @@ def _lock_reducer(obj):
 
 
 def dump_session(file_path):
-  # It is possible to dump session with cloudpickle. However, since references
-  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
-  pass
+  # Since References are saved (https://s.apache.org/beam-picklers), we only
+  # dump supported Beam Registries (currently only logical type registry)
+  from apache_beam.typehints import schemas
+  from apache_beam.coders import typecoders

Review Comment:
   Can you please explain how the coder registry is related here? Is it a separate issue, or the same logical type issue?
   
   I believe all custom coders are fully pickled in the pipeline, and that there are currently no problems with custom coders. Is that correct?



##########
sdks/python/apache_beam/internal/cloudpickle_pickler.py:
##########
@@ -196,12 +196,35 @@ def _lock_reducer(obj):
 
 
 def dump_session(file_path):
-  # It is possible to dump session with cloudpickle. However, since references
-  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
-  pass
+  # Since References are saved (https://s.apache.org/beam-picklers), we only
+  # dump supported Beam Registries (currently only logical type registry)
+  from apache_beam.typehints import schemas
+  from apache_beam.coders import typecoders
+
+  with _pickle_lock, open(file_path, 'wb') as file:
+    coder_reg = typecoders.registry.get_custom_type_coder_tuples()
+    logicaltype_reg = schemas.LogicalType._known_logical_types.copy()
+
+    pickler = cloudpickle.CloudPickler(file)
+    # TODO(https://github.com/apache/beam/issues/18500) add file system registry
+    # once implemented
+    pickler.dump({"coder": coder_reg, "logicaltype": logicaltype_reg})

Review Comment:
   What do you think of only pickling/loading the coders that are not registered in schemas.py? Maybe it does not matter, but everything registered in schemas.py will already be registered when that file is imported.



##########
CHANGES.md:
##########
@@ -94,6 +94,10 @@
 * PulsarIO has now changed support status from incomplete to experimental. Both read and writes should now minimally
   function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java)
   ([#36141](https://github.com/apache/beam/issues/36141)).
+* (Python) Logical type and coder registry are saved for pipelines with `save_main_session` pipeline option enabled in

Review Comment:
   I think we should use a new flag for this, or no flag at all (if we can infer that a user is using coders that are not registered in schemas.py). `save_main_session` is misleading, since we are not really saving the main session.
   
   Can we determine whether a coder was registered from anywhere other than schemas.py, and then automatically pickle only those coders?
   
   A lot of users still have `save_main_session` set on pipelines that used to run on dill, and with this change we will pickle the coder registry even if they don't need this functionality.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.