[FLINK-2689] [runtime] Fix reuse of null object for solution set Joins and CoGroups.

This closes #1136


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e5cdfb3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e5cdfb3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e5cdfb3

Branch: refs/heads/release-0.10.0-milestone-1
Commit: 0e5cdfb30cbdd4fbfded4644706fa9b85a956451
Parents: 7a11a90
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed Sep 16 16:56:06 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Sep 17 11:56:09 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/CoGroupWithSolutionSetFirstDriver.java   | 6 +++---
 .../runtime/operators/CoGroupWithSolutionSetSecondDriver.java  | 6 +++---
 .../runtime/operators/JoinWithSolutionSetFirstDriver.java      | 4 ++--
 .../runtime/operators/JoinWithSolutionSetSecondDriver.java     | 4 ++--
 4 files changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index b27b6b9..97d6e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -175,9 +175,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
                                while (this.running && probeSideInput.nextKey()) {
                                        IT2 current = probeSideInput.getCurrent();
 
-                                       buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-                                       if (buildSideRecord != null) {
-                                               siIter.set(buildSideRecord);
+                                       IT1 matchedRecord = prober.getMatchFor(current, buildSideRecord);
+                                       if (matchedRecord != null) {
+                                               siIter.set(matchedRecord);
                                                coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
                                        } else {
                                                coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index ba0f8f9..9e8a81c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -174,9 +174,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
                                while (this.running && probeSideInput.nextKey()) {
                                        IT1 current = probeSideInput.getCurrent();
 
-                                       buildSideRecord = prober.getMatchFor(current, buildSideRecord);
-                                       if (buildSideRecord != null) {
-                                               siIter.set(buildSideRecord);
+                                       IT2 matchedRecord = prober.getMatchFor(current, buildSideRecord);
+                                       if (matchedRecord != null) {
+                                               siIter.set(matchedRecord);
                                                coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
                                        } else {
                                                coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index a1c8a4a..fe926cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -166,8 +166,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
                                IT1 buildSideRecord = this.solutionSideRecord;
 
                                while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-                                       buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-                                       joinFunction.join(buildSideRecord, probeSideRecord, collector);
+                                       IT1 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+                                       joinFunction.join(matchedRecord, probeSideRecord, collector);
                                }
                        } else if (objectMap != null) {
                                final JoinHashMap<IT1> hashTable = this.objectMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/0e5cdfb3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 32a75dc..20079fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -168,8 +168,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
                                IT2 buildSideRecord = this.solutionSideRecord;
 
                                while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-                                       buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
-                                       joinFunction.join(probeSideRecord, buildSideRecord, collector);
+                                       IT2 matchedRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+                                       joinFunction.join(probeSideRecord, matchedRecord, collector);
                                }
                        } else if (objectMap != null) {
                                final JoinHashMap<IT2> hashTable = this.objectMap;

Reply via email to