Revert "[FLINK-4154] [core] Correction of murmur hash breaks backwards 
compatibility"

This reverts commit 81cf2296683a473db4061dd3bed0aeb249e05058.

We had an incorrect implementation of Murmur hash in Flink 1.0. This
was fixed in 641a0d4 for Flink 1.1. Then we thought that we needed to
revert this in order to ensure backwards compatibility between Flink
1.0 and 1.1 savepoints (81cf22). It turns out that savepoint backwards
compatibility is broken for other reasons, too. Therefore, we revert
81cf22 here, ending up with a correct implementation of Murmur hash
again.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac7028e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac7028e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac7028e7

Branch: refs/heads/release-1.1
Commit: ac7028e7a341973e045dff6d02214e9546de5ed2
Parents: 12bf7c1
Author: Ufuk Celebi <u...@apache.org>
Authored: Mon Aug 1 17:57:18 2016 +0200
Committer: Ufuk Celebi <u...@apache.org>
Committed: Tue Aug 2 20:24:35 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/storm/tests/StormFieldsGroupingITCase.java    | 6 +++---
 flink-core/src/main/java/org/apache/flink/util/MathUtils.java  | 5 +----
 .../flink/streaming/api/scala/StreamingOperatorsITCase.scala   | 3 ++-
 .../flink/test/streaming/api/StreamingOperatorsITCase.java     | 3 ++-
 4 files changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index a0121c7..b43b24d 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -49,9 +49,9 @@ public class StormFieldsGroupingITCase extends 
StreamingProgramTestBase {
 
        @Override
        protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory("3> -1155484576\n" + "3> 
1033096058\n" + "3> -1930858313\n" +
-                       "3> 1431162155\n" + "4> -1557280266\n" + "4> 
-1728529858\n" + "4> 1654374947\n" +
-                       "4> -65105105\n" + "4> -518907128\n" + "4> 
-252332814\n", this.resultPath);
+               compareResultsByLinesInMemory("4> -1155484576\n" + "3> 
1033096058\n" + "3> -1930858313\n" +
+                       "4> 1431162155\n" + "3> -1557280266\n" + "4> 
-1728529858\n" + "3> 1654374947\n" +
+                       "3> -65105105\n" + "3> -518907128\n" + "4> 
-252332814\n", this.resultPath);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index e9d9df7..f40c83a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -135,10 +135,7 @@ public final class MathUtils {
                code *= 0x1b873593;
 
                code = Integer.rotateLeft(code, 13);
-               // By the MurmurHash algorithm the following should be "code = 
code * 5 + 0xe6546b64;" (see FLINK-3623)
-               // but correcting the algorithm is a breaking change (see 
FLINK-4154). The effect of the resulting skew
-               // increases with increased parallelism (see FLINK-4154).
-               code *= 0xe6546b64;
+               code = code * 5 + 0xe6546b64;
 
                code ^= 4;
                code ^= code >>> 16;

http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index 5579797..d353468 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -74,7 +74,8 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
 
       override def run(ctx: SourceContext[(Int, Int)]): Unit = {
         0 until numElements foreach {
-          i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i))
+          // keys '1' and '2' hash to different buckets
+          i => ctx.collect((1 + (MathUtils.murmurHash(i)) % numKeys, i))
         }
       }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 0c0db08..5d99de4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -223,7 +223,8 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
                @Override
                public void run(SourceContext<Tuple2<Integer, Integer>> ctx) 
throws Exception {
                        for (int i = 0; i < numElements; i++) {
-                               Tuple2<Integer, Integer> result = new 
Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
+                               // keys '1' and '2' hash to different buckets
+                               Tuple2<Integer, Integer> result = new 
Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
                                ctx.collect(result);
                        }
                }

Reply via email to