claudevdm commented on code in PR #36271:
URL: https://github.com/apache/beam/pull/36271#discussion_r2392187581

########## sdks/python/apache_beam/internal/cloudpickle_pickler.py: ##########
@@ -196,12 +196,35 @@ def _lock_reducer(obj):
 def dump_session(file_path):
-  # It is possible to dump session with cloudpickle. However, since references
-  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
-  pass
+  # Since References are saved (https://s.apache.org/beam-picklers), we only
+  # dump supported Beam Registries (currently only logical type registry)
+  from apache_beam.typehints import schemas
+  from apache_beam.coders import typecoders

Review Comment:
Can you explain how the coder registry is related here? Is it a separate issue, or the same logical type issue? I believe all custom coders are fully pickled with the pipeline and there are currently no problems with custom coders, is that right?

########## sdks/python/apache_beam/internal/cloudpickle_pickler.py: ##########
@@ -196,12 +196,35 @@ def _lock_reducer(obj):
 def dump_session(file_path):
-  # It is possible to dump session with cloudpickle. However, since references
-  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
-  pass
+  # Since References are saved (https://s.apache.org/beam-picklers), we only
+  # dump supported Beam Registries (currently only logical type registry)
+  from apache_beam.typehints import schemas
+  from apache_beam.coders import typecoders
+
+  with _pickle_lock, open(file_path, 'wb') as file:
+    coder_reg = typecoders.registry.get_custom_type_coder_tuples()
+    logicaltype_reg = schemas.LogicalType._known_logical_types.copy()
+
+    pickler = cloudpickle.CloudPickler(file)
+    # TODO(https://github.com/apache/beam/issues/18500) add file system registry
+    # once implemented
+    pickler.dump({"coder": coder_reg, "logicaltype": logicaltype_reg})

Review Comment:
What do you think about pickling/loading only the coders that are not registered in schemas.py? It may not matter, but everything registered in schemas.py is registered anyway when that file is imported.

########## CHANGES.md: ##########
@@ -94,6 +94,10 @@
 * PulsarIO has now changed support status from incomplete to experimental. Both read and writes should now minimally function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java) ([#36141](https://github.com/apache/beam/issues/36141)).
+* (Python) Logical type and coder registry are saved for pipelines with `save_main_session` pipeline option enabled in

Review Comment:
I think we should use a new flag for this, or no flag at all (if we can infer that a user is using coders that are not registered in schemas.py). `save_main_session` is misleading, since we are not really saving the main session. Can we determine whether a coder was added from anywhere other than schemas.py, and then automatically pickle only those coders? Many users still have `save_main_session` set on pipelines that used to run on dill, and now we would pickle the coder registry even if they don't need this functionality.
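
One way to answer the question of whether a coder was added from anywhere other than schemas.py is to record the caller's module at registration time. The sketch below is hypothetical: `OriginTrackingRegistry`, `register`, `user_entries`, and `needs_pickling` are illustrative names, not Beam APIs, and the approach relies on CPython frame introspection via `inspect.currentframe()`, which may return `None` on other interpreters.

```python
import inspect


class OriginTrackingRegistry:
  """Hypothetical registry that remembers which module registered each entry."""

  def __init__(self, default_modules):
    # Modules whose registrations are re-created on import and therefore
    # never need pickling (e.g. "apache_beam.typehints.schemas").
    self._default_modules = frozenset(default_modules)
    self._entries = {}  # key -> (value, name of the registering module)

  def register(self, key, value):
    frame = inspect.currentframe()
    try:
      # The caller's frame carries the globals of the module that called
      # register(); its __name__ identifies where the registration came from.
      caller = frame.f_back if frame is not None else None
      origin = None if caller is None else caller.f_globals.get("__name__")
    finally:
      # Drop the reference to our own frame to avoid a reference cycle.
      del frame
    self._entries[key] = (value, origin)

  def user_entries(self):
    """Return only entries registered outside the default modules."""
    return {
        key: value
        for key, (value, origin) in self._entries.items()
        if origin not in self._default_modules
    }

  def needs_pickling(self):
    """True when at least one user-registered entry exists."""
    return bool(self.user_entries())
```

With something along these lines, `dump_session` could skip writing the registries entirely when nothing outside the default modules was registered, which is one reading of the "no flag at all" option; it does not settle whether a dedicated pipeline option is preferable.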
