[FLINK-2689] [runtime] Fix reuse of null object for solution set Joins and CoGroups.
This closes #1136

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e5cdfb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e5cdfb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e5cdfb3

Branch: refs/heads/release-0.10.0-milestone-1
Commit: 0e5cdfb30cbdd4fbfded4644706fa9b85a956451
Parents: 7a11a90
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed Sep 16 16:56:06 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Sep 17 11:56:09 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/CoGroupWithSolutionSetFirstDriver.java | 6 +++---
 .../runtime/operators/CoGroupWithSolutionSetSecondDriver.java | 6 +++---
 .../runtime/operators/JoinWithSolutionSetFirstDriver.java | 4 ++--
 .../runtime/operators/JoinWithSolutionSetSecondDriver.java | 4 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index b27b6b9..97d6e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -175,9 +175,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
             while (this.running && probeSideInput.nextKey()) {
                 IT2 current = probeSideInput.getCurrent();
-                buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-                if (buildSideRecord != null) {
-                    siIter.set(buildSideRecord);
+                IT1 matchedRecord = prober.getMatchFor(current, buildSideRecord);
+                if (matchedRecord != null) {
+                    siIter.set(matchedRecord);
                     coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
                 } else {
                     coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index ba0f8f9..9e8a81c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -174,9 +174,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
             while (this.running && probeSideInput.nextKey()) {
                 IT1 current = probeSideInput.getCurrent();
-                buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-                if (buildSideRecord != null) {
-                    siIter.set(buildSideRecord);
+                IT2 matchedRecord = prober.getMatchFor(current, buildSideRecord);
+                if (matchedRecord != null) {
+                    siIter.set(matchedRecord);
                     coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
                 } else {
                     coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index a1c8a4a..fe926cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -166,8 +166,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
             IT1 buildSideRecord = this.solutionSideRecord;
             while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-                buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-                joinFunction.join(buildSideRecord, probeSideRecord, collector);
+                IT1 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+                joinFunction.join(matchedRecord, probeSideRecord, collector);
             }
         } else if (objectMap != null) {
             final JoinHashMap<IT1> hashTable = this.objectMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 32a75dc..20079fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -168,8 +168,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
             IT2 buildSideRecord = this.solutionSideRecord;
             while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-                buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-                joinFunction.join(probeSideRecord, buildSideRecord, collector);
+                IT2 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+                joinFunction.join(probeSideRecord, matchedRecord, collector);
             }
         } else if (objectMap != null) {
             final JoinHashMap<IT2> hashTable = this.objectMap;