[FLINK-2576] Add outer join base operator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b222276 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b222276 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b222276 Branch: refs/heads/master Commit: 6b2222762fc38d84b31170216d6b6ae0c272af9b Parents: 0455857 Author: r-pogalz <r.pog...@campus.tu-berlin.de> Authored: Tue Jul 7 21:40:04 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Oct 9 16:19:21 2015 +0200 ---------------------------------------------------------------------- .../operators/base/OuterJoinOperatorBase.java | 314 +++++++++++++++++++ .../base/OuterJoinOperatorBaseTest.java | 150 +++++++++ .../runtime/operators/FullOuterJoinDriver.java | 2 +- .../runtime/operators/LeftOuterJoinDriver.java | 2 +- .../runtime/operators/RightOuterJoinDriver.java | 2 +- .../sort/AbstractMergeOuterJoinIterator.java | 3 +- .../sort/NonReusingMergeOuterJoinIterator.java | 1 + .../sort/ReusingMergeOuterJoinIterator.java | 1 + ...bstractSortMergeOuterJoinIteratorITCase.java | 2 +- ...ReusingSortMergeOuterJoinIteratorITCase.java | 2 +- ...ReusingSortMergeOuterJoinIteratorITCase.java | 2 +- 11 files changed, 473 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java new file mode 100644 index 0000000..7666d10 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.base; + +import org.apache.commons.collections.ResettableIterator; +import org.apache.commons.collections.iterators.ListIteratorWrapper; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.CopyingListCollector; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator; +import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; +import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, OUT, FT> { + + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private OuterJoinType outerJoinType; + + public OuterJoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(udf, operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public OuterJoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, + int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) { + super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name); + this.outerJoinType = outerJoinType; + } + + public void setOuterJoinType(OuterJoinType outerJoinType) { + this.outerJoinType = outerJoinType; + } + + public OuterJoinType getOuterJoinType() { + return outerJoinType; + } + + @Override + protected List<OUT> executeOnCollections(List<IN1> leftInput, List<IN2> rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception { + TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType(); + TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType(); + TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType(); + + TypeComparator<IN1> leftComparator = buildComparatorFor(0, executionConfig, leftInformation); + TypeComparator<IN2> rightComparator = buildComparatorFor(1, executionConfig, rightInformation); + + TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig); + TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig); + + OuterJoinListIterator<IN1, IN2> outerJoinIterator = + new OuterJoinListIterator<>(leftInput, leftSerializer, leftComparator, + rightInput, rightSerializer, rightComparator, outerJoinType); + + // -------------------------------------------------------------------- + // Run UDF + // -------------------------------------------------------------------- + FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject(); + + FunctionUtils.setFunctionRuntimeContext(function, runtimeContext); + FunctionUtils.openFunction(function, this.parameters); + + + List<OUT> result = new ArrayList<>(); + Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig)); + + while (outerJoinIterator.next()) { + IN1 left = outerJoinIterator.getLeft(); + IN2 right = outerJoinIterator.getRight(); + function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector); + } + + return result; + } + + @SuppressWarnings("unchecked") + private <T> TypeComparator<T> buildComparatorFor(int input, ExecutionConfig executionConfig, TypeInformation<T> typeInformation) { + TypeComparator<T> comparator; + if (typeInformation instanceof AtomicType) { + comparator = ((AtomicType<T>) typeInformation).createComparator(true, executionConfig); + } else if (typeInformation instanceof CompositeType) { + int[] keyPositions = getKeyColumns(input); + boolean[] orders = new boolean[keyPositions.length]; + Arrays.fill(orders, true); + + comparator = ((CompositeType<T>) typeInformation).createComparator(keyPositions, orders, 0, executionConfig); + } else { + throw new RuntimeException("Type information for input of type " + typeInformation.getClass() + .getCanonicalName() + " is not supported. Could not generate a comparator."); + } + return comparator; + } + + private static class OuterJoinListIterator<IN1, IN2> { + + + private static enum MatchStatus { + NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED, FIRST_EMPTY, SECOND_EMPTY + } + + private OuterJoinType outerJoinType; + + private ListKeyGroupedIterator<IN1> leftGroupedIterator; + private ListKeyGroupedIterator<IN2> rightGroupedIterator; + private Iterable<IN1> currLeftSubset; + private ResettableIterator currLeftIterator; + private Iterable<IN2> currRightSubset; + private ResettableIterator currRightIterator; + + private MatchStatus matchStatus; + private GenericPairComparator<IN1, IN2> pairComparator; + + private IN1 leftReturn; + private IN2 rightReturn; + + public OuterJoinListIterator(List<IN1> leftInput, TypeSerializer<IN1> leftSerializer, final TypeComparator<IN1> leftComparator, + List<IN2> rightInput, TypeSerializer<IN2> rightSerializer, final TypeComparator<IN2> rightComparator, + OuterJoinType outerJoinType) { + this.outerJoinType = outerJoinType; + pairComparator = new GenericPairComparator<>(leftComparator, rightComparator); + leftGroupedIterator = new ListKeyGroupedIterator<>(leftInput, leftSerializer, leftComparator); + rightGroupedIterator = new ListKeyGroupedIterator<>(rightInput, rightSerializer, rightComparator); + // ---------------------------------------------------------------- + // Sort + // ---------------------------------------------------------------- + Collections.sort(leftInput, new Comparator<IN1>() { + @Override + public int compare(IN1 o1, IN1 o2) { + return leftComparator.compare(o1, o2); + } + }); + + Collections.sort(rightInput, new Comparator<IN2>() { + @Override + public int compare(IN2 o1, IN2 o2) { + return rightComparator.compare(o1, o2); + } + }); + + } + + @SuppressWarnings("unchecked") + private boolean next() throws IOException { + boolean hasMoreElements; + if ((currLeftIterator == null || !currLeftIterator.hasNext()) && (currRightIterator == null || !currRightIterator.hasNext())) { + hasMoreElements = nextGroups(outerJoinType); + if (hasMoreElements) { + if (outerJoinType != OuterJoinType.LEFT) { + currLeftIterator = new ListIteratorWrapper(currLeftSubset.iterator()); + } + leftReturn = (IN1) currLeftIterator.next(); + if (outerJoinType != OuterJoinType.RIGHT) { + currRightIterator = new ListIteratorWrapper(currRightSubset.iterator()); + } + rightReturn = (IN2) currRightIterator.next(); + return true; + } else { + //no more elements + return false; + } + } else if (currLeftIterator.hasNext() && !currRightIterator.hasNext()) { + leftReturn = (IN1) currLeftIterator.next(); + currRightIterator.reset(); + rightReturn = (IN2) currRightIterator.next(); + return true; + } else { + rightReturn = (IN2) currRightIterator.next(); + return true; + } + } + + private boolean nextGroups(OuterJoinType outerJoinType) throws IOException { + if (outerJoinType == OuterJoinType.FULL) { + return nextGroups(); + } else if (outerJoinType == OuterJoinType.LEFT) { + boolean leftContainsElements = false; + while (!leftContainsElements && nextGroups()) { + currLeftIterator = new ListIteratorWrapper(currLeftSubset.iterator()); + if (currLeftIterator.next() != null) { + leftContainsElements = true; + } + currLeftIterator.reset(); + } + return leftContainsElements; + } else if (outerJoinType == OuterJoinType.RIGHT) { + boolean rightContainsElements = false; + while (!rightContainsElements && nextGroups()) { + currRightIterator = new ListIteratorWrapper(currRightSubset.iterator()); + if (currRightIterator.next() != null) { + rightContainsElements = true; + } + currRightIterator.reset(); + } + return rightContainsElements; + } else { + throw new IllegalArgumentException("Outer join of type '" + outerJoinType + "' not supported."); + } + } + + private boolean nextGroups() throws IOException { + boolean firstEmpty = true; + boolean secondEmpty = true; + + if (this.matchStatus != MatchStatus.FIRST_EMPTY) { + if (this.matchStatus == MatchStatus.FIRST_REMAINED) { + // comparator is still set correctly + firstEmpty = false; + } else { + if (this.leftGroupedIterator.nextKey()) { + this.pairComparator.setReference(leftGroupedIterator.getValues().getCurrent()); + firstEmpty = false; + } + } + } + + if (this.matchStatus != MatchStatus.SECOND_EMPTY) { + if (this.matchStatus == MatchStatus.SECOND_REMAINED) { + secondEmpty = false; + } else { + if (rightGroupedIterator.nextKey()) { + secondEmpty = false; + } + } + } + + if (firstEmpty && secondEmpty) { + // both inputs are empty + return false; + } else if (firstEmpty && !secondEmpty) { + // input1 is empty, input2 not + this.currLeftSubset = Collections.singleton(null); + this.currRightSubset = this.rightGroupedIterator.getValues(); + this.matchStatus = MatchStatus.FIRST_EMPTY; + return true; + } else if (!firstEmpty && secondEmpty) { + // input1 is not empty, input 2 is empty + this.currLeftSubset = this.leftGroupedIterator.getValues(); + this.currRightSubset = Collections.singleton(null); + this.matchStatus = MatchStatus.SECOND_EMPTY; + return true; + } else { + // both inputs are not empty + final int comp = this.pairComparator.compareToReference(rightGroupedIterator.getValues().getCurrent()); + + if (0 == comp) { + // keys match + this.currLeftSubset = this.leftGroupedIterator.getValues(); + this.currRightSubset = this.rightGroupedIterator.getValues(); + this.matchStatus = MatchStatus.NONE_REMAINED; + } else if (0 < comp) { + // key1 goes first + this.currLeftSubset = this.leftGroupedIterator.getValues(); + this.currRightSubset = Collections.singleton(null); + this.matchStatus = MatchStatus.SECOND_REMAINED; + } else { + // key 2 goes first + this.currLeftSubset = Collections.singleton(null); + this.currRightSubset = this.rightGroupedIterator.getValues(); + this.matchStatus = MatchStatus.FIRST_REMAINED; + } + return true; + } + } + + private IN1 getLeft() { + return leftReturn; + } + + private IN2 getRight() { + return rightReturn; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java new file mode 100644 index 0000000..679e4ce --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.operators.base; + + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; + +import com.google.common.base.Joiner; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.operators.BinaryOperatorInformation; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +@SuppressWarnings("serial") +public class OuterJoinOperatorBaseTest implements Serializable { + + private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() { + @Override + public void join(String first, String second, Collector<String> out) throws Exception { + out.collect(Joiner.on(',').join(String.valueOf(first), String.valueOf(second))); + } + }; + + @SuppressWarnings({"rawtypes", "unchecked"}) + private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator = + new OuterJoinOperatorBase(joiner, + new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null); + + @Test + public void testFullOuterJoinWithoutMatchingPartners() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "bar", "foobar"); + final List<String> rightInput = Arrays.asList("oof", "rab", "raboof"); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL); + List<String> expected = Arrays.asList("bar,null", "foo,null", "foobar,null", "null,oof", "null,rab", "null,raboof"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test + public void testFullOuterJoinWithFullMatchingKeys() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "bar", "foobar"); + final List<String> rightInput = Arrays.asList("bar", "foobar", "foo"); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL); + List<String> expected = Arrays.asList("bar,bar", "foo,foo", "foobar,foobar"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test + public void testFullOuterJoinWithEmptyLeftInput() throws Exception { + final List<String> leftInput = Arrays.asList(); + final List<String> rightInput = Arrays.asList("foo", "bar", "foobar"); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL); + List<String> expected = Arrays.asList("null,bar", "null,foo", "null,foobar"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test + public void testFullOuterJoinWithEmptyRightInput() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "bar", "foobar"); + final List<String> rightInput = Arrays.asList(); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL); + List<String> expected = Arrays.asList("bar,null", "foo,null", "foobar,null"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test + public void testFullOuterJoinWithPartialMatchingKeys() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "bar", "foobar"); + final List<String> rightInput = Arrays.asList("bar", "foo", "barfoo"); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL); + List<String> expected = Arrays.asList("bar,bar", "null,barfoo", "foo,foo", "foobar,null"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test + public void testFullOuterJoinBuildingCorrectCrossProducts() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar"); + final List<String> rightInput = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo"); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL); + List<String> expected = Arrays.asList("bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", + "null,barfoo", "null,barfoo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", + "foobar,null", "foobar,null"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test + public void testLeftOuterJoin() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar"); + final List<String> rightInput = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo"); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.LEFT); + List<String> expected = Arrays.asList("bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", + "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foobar,null", "foobar,null"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test + public void testRightOuterJoin() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar"); + final List<String> rightInput = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo"); + baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.RIGHT); + List<String> expected = Arrays.asList("bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", + "null,barfoo", "null,barfoo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo"); + testOuterJoin(leftInput, rightInput, expected); + } + + @Test(expected = IllegalArgumentException.class) + public void testThatExceptionIsThrownForOuterJoinTypeNull() throws Exception { + final List<String> leftInput = Arrays.asList("foo", "bar", "foobar"); + final List<String> rightInput = Arrays.asList("bar", "foobar", "foo"); + + baseOperator.setOuterJoinType(null); + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + } + + private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception { + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.disableObjectReuse(); + List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + executionConfig.enableObjectReuse(); + List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig); + + assertEquals(expected, resultSafe); + assertEquals(expected, resultRegular); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java index 30786aa..d942b72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java index 3cccab8..ae05d1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java index c93637e..6fc8abd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java index d109cf8..74faeb3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators.sort; import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -37,8 +38,6 @@ import java.util.Iterator; */ public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> { - public enum OuterJoinType {LEFT, RIGHT, FULL} - private final OuterJoinType outerJoinType; private boolean initialized = false; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java index db47f16..f2faa2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.sort; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java index 8382b86..33d72d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.sort; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java index 0c0e836..7b27fa9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.sort; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.GenericPairComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; @@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; import org.apache.flink.runtime.operators.testutils.CollectionIterator; import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; import org.apache.flink.runtime.operators.testutils.DummyInvokable; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java index 7272595..e930317 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java @@ -18,13 +18,13 @@ package org.apache.flink.runtime.operators.sort; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; import org.apache.flink.util.MutableObjectIterator; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java index 2cec393..cca1b76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java @@ -18,13 +18,13 @@ package org.apache.flink.runtime.operators.sort; +import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; import org.apache.flink.util.MutableObjectIterator; import org.junit.Test;