[FLINK-2576] Add outer join base operator.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b222276
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b222276
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b222276

Branch: refs/heads/master
Commit: 6b2222762fc38d84b31170216d6b6ae0c272af9b
Parents: 0455857
Author: r-pogalz <r.pog...@campus.tu-berlin.de>
Authored: Tue Jul 7 21:40:04 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Oct 9 16:19:21 2015 +0200

----------------------------------------------------------------------
 .../operators/base/OuterJoinOperatorBase.java   | 314 +++++++++++++++++++
 .../base/OuterJoinOperatorBaseTest.java         | 150 +++++++++
 .../runtime/operators/FullOuterJoinDriver.java  |   2 +-
 .../runtime/operators/LeftOuterJoinDriver.java  |   2 +-
 .../runtime/operators/RightOuterJoinDriver.java |   2 +-
 .../sort/AbstractMergeOuterJoinIterator.java    |   3 +-
 .../sort/NonReusingMergeOuterJoinIterator.java  |   1 +
 .../sort/ReusingMergeOuterJoinIterator.java     |   1 +
 ...bstractSortMergeOuterJoinIteratorITCase.java |   2 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |   2 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |   2 +-
 11 files changed, 473 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
new file mode 100644
index 0000000..7666d10
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends 
FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, 
OUT, FT> {
+
+       public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+       private OuterJoinType outerJoinType;
+
+       public OuterJoinOperatorBase(UserCodeWrapper<FT> udf, 
BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+                       int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+               super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+               this.outerJoinType = outerJoinType;
+       }
+
+       public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, 
IN2, OUT> operatorInfo,
+                       int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+               super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+               this.outerJoinType = outerJoinType;
+       }
+
+       public OuterJoinOperatorBase(Class<? extends FT> udf, 
BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+                       int[] keyPositions1, int[] keyPositions2, String name, 
OuterJoinType outerJoinType) {
+               super(new UserCodeClassWrapper<FT>(udf), operatorInfo, 
keyPositions1, keyPositions2, name);
+               this.outerJoinType = outerJoinType;
+       }
+
+       public void setOuterJoinType(OuterJoinType outerJoinType) {
+               this.outerJoinType = outerJoinType;
+       }
+
+       public OuterJoinType getOuterJoinType() {
+               return outerJoinType;
+       }
+
+       @Override
+       protected List<OUT> executeOnCollections(List<IN1> leftInput, List<IN2> 
rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) 
throws Exception {
+               TypeInformation<IN1> leftInformation = 
getOperatorInfo().getFirstInputType();
+               TypeInformation<IN2> rightInformation = 
getOperatorInfo().getSecondInputType();
+               TypeInformation<OUT> outInformation = 
getOperatorInfo().getOutputType();
+
+               TypeComparator<IN1> leftComparator = buildComparatorFor(0, 
executionConfig, leftInformation);
+               TypeComparator<IN2> rightComparator = buildComparatorFor(1, 
executionConfig, rightInformation);
+
+               TypeSerializer<IN1> leftSerializer = 
leftInformation.createSerializer(executionConfig);
+               TypeSerializer<IN2> rightSerializer = 
rightInformation.createSerializer(executionConfig);
+
+               OuterJoinListIterator<IN1, IN2> outerJoinIterator =
+                               new OuterJoinListIterator<>(leftInput, 
leftSerializer, leftComparator,
+                                               rightInput, rightSerializer, 
rightComparator, outerJoinType);
+
+               // 
--------------------------------------------------------------------
+               // Run UDF
+               // 
--------------------------------------------------------------------
+               FlatJoinFunction<IN1, IN2, OUT> function = 
userFunction.getUserCodeObject();
+
+               FunctionUtils.setFunctionRuntimeContext(function, 
runtimeContext);
+               FunctionUtils.openFunction(function, this.parameters);
+
+
+               List<OUT> result = new ArrayList<>();
+               Collector<OUT> collector = new CopyingListCollector<>(result, 
outInformation.createSerializer(executionConfig));
+
+               while (outerJoinIterator.next()) {
+                       IN1 left = outerJoinIterator.getLeft();
+                       IN2 right = outerJoinIterator.getRight();
+                       function.join(left == null ? null : 
leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), 
collector);
+               }
+
+               return result;
+       }
+
+       @SuppressWarnings("unchecked")
+       private <T> TypeComparator<T> buildComparatorFor(int input, 
ExecutionConfig executionConfig, TypeInformation<T> typeInformation) {
+               TypeComparator<T> comparator;
+               if (typeInformation instanceof AtomicType) {
+                       comparator = ((AtomicType<T>) 
typeInformation).createComparator(true, executionConfig);
+               } else if (typeInformation instanceof CompositeType) {
+                       int[] keyPositions = getKeyColumns(input);
+                       boolean[] orders = new boolean[keyPositions.length];
+                       Arrays.fill(orders, true);
+
+                       comparator = ((CompositeType<T>) 
typeInformation).createComparator(keyPositions, orders, 0, executionConfig);
+               } else {
+                       throw new RuntimeException("Type information for input 
of type " + typeInformation.getClass()
+                                       .getCanonicalName() + " is not 
supported. Could not generate a comparator.");
+               }
+               return comparator;
+       }
+
+       private static class OuterJoinListIterator<IN1, IN2> {
+
+
+               private static enum MatchStatus {
+                       NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED, 
FIRST_EMPTY, SECOND_EMPTY
+               }
+
+               private OuterJoinType outerJoinType;
+
+               private ListKeyGroupedIterator<IN1> leftGroupedIterator;
+               private ListKeyGroupedIterator<IN2> rightGroupedIterator;
+               private Iterable<IN1> currLeftSubset;
+               private ResettableIterator currLeftIterator;
+               private Iterable<IN2> currRightSubset;
+               private ResettableIterator currRightIterator;
+
+               private MatchStatus matchStatus;
+               private GenericPairComparator<IN1, IN2> pairComparator;
+
+               private IN1 leftReturn;
+               private IN2 rightReturn;
+
+               public OuterJoinListIterator(List<IN1> leftInput, 
TypeSerializer<IN1> leftSerializer, final TypeComparator<IN1> leftComparator,
+                               List<IN2> rightInput, TypeSerializer<IN2> 
rightSerializer, final TypeComparator<IN2> rightComparator,
+                               OuterJoinType outerJoinType) {
+                       this.outerJoinType = outerJoinType;
+                       pairComparator = new 
GenericPairComparator<>(leftComparator, rightComparator);
+                       leftGroupedIterator = new 
ListKeyGroupedIterator<>(leftInput, leftSerializer, leftComparator);
+                       rightGroupedIterator = new 
ListKeyGroupedIterator<>(rightInput, rightSerializer, rightComparator);
+                       // 
----------------------------------------------------------------
+                       // Sort
+                       // 
----------------------------------------------------------------
+                       Collections.sort(leftInput, new Comparator<IN1>() {
+                               @Override
+                               public int compare(IN1 o1, IN1 o2) {
+                                       return leftComparator.compare(o1, o2);
+                               }
+                       });
+
+                       Collections.sort(rightInput, new Comparator<IN2>() {
+                               @Override
+                               public int compare(IN2 o1, IN2 o2) {
+                                       return rightComparator.compare(o1, o2);
+                               }
+                       });
+
+               }
+
+               @SuppressWarnings("unchecked")
+               private boolean next() throws IOException {
+                       boolean hasMoreElements;
+                       if ((currLeftIterator == null || 
!currLeftIterator.hasNext()) && (currRightIterator == null || 
!currRightIterator.hasNext())) {
+                               hasMoreElements = nextGroups(outerJoinType);
+                               if (hasMoreElements) {
+                                       if (outerJoinType != 
OuterJoinType.LEFT) {
+                                               currLeftIterator = new 
ListIteratorWrapper(currLeftSubset.iterator());
+                                       }
+                                       leftReturn = (IN1) 
currLeftIterator.next();
+                                       if (outerJoinType != 
OuterJoinType.RIGHT) {
+                                               currRightIterator = new 
ListIteratorWrapper(currRightSubset.iterator());
+                                       }
+                                       rightReturn = (IN2) 
currRightIterator.next();
+                                       return true;
+                               } else {
+                                       //no more elements
+                                       return false;
+                               }
+                       } else if (currLeftIterator.hasNext() && 
!currRightIterator.hasNext()) {
+                               leftReturn = (IN1) currLeftIterator.next();
+                               currRightIterator.reset();
+                               rightReturn = (IN2) currRightIterator.next();
+                               return true;
+                       } else {
+                               rightReturn = (IN2) currRightIterator.next();
+                               return true;
+                       }
+               }
+
+               private boolean nextGroups(OuterJoinType outerJoinType) throws 
IOException {
+                       if (outerJoinType == OuterJoinType.FULL) {
+                               return nextGroups();
+                       } else if (outerJoinType == OuterJoinType.LEFT) {
+                               boolean leftContainsElements = false;
+                               while (!leftContainsElements && nextGroups()) {
+                                       currLeftIterator = new 
ListIteratorWrapper(currLeftSubset.iterator());
+                                       if (currLeftIterator.next() != null) {
+                                               leftContainsElements = true;
+                                       }
+                                       currLeftIterator.reset();
+                               }
+                               return leftContainsElements;
+                       } else if (outerJoinType == OuterJoinType.RIGHT) {
+                               boolean rightContainsElements = false;
+                               while (!rightContainsElements && nextGroups()) {
+                                       currRightIterator = new 
ListIteratorWrapper(currRightSubset.iterator());
+                                       if (currRightIterator.next() != null) {
+                                               rightContainsElements = true;
+                                       }
+                                       currRightIterator.reset();
+                               }
+                               return rightContainsElements;
+                       } else {
+                               throw new IllegalArgumentException("Outer join 
of type '" + outerJoinType + "' not supported.");
+                       }
+               }
+
+               private boolean nextGroups() throws IOException {
+                       boolean firstEmpty = true;
+                       boolean secondEmpty = true;
+
+                       if (this.matchStatus != MatchStatus.FIRST_EMPTY) {
+                               if (this.matchStatus == 
MatchStatus.FIRST_REMAINED) {
+                                       // comparator is still set correctly
+                                       firstEmpty = false;
+                               } else {
+                                       if (this.leftGroupedIterator.nextKey()) 
{
+                                               
this.pairComparator.setReference(leftGroupedIterator.getValues().getCurrent());
+                                               firstEmpty = false;
+                                       }
+                               }
+                       }
+
+                       if (this.matchStatus != MatchStatus.SECOND_EMPTY) {
+                               if (this.matchStatus == 
MatchStatus.SECOND_REMAINED) {
+                                       secondEmpty = false;
+                               } else {
+                                       if (rightGroupedIterator.nextKey()) {
+                                               secondEmpty = false;
+                                       }
+                               }
+                       }
+
+                       if (firstEmpty && secondEmpty) {
+                               // both inputs are empty
+                               return false;
+                       } else if (firstEmpty && !secondEmpty) {
+                               // input1 is empty, input2 not
+                               this.currLeftSubset = 
Collections.singleton(null);
+                               this.currRightSubset = 
this.rightGroupedIterator.getValues();
+                               this.matchStatus = MatchStatus.FIRST_EMPTY;
+                               return true;
+                       } else if (!firstEmpty && secondEmpty) {
+                               // input1 is not empty, input 2 is empty
+                               this.currLeftSubset = 
this.leftGroupedIterator.getValues();
+                               this.currRightSubset = 
Collections.singleton(null);
+                               this.matchStatus = MatchStatus.SECOND_EMPTY;
+                               return true;
+                       } else {
+                               // both inputs are not empty
+                               final int comp = 
this.pairComparator.compareToReference(rightGroupedIterator.getValues().getCurrent());
+
+                               if (0 == comp) {
+                                       // keys match
+                                       this.currLeftSubset = 
this.leftGroupedIterator.getValues();
+                                       this.currRightSubset = 
this.rightGroupedIterator.getValues();
+                                       this.matchStatus = 
MatchStatus.NONE_REMAINED;
+                               } else if (0 < comp) {
+                                       // key1 goes first
+                                       this.currLeftSubset = 
this.leftGroupedIterator.getValues();
+                                       this.currRightSubset = 
Collections.singleton(null);
+                                       this.matchStatus = 
MatchStatus.SECOND_REMAINED;
+                               } else {
+                                       // key 2 goes first
+                                       this.currLeftSubset = 
Collections.singleton(null);
+                                       this.currRightSubset = 
this.rightGroupedIterator.getValues();
+                                       this.matchStatus = 
MatchStatus.FIRST_REMAINED;
+                               }
+                               return true;
+                       }
+               }
+
+               private IN1 getLeft() {
+                       return leftReturn;
+               }
+
+               private IN2 getRight() {
+                       return rightReturn;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
new file mode 100644
index 0000000..679e4ce
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class OuterJoinOperatorBaseTest implements Serializable {
+
+       private final FlatJoinFunction<String, String, String> joiner = new 
FlatJoinFunction<String, String, String>() {
+               @Override
+               public void join(String first, String second, Collector<String> 
out) throws Exception {
+                       out.collect(Joiner.on(',').join(String.valueOf(first), 
String.valueOf(second)));
+               }
+       };
+
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       private final OuterJoinOperatorBase<String, String, String, 
FlatJoinFunction<String, String, String>> baseOperator =
+                       new OuterJoinOperatorBase(joiner,
+                                       new 
BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO,
+                                                       
BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+
+       @Test
+       public void testFullOuterJoinWithoutMatchingPartners() throws Exception 
{
+               final List<String> leftInput = Arrays.asList("foo", "bar", 
"foobar");
+               final List<String> rightInput = Arrays.asList("oof", "rab", 
"raboof");
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+               List<String> expected = Arrays.asList("bar,null", "foo,null", 
"foobar,null", "null,oof", "null,rab", "null,raboof");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test
+       public void testFullOuterJoinWithFullMatchingKeys() throws Exception {
+               final List<String> leftInput = Arrays.asList("foo", "bar", 
"foobar");
+               final List<String> rightInput = Arrays.asList("bar", "foobar", 
"foo");
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+               List<String> expected = Arrays.asList("bar,bar", "foo,foo", 
"foobar,foobar");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test
+       public void testFullOuterJoinWithEmptyLeftInput() throws Exception {
+               final List<String> leftInput = Arrays.asList();
+               final List<String> rightInput = Arrays.asList("foo", "bar", 
"foobar");
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+               List<String> expected = Arrays.asList("null,bar", "null,foo", 
"null,foobar");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test
+       public void testFullOuterJoinWithEmptyRightInput() throws Exception {
+               final List<String> leftInput = Arrays.asList("foo", "bar", 
"foobar");
+               final List<String> rightInput = Arrays.asList();
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+               List<String> expected = Arrays.asList("bar,null", "foo,null", 
"foobar,null");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test
+       public void testFullOuterJoinWithPartialMatchingKeys() throws Exception 
{
+               final List<String> leftInput = Arrays.asList("foo", "bar", 
"foobar");
+               final List<String> rightInput = Arrays.asList("bar", "foo", 
"barfoo");
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+               List<String> expected = Arrays.asList("bar,bar", "null,barfoo", 
"foo,foo", "foobar,null");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test
+       public void testFullOuterJoinBuildingCorrectCrossProducts() throws 
Exception {
+               final List<String> leftInput = Arrays.asList("foo", "foo", 
"foo", "bar","bar", "foobar", "foobar");
+               final List<String> rightInput = Arrays.asList("foo", "foo", 
"bar", "bar", "bar", "barfoo", "barfoo");
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+               List<String> expected = Arrays.asList("bar,bar", "bar,bar", 
"bar,bar", "bar,bar", "bar,bar", "bar,bar",
+                               "null,barfoo", "null,barfoo", "foo,foo", 
"foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo",
+                               "foobar,null", "foobar,null");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test
+       public void testLeftOuterJoin() throws Exception {
+               final List<String> leftInput = Arrays.asList("foo", "foo", 
"foo", "bar","bar", "foobar", "foobar");
+               final List<String> rightInput = Arrays.asList("foo", "foo", 
"bar", "bar", "bar", "barfoo", "barfoo");
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.LEFT);
+               List<String> expected = Arrays.asList("bar,bar", "bar,bar", 
"bar,bar", "bar,bar", "bar,bar", "bar,bar",
+                               "foo,foo", "foo,foo", "foo,foo", "foo,foo", 
"foo,foo", "foo,foo", "foobar,null", "foobar,null");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test
+       public void testRightOuterJoin() throws Exception {
+               final List<String> leftInput = Arrays.asList("foo", "foo", 
"foo", "bar","bar", "foobar", "foobar");
+               final List<String> rightInput = Arrays.asList("foo", "foo", 
"bar", "bar", "bar", "barfoo", "barfoo");
+               
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.RIGHT);
+               List<String> expected = Arrays.asList("bar,bar", "bar,bar", 
"bar,bar", "bar,bar", "bar,bar", "bar,bar",
+                               "null,barfoo", "null,barfoo", "foo,foo", 
"foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo");
+               testOuterJoin(leftInput, rightInput, expected);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testThatExceptionIsThrownForOuterJoinTypeNull() throws 
Exception {
+               final List<String> leftInput = Arrays.asList("foo", "bar", 
"foobar");
+               final List<String> rightInput = Arrays.asList("bar", "foobar", 
"foo");
+
+               baseOperator.setOuterJoinType(null);
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.disableObjectReuse();
+               baseOperator.executeOnCollections(leftInput, rightInput, null, 
executionConfig);
+       }
+
+       private void testOuterJoin(List<String> leftInput, List<String> 
rightInput, List<String> expected) throws Exception {
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.disableObjectReuse();
+               List<String> resultSafe = 
baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+               executionConfig.enableObjectReuse();
+               List<String> resultRegular = 
baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+
+               assertEquals(expected, resultSafe);
+               assertEquals(expected, resultRegular);
+       }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index 30786aa..d942b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
index 3cccab8..ae05d1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
index c93637e..6fc8abd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
index d109cf8..74faeb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.sort;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -37,8 +38,6 @@ import java.util.Iterator;
  */
 public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
 
-       public enum OuterJoinType {LEFT, RIGHT, FULL}
-
        private final OuterJoinType outerJoinType;
 
        private boolean initialized = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
index db47f16..f2faa2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
index 8382b86..33d72d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 0c0e836..7b27fa9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.sort;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.testutils.CollectionIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
index 7272595..e930317 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
index 2cec393..cca1b76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
 

Reply via email to