This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new b73fcc6 [BEAM-11113] Switch default pickler compressor back to zlib for Coders in python sdk new 38feb03 Merge pull request #13183 from y1chi/BEAM-11113 b73fcc6 is described below commit b73fcc6502624a1b19db28ec108db4a1424df2d4 Author: Yichi Zhang <zyi...@google.com> AuthorDate: Fri Oct 23 13:41:37 2020 -0700 [BEAM-11113] Switch default pickler compressor back to zlib for Coders in python sdk --- sdks/python/apache_beam/coders/coders.py | 5 +++-- sdks/python/apache_beam/internal/pickler.py | 19 +++++++++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index bc35779..1641ae1 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -107,12 +107,13 @@ ConstructorFn = Callable[[Optional[Any], List['Coder'], 'PipelineContext'], Any] def serialize_coder(coder): from apache_beam.internal import pickler return b'%s$%s' % ( - coder.__class__.__name__.encode('utf-8'), pickler.dumps(coder)) + coder.__class__.__name__.encode('utf-8'), + pickler.dumps(coder, use_zlib=True)) def deserialize_coder(serialized): from apache_beam.internal import pickler - return pickler.loads(serialized.split(b'$', 1)[1]) + return pickler.loads(serialized.split(b'$', 1)[1], use_zlib=True) # pylint: enable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index c4bfb44..395d511 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -39,6 +39,7 @@ import sys import threading import traceback import types +import zlib from typing import Any from typing import Dict from typing import Tuple @@ -241,7 +242,7 @@ if 'save_module' in dir(dill.dill): logging.getLogger('dill').setLevel(logging.WARN) -def dumps(o, enable_trace=True): +def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes """For internal use only; no backwards-compatibility guarantees.""" @@ -260,18 +261,28 @@ def dumps(o, enable_trace=True): # Compress as compactly as possible (compresslevel=9) to decrease peak memory # usage (of multiple in-memory copies) and to avoid hitting protocol buffer # limits. - c = bz2.compress(s, compresslevel=9) + # WARNING: Be cautious about compressor change since it can lead to pipeline + # representation change, and can break streaming job update compatibility on + # runners such as Dataflow. + if use_zlib: + c = zlib.compress(s, 9) + else: + c = bz2.compress(s, compresslevel=9) del s # Free up some possibly large and no-longer-needed memory. return base64.b64encode(c) -def loads(encoded, enable_trace=True): +def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" c = base64.b64decode(encoded) - s = bz2.decompress(c) + if use_zlib: + s = zlib.decompress(c) + else: + s = bz2.decompress(c) + del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock_unless_py2: