This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b73fcc6  [BEAM-11113] Switch default pickler compressor back to zlib for Coders in python sdk
     new 38feb03  Merge pull request #13183 from y1chi/BEAM-11113
b73fcc6 is described below

commit b73fcc6502624a1b19db28ec108db4a1424df2d4
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Fri Oct 23 13:41:37 2020 -0700

    [BEAM-11113] Switch default pickler compressor back to zlib for Coders in python sdk
---
 sdks/python/apache_beam/coders/coders.py    |  5 +++--
 sdks/python/apache_beam/internal/pickler.py | 19 +++++++++++++++----
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index bc35779..1641ae1 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -107,12 +107,13 @@ ConstructorFn = Callable[[Optional[Any], List['Coder'], 'PipelineContext'], Any]
 def serialize_coder(coder):
   from apache_beam.internal import pickler
   return b'%s$%s' % (
-      coder.__class__.__name__.encode('utf-8'), pickler.dumps(coder))
+      coder.__class__.__name__.encode('utf-8'),
+      pickler.dumps(coder, use_zlib=True))
 
 
 def deserialize_coder(serialized):
   from apache_beam.internal import pickler
-  return pickler.loads(serialized.split(b'$', 1)[1])
+  return pickler.loads(serialized.split(b'$', 1)[1], use_zlib=True)
 
 
 # pylint: enable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index c4bfb44..395d511 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -39,6 +39,7 @@ import sys
 import threading
 import traceback
 import types
+import zlib
 from typing import Any
 from typing import Dict
 from typing import Tuple
@@ -241,7 +242,7 @@ if 'save_module' in dir(dill.dill):
 logging.getLogger('dill').setLevel(logging.WARN)
 
 
-def dumps(o, enable_trace=True):
+def dumps(o, enable_trace=True, use_zlib=False):
   # type: (...) -> bytes
 
   """For internal use only; no backwards-compatibility guarantees."""
@@ -260,18 +261,28 @@ def dumps(o, enable_trace=True):
   # Compress as compactly as possible (compresslevel=9) to decrease peak memory
   # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
   # limits.
-  c = bz2.compress(s, compresslevel=9)
+  # WARNING: Be cautious about compressor change since it can lead to pipeline
+  # representation change, and can break streaming job update compatibility on
+  # runners such as Dataflow.
+  if use_zlib:
+    c = zlib.compress(s, 9)
+  else:
+    c = bz2.compress(s, compresslevel=9)
   del s  # Free up some possibly large and no-longer-needed memory.
 
   return base64.b64encode(c)
 
 
-def loads(encoded, enable_trace=True):
+def loads(encoded, enable_trace=True, use_zlib=False):
   """For internal use only; no backwards-compatibility guarantees."""
 
   c = base64.b64decode(encoded)
 
-  s = bz2.decompress(c)
+  if use_zlib:
+    s = zlib.decompress(c)
+  else:
+    s = bz2.decompress(c)
+
   del c  # Free up some possibly large and no-longer-needed memory.
 
   with _pickle_lock_unless_py2:

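For context, a minimal standalone sketch (not part of the commit) of the round-trip that pickler.dumps/loads perform after this change: pickle, compress, then base64-encode, with the compressor selected by the new use_zlib flag. The stdlib pickle module stands in for dill, and the dumps_sketch/loads_sketch names are hypothetical, purely for illustration.

import base64
import bz2
import pickle
import zlib

def dumps_sketch(obj, use_zlib=False):
    s = pickle.dumps(obj)
    # compresslevel=9 mirrors the "compress as compactly as possible" comment.
    c = zlib.compress(s, 9) if use_zlib else bz2.compress(s, compresslevel=9)
    return base64.b64encode(c)

def loads_sketch(encoded, use_zlib=False):
    c = base64.b64decode(encoded)
    s = zlib.decompress(c) if use_zlib else bz2.decompress(c)
    return pickle.loads(s)

payload = {"coder": "FastPrimitivesCoder"}
assert loads_sketch(dumps_sketch(payload, use_zlib=True), use_zlib=True) == payload

The caller must pass the same use_zlib value to both sides, which is why serialize_coder and deserialize_coder in coders.py are changed together to use use_zlib=True.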