This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 75ea89a [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1 75ea89a is described below commit 75ea89ad94ca76646e4697cf98c78d14c6e2695f Author: Boris Shminke <bo...@shminke.me> AuthorDate: Sat Feb 2 10:49:45 2019 +0800 [SPARK-18161][PYTHON] Update cloudpickle to v0.6.1 ## What changes were proposed in this pull request? In this PR we've done two things: 1) updated the Spark's copy of cloudpickle to 0.6.1 (current stable) The main reason Spark stayed with cloudpickle 0.4.x was that the default pickle protocol was changed in later versions. 2) started using pickle.HIGHEST_PROTOCOL for both Python 2 and Python 3 for serializers and broadcast [Pyrolite](https://github.com/irmen/Pyrolite) has such Pickle protocol version support: reading: 0,1,2,3,4; writing: 2. ## How was this patch tested? Jenkins tests. Authors: Sloane Simmons, Boris Shminke This contribution is original work of Sloane Simmons and Boris Shminke and they licensed it to the project under the project's open source license. Closes #20691 from inpefess/pickle_protocol_4. Lead-authored-by: Boris Shminke <bo...@shminke.me> Co-authored-by: singularperturbation <sloane...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/broadcast.py | 4 +- python/pyspark/cloudpickle.py | 259 ++++++++++++++++++++++++++++----------- python/pyspark/serializers.py | 7 +- python/pyspark/tests/test_rdd.py | 2 +- 4 files changed, 194 insertions(+), 78 deletions(-) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 43a5ead..cca64b5 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -23,7 +23,7 @@ import threading from pyspark.cloudpickle import print_exec from pyspark.java_gateway import local_connect_and_auth -from pyspark.serializers import ChunkedStream +from pyspark.serializers import ChunkedStream, pickle_protocol from pyspark.util import _exception_message if sys.version < '3': @@ -109,7 +109,7 @@ class Broadcast(object): def dump(self, value, f): try: - pickle.dump(value, f, 2) + pickle.dump(value, f, pickle_protocol) except pickle.PickleError: raise except Exception as e: diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py index 88519d7..bf92569 100644 --- a/python/pyspark/cloudpickle.py +++ b/python/pyspark/cloudpickle.py @@ -42,20 +42,26 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ from __future__ import print_function -import dis -from functools import partial -import imp import io -import itertools -import logging +import dis +import sys +import types import opcode -import operator import pickle import struct -import sys -import traceback -import types +import logging import weakref +import operator +import importlib +import itertools +import traceback +from functools import partial + + +# cloudpickle is meant for inter process communication: we expect all +# communicating processes to run the same Python version hence we favor +# communication speed over compatibility: +DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL if sys.version < '3': @@ -72,6 +78,22 @@ else: PY3 = True +# Container for the global namespace to ensure consistent unpickling of +# functions defined in dynamic modules (modules not registed in sys.modules). +_dynamic_modules_globals = weakref.WeakValueDictionary() + + +class _DynamicModuleFuncGlobals(dict): + """Global variables referenced by a function defined in a dynamic module + + To avoid leaking references we store such context in a WeakValueDictionary + instance. However instances of python builtin types such as dict cannot + be used directly as values in such a construct, hence the need for a + derived class. + """ + pass + + def _make_cell_set_template_code(): """Get the Python compiler to emit LOAD_FAST(arg); STORE_DEREF @@ -157,7 +179,7 @@ def cell_set(cell, value): )(value) -#relevant opcodes +# relevant opcodes STORE_GLOBAL = opcode.opmap['STORE_GLOBAL'] DELETE_GLOBAL = opcode.opmap['DELETE_GLOBAL'] LOAD_GLOBAL = opcode.opmap['LOAD_GLOBAL'] @@ -167,7 +189,7 @@ EXTENDED_ARG = dis.EXTENDED_ARG def islambda(func): - return getattr(func,'__name__') == '<lambda>' + return getattr(func, '__name__') == '<lambda>' _BUILTIN_TYPE_NAMES = {} @@ -248,7 +270,9 @@ class CloudPickler(Pickler): dispatch = Pickler.dispatch.copy() def __init__(self, file, protocol=None): - Pickler.__init__(self, file, protocol) + if protocol is None: + protocol = DEFAULT_PROTOCOL + Pickler.__init__(self, file, protocol=protocol) # set of modules to unpickle self.modules = set() # map ids to dictionary. used to ensure that functions can share global env @@ -267,42 +291,26 @@ class CloudPickler(Pickler): def save_memoryview(self, obj): self.save(obj.tobytes()) + dispatch[memoryview] = save_memoryview if not PY3: def save_buffer(self, obj): self.save(str(obj)) - dispatch[buffer] = save_buffer # noqa: F821 'buffer' was removed in Python 3 - - def save_unsupported(self, obj): - raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj)) - dispatch[types.GeneratorType] = save_unsupported - # itertools objects do not pickle! - for v in itertools.__dict__.values(): - if type(v) is type: - dispatch[v] = save_unsupported + dispatch[buffer] = save_buffer # noqa: F821 'buffer' was removed in Python 3 def save_module(self, obj): """ Save a module as an import """ - mod_name = obj.__name__ - # If module is successfully found then it is not a dynamically created module - if hasattr(obj, '__file__'): - is_dynamic = False - else: - try: - _find_module(mod_name) - is_dynamic = False - except ImportError: - is_dynamic = True - self.modules.add(obj) - if is_dynamic: - self.save_reduce(dynamic_subimport, (obj.__name__, vars(obj)), obj=obj) + if _is_dynamic(obj): + self.save_reduce(dynamic_subimport, (obj.__name__, vars(obj)), + obj=obj) else: self.save_reduce(subimport, (obj.__name__,), obj=obj) + dispatch[types.ModuleType] = save_module def save_codeobject(self, obj): @@ -323,6 +331,7 @@ class CloudPickler(Pickler): obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars ) self.save_reduce(types.CodeType, args, obj=obj) + dispatch[types.CodeType] = save_codeobject def save_function(self, obj, name=None): @@ -369,9 +378,14 @@ class CloudPickler(Pickler): if modname == '__main__': themodule = None + try: + lookedup_by_name = getattr(themodule, name, None) + except Exception: + lookedup_by_name = None + if themodule: self.modules.add(themodule) - if getattr(themodule, name, None) is obj: + if lookedup_by_name is obj: return self.save_global(obj, name) # a builtin_function_or_method which comes in as an attribute of some @@ -401,8 +415,7 @@ class CloudPickler(Pickler): return else: # func is nested - klass = getattr(themodule, name, None) - if klass is None or klass is not obj: + if lookedup_by_name is None or lookedup_by_name is not obj: self.save_function_tuple(obj) return @@ -416,6 +429,7 @@ class CloudPickler(Pickler): else: write(pickle.GLOBAL + modname + '\n' + name + '\n') self.memoize(obj) + dispatch[types.FunctionType] = save_function def _save_subimports(self, code, top_level_dependencies): @@ -423,19 +437,22 @@ class CloudPickler(Pickler): Ensure de-pickler imports any package child-modules that are needed by the function """ + # check if any known dependency is an imported package for x in top_level_dependencies: if isinstance(x, types.ModuleType) and hasattr(x, '__package__') and x.__package__: # check if the package has any currently loaded sub-imports prefix = x.__name__ + '.' - for name, module in sys.modules.items(): + # A concurrent thread could mutate sys.modules, + # make sure we iterate over a copy to avoid exceptions + for name in list(sys.modules): # Older versions of pytest will add a "None" module to sys.modules. if name is not None and name.startswith(prefix): # check whether the function can address the sub-module tokens = set(name[len(prefix):].split('.')) if not tokens - set(code.co_names): # ensure unpickler executes this import - self.save(module) + self.save(sys.modules[name]) # then discards the reference to it self.write(pickle.POP) @@ -450,6 +467,15 @@ class CloudPickler(Pickler): clsdict = dict(obj.__dict__) # copy dict proxy to a dict clsdict.pop('__weakref__', None) + # For ABCMeta in python3.7+, remove _abc_impl as it is not picklable. + # This is a fix which breaks the cache but this only makes the first + # calls to issubclass slower. + if "_abc_impl" in clsdict: + import abc + (registry, _, _, _) = abc._get_dump(obj) + clsdict["_abc_impl"] = [subclass_weakref() + for subclass_weakref in registry] + # On PyPy, __doc__ is a readonly attribute, so we need to include it in # the initial skeleton class. This is safe because we know that the # doc can't participate in a cycle with the original class. @@ -541,9 +567,13 @@ class CloudPickler(Pickler): 'globals': f_globals, 'defaults': defaults, 'dict': dct, - 'module': func.__module__, 'closure_values': closure_values, + 'module': func.__module__, + 'name': func.__name__, + 'doc': func.__doc__, } + if hasattr(func, '__annotations__') and sys.version_info >= (3, 7): + state['annotations'] = func.__annotations__ if hasattr(func, '__qualname__'): state['qualname'] = func.__qualname__ save(state) @@ -568,8 +598,7 @@ class CloudPickler(Pickler): # PyPy "builtin-code" object out_names = set() else: - out_names = set(names[oparg] - for op, oparg in _walk_global_ops(co)) + out_names = {names[oparg] for _, oparg in _walk_global_ops(co)} # see if nested function have any global refs if co.co_consts: @@ -610,7 +639,16 @@ class CloudPickler(Pickler): # save the dict dct = func.__dict__ - base_globals = self.globals_ref.get(id(func.__globals__), {}) + base_globals = self.globals_ref.get(id(func.__globals__), None) + if base_globals is None: + # For functions defined in a well behaved module use + # vars(func.__module__) for base_globals. This is necessary to + # share the global variables across multiple pickled functions from + # this module. + if hasattr(func, '__module__') and func.__module__ is not None: + base_globals = func.__module__ + else: + base_globals = {} self.globals_ref[id(func.__globals__)] = base_globals return (code, f_globals, defaults, closure, dct, base_globals) @@ -619,6 +657,7 @@ class CloudPickler(Pickler): if obj.__module__ == "__builtin__": return self.save_global(obj) return self.save_function(obj) + dispatch[types.BuiltinFunctionType] = save_builtin_function def save_global(self, obj, name=None, pack=struct.pack): @@ -628,6 +667,13 @@ class CloudPickler(Pickler): The name of this method is somewhat misleading: all types get dispatched here. """ + if obj is type(None): + return self.save_reduce(type, (None,), obj=obj) + elif obj is type(Ellipsis): + return self.save_reduce(type, (Ellipsis,), obj=obj) + elif obj is type(NotImplemented): + return self.save_reduce(type, (NotImplemented,), obj=obj) + if obj.__module__ == "__main__": return self.save_dynamic_class(obj) @@ -657,7 +703,8 @@ class CloudPickler(Pickler): self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj) else: self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__), - obj=obj) + obj=obj) + dispatch[types.MethodType] = save_instancemethod def save_inst(self, obj): @@ -711,11 +758,13 @@ class CloudPickler(Pickler): def save_property(self, obj): # properties not correctly saved in python self.save_reduce(property, (obj.fget, obj.fset, obj.fdel, obj.__doc__), obj=obj) + dispatch[property] = save_property def save_classmethod(self, obj): orig_func = obj.__func__ self.save_reduce(type(obj), (orig_func,), obj=obj) + dispatch[classmethod] = save_classmethod dispatch[staticmethod] = save_classmethod @@ -726,7 +775,7 @@ class CloudPickler(Pickler): return item items = obj(Dummy()) if not isinstance(items, tuple): - items = (items, ) + items = (items,) return self.save_reduce(operator.itemgetter, items) if type(operator.itemgetter) is type: @@ -757,16 +806,16 @@ class CloudPickler(Pickler): def save_file(self, obj): """Save a file""" try: - import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute + import StringIO as pystringIO # we can't use cStringIO as it lacks the name attribute except ImportError: import io as pystringIO - if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): + if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): raise pickle.PicklingError("Cannot pickle files that do not map to an actual file") if obj is sys.stdout: - return self.save_reduce(getattr, (sys,'stdout'), obj=obj) + return self.save_reduce(getattr, (sys, 'stdout'), obj=obj) if obj is sys.stderr: - return self.save_reduce(getattr, (sys,'stderr'), obj=obj) + return self.save_reduce(getattr, (sys, 'stderr'), obj=obj) if obj is sys.stdin: raise pickle.PicklingError("Cannot pickle standard input") if obj.closed: @@ -845,6 +894,7 @@ def is_tornado_coroutine(func): return False return gen.is_coroutine_function(func) + def _rebuild_tornado_coroutine(func): from tornado import gen return gen.coroutine(func) @@ -852,36 +902,55 @@ def _rebuild_tornado_coroutine(func): # Shorthands for legacy support -def dump(obj, file, protocol=2): - CloudPickler(file, protocol).dump(obj) +def dump(obj, file, protocol=None): + """Serialize obj as bytes streamed into file + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed + between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + CloudPickler(file, protocol=protocol).dump(obj) -def dumps(obj, protocol=2): + +def dumps(obj, protocol=None): + """Serialize obj as a string of bytes allocated in memory + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed + between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ file = StringIO() try: - cp = CloudPickler(file,protocol) + cp = CloudPickler(file, protocol=protocol) cp.dump(obj) return file.getvalue() finally: file.close() + # including pickles unloading functions in this namespace load = pickle.load loads = pickle.loads -#hack for __import__ not working as desired +# hack for __import__ not working as desired def subimport(name): __import__(name) return sys.modules[name] def dynamic_subimport(name, vars): - mod = imp.new_module(name) + mod = types.ModuleType(name) mod.__dict__.update(vars) - sys.modules[name] = mod return mod + # restores function attributes def _restore_attr(obj, attr): for key, val in attr.items(): @@ -908,7 +977,7 @@ def _modules_to_main(modList): if type(modname) is str: try: mod = __import__(modname) - except Exception as e: + except Exception: sys.stderr.write('warning: could not import %s\n. ' 'Your function may unexpectedly error due to this import failing;' 'A version mismatch is likely. Specific error was:\n' % modname) @@ -917,7 +986,7 @@ def _modules_to_main(modList): setattr(main, mod.__name__, mod) -#object generators: +# object generators: def _genpartial(func, args, kwds): if not args: args = () @@ -925,9 +994,11 @@ def _genpartial(func, args, kwds): kwds = {} return partial(func, *args, **kwds) + def _gen_ellipsis(): return Ellipsis + def _gen_not_implemented(): return NotImplemented @@ -988,9 +1059,19 @@ def _fill_function(*args): else: raise ValueError('Unexpected _fill_value arguments: %r' % (args,)) - func.__globals__.update(state['globals']) + # Only set global variables that do not exist. + for k, v in state['globals'].items(): + if k not in func.__globals__: + func.__globals__[k] = v + func.__defaults__ = state['defaults'] func.__dict__ = state['dict'] + if 'annotations' in state: + func.__annotations__ = state['annotations'] + if 'doc' in state: + func.__doc__ = state['doc'] + if 'name' in state: + func.__name__ = state['name'] if 'module' in state: func.__module__ = state['module'] if 'qualname' in state: @@ -1021,6 +1102,20 @@ def _make_skel_func(code, cell_count, base_globals=None): """ if base_globals is None: base_globals = {} + elif isinstance(base_globals, str): + base_globals_name = base_globals + try: + # First try to reuse the globals from the module containing the + # function. If it is not possible to retrieve it, fallback to an + # empty dictionary. + base_globals = vars(importlib.import_module(base_globals)) + except ImportError: + base_globals = _dynamic_modules_globals.get( + base_globals_name, None) + if base_globals is None: + base_globals = _DynamicModuleFuncGlobals() + _dynamic_modules_globals[base_globals_name] = base_globals + base_globals['__builtins__'] = __builtins__ closure = ( @@ -1036,28 +1131,50 @@ def _rehydrate_skeleton_class(skeleton_class, class_dict): See CloudPickler.save_dynamic_class for more info. """ + registry = None for attrname, attr in class_dict.items(): - setattr(skeleton_class, attrname, attr) + if attrname == "_abc_impl": + registry = attr + else: + setattr(skeleton_class, attrname, attr) + if registry is not None: + for subclass in registry: + skeleton_class.register(subclass) + return skeleton_class -def _find_module(mod_name): +def _is_dynamic(module): """ - Iterate over each part instead of calling imp.find_module directly. - This function is able to find submodules (e.g. sickit.tree) + Return True if the module is special module that cannot be imported by its + name. """ - path = None - for part in mod_name.split('.'): - if path is not None: - path = [path] - file, path, description = imp.find_module(part, path) - if file is not None: - file.close() - return path, description + # Quick check: module that have __file__ attribute are not dynamic modules. + if hasattr(module, '__file__'): + return False + + if hasattr(module, '__spec__'): + return module.__spec__ is None + else: + # Backward compat for Python 2 + import imp + try: + path = None + for part in module.__name__.split('.'): + if path is not None: + path = [path] + f, path, description = imp.find_module(part, path) + if f is not None: + f.close() + except ImportError: + return True + return False + """Constructors for 3rd party libraries Note: These can never be renamed due to client compatibility issues""" + def _getobject(modname, attribute): mod = __import__(modname, fromlist=[attribute]) return mod.__dict__[attribute] diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 1d17053..3db2595 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -61,12 +61,11 @@ import itertools if sys.version < '3': import cPickle as pickle - protocol = 2 from itertools import izip as zip, imap as map else: import pickle - protocol = 3 xrange = range +pickle_protocol = pickle.HIGHEST_PROTOCOL from pyspark import cloudpickle from pyspark.util import _exception_message @@ -617,7 +616,7 @@ class PickleSerializer(FramedSerializer): """ def dumps(self, obj): - return pickle.dumps(obj, protocol) + return pickle.dumps(obj, pickle_protocol) if sys.version >= '3': def loads(self, obj, encoding="bytes"): @@ -631,7 +630,7 @@ class CloudPickleSerializer(PickleSerializer): def dumps(self, obj): try: - return cloudpickle.dumps(obj, 2) + return cloudpickle.dumps(obj, pickle_protocol) except pickle.PickleError: raise except Exception as e: diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py index bac2f78..9e4ac6c 100644 --- a/python/pyspark/tests/test_rdd.py +++ b/python/pyspark/tests/test_rdd.py @@ -605,7 +605,7 @@ class RDDTests(ReusedPySparkTestCase): def test_external_group_by_key(self): self.sc._conf.set("spark.python.worker.memory", "1m") - N = 200001 + N = 2000001 kv = self.sc.parallelize(xrange(N)).map(lambda x: (x % 3, x)) gkv = kv.groupByKey().cache() self.assertEqual(3, gkv.count()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org