Revert "[FLINK-4154] [core] Correction of murmur hash breaks backwards compatibility"
This reverts commit 81cf2296683a473db4061dd3bed0aeb249e05058. We had an incorrect implementation of Murmur hash in Flink 1.0. This was fixed in 641a0d4 for Flink 1.1. Then we thought that we needed to revert this in order to ensure backwards compatibility between Flink 1.0 and 1.1 savepoints (81cf22). It turns out that savepoint backwards compatibility is broken for other reasons, too. Therefore, we revert 81cf22 here, ending up with a correct implementation of Murmur hash again. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac7028e7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac7028e7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac7028e7 Branch: refs/heads/release-1.1 Commit: ac7028e7a341973e045dff6d02214e9546de5ed2 Parents: 12bf7c1 Author: Ufuk Celebi <u...@apache.org> Authored: Mon Aug 1 17:57:18 2016 +0200 Committer: Ufuk Celebi <u...@apache.org> Committed: Tue Aug 2 20:24:35 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/storm/tests/StormFieldsGroupingITCase.java | 6 +++--- flink-core/src/main/java/org/apache/flink/util/MathUtils.java | 5 +---- .../flink/streaming/api/scala/StreamingOperatorsITCase.scala | 3 ++- .../flink/test/streaming/api/StreamingOperatorsITCase.java | 3 ++- 4 files changed, 8 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java index a0121c7..b43b24d 100644 --- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java +++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java @@ -49,9 +49,9 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase { @Override protected void postSubmit() throws Exception { - compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" + - "3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n" + - "4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath); + compareResultsByLinesInMemory("4> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" + + "4> 1431162155\n" + "3> -1557280266\n" + "4> -1728529858\n" + "3> 1654374947\n" + + "3> -65105105\n" + "3> -518907128\n" + "4> -252332814\n", this.resultPath); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-core/src/main/java/org/apache/flink/util/MathUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java index e9d9df7..f40c83a 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java @@ -135,10 +135,7 @@ public final class MathUtils { code *= 0x1b873593; code = Integer.rotateLeft(code, 13); - // By the MurmurHash algorithm the following should be "code = code * 5 + 0xe6546b64;" (see FLINK-3623) - // but correcting the algorithm is a breaking change (see FLINK-4154). The effect of the resulting skew - // increases with increased parallelism (see FLINK-4154). 
- code *= 0xe6546b64; + code = code * 5 + 0xe6546b64; code ^= 4; code ^= code >>> 16; http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala index 5579797..d353468 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala @@ -74,7 +74,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase { override def run(ctx: SourceContext[(Int, Int)]): Unit = { 0 until numElements foreach { - i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i)) + // keys '1' and '2' hash to different buckets + i => ctx.collect((1 + (MathUtils.murmurHash(i)) % numKeys, i)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/ac7028e7/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java index 0c0db08..5d99de4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java @@ -223,7 +223,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase @Override public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { 
for (int i = 0; i < numElements; i++) { - Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i); + // keys '1' and '2' hash to different buckets + Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i); ctx.collect(result); } }