Repository: spark
Updated Branches:
  refs/heads/branch-0.9 c93f4a0c0 -> ece411921


[SPARK-2494] [PySpark] make hash of None consistent across machines

In CPython, the hash of None differs across machines, which causes wrong
results during shuffle. This PR fixes that.

Author: Davies Liu <davies....@gmail.com>

Closes #1371 from davies/hash_of_none and squashes the following commits:

d01745f [Davies Liu] add comments, remove outdated unit tests
5467141 [Davies Liu] disable hijack of hash, use it only for partitionBy()
b7118aa [Davies Liu] use __builtin__ instead of __builtins__
839e417 [Davies Liu] hijack hash to make hash of None consistant cross machines

(cherry picked from commit 872538c600a452ead52638c1ccba90643a9fa41c)
Signed-off-by: Matei Zaharia <ma...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ece41192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ece41192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ece41192

Branch: refs/heads/branch-0.9
Commit: ece411921c6442387c18707baaf5011698c8fc1d
Parents: c93f4a0
Author: Davies Liu <davies....@gmail.com>
Authored: Mon Jul 21 11:59:54 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Jul 21 12:01:17 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 35 ++++++++++++++++++++++++++++++++---
 1 file changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ece41192/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 06a390b..6f876fc 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -43,6 +43,35 @@ from py4j.java_collections import ListConverter, MapConverter
 __all__ = ["RDD"]
 
 
+# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
+# hash for string
+def portable_hash(x):
+    """
+    This function returns a consistent hash code for builtin types, especially
+    for None and tuples containing None.
+
+    The algorithm is similar to the one used by CPython 2.7.
+
+    >>> portable_hash(None)
+    0
+    >>> portable_hash((None, 1))
+    219750521
+    """
+    if x is None:
+        return 0
+    if isinstance(x, tuple):
+        h = 0x345678
+        for i in x:
+            h ^= portable_hash(i)
+            h *= 1000003
+            h &= 0xffffffff
+        h ^= len(x)
+        if h == -1:
+            h = -2
+        return h
+    return hash(x)
+
+
 def _extract_concise_traceback():
     tb = traceback.extract_stack()
     if len(tb) == 0:
@@ -926,7 +955,9 @@ class RDD(object):
         return python_right_outer_join(self, other, numPartitions)
 
     # TODO: add option to control map-side combining
-    def partitionBy(self, numPartitions, partitionFunc=None):
+    # portable_hash is used as the default, because the builtin hash of None is
+    # different across machines.
+    def partitionBy(self, numPartitions, partitionFunc=portable_hash):
         """
         Return a copy of the RDD partitioned using the specified partitioner.
 
@@ -938,8 +969,6 @@ class RDD(object):
         if numPartitions is None:
             numPartitions = self.ctx.defaultParallelism
 
-        if partitionFunc is None:
-            partitionFunc = lambda x: 0 if x is None else hash(x)
         # Transferring O(n) objects to Java is too expensive.  Instead, we'll
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java.  Each object is a (splitNumber, [objects]) pair.

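For reference, the helper added by this patch can be exercised standalone. The sketch below reproduces `portable_hash` as it appears in the diff; note that, as the in-code TODO points out, determinism for strings on Python 3.3+ additionally requires a fixed PYTHONHASHSEED, since only None and tuples are special-cased here:

```python
def portable_hash(x):
    """Deterministic hash for None and tuples, mirroring CPython 2.7's
    tuple-hash mixing so the result does not depend on the machine."""
    if x is None:
        return 0
    if isinstance(x, tuple):
        h = 0x345678
        for i in x:
            h ^= portable_hash(i)
            h *= 1000003        # CPython's tuple-hash multiplier
            h &= 0xffffffff     # keep the result within 32 bits
        h ^= len(x)
        if h == -1:             # -1 is reserved as an error code in CPython
            h = -2
        return h
    return hash(x)              # fall back to the builtin hash otherwise

print(portable_hash(None))       # 0
print(portable_hash((None, 1)))  # 219750521
```

These two values match the doctests in the patch, so keys like `(None, 1)` now land in the same partition regardless of which worker computes the hash.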