[FLINK-2105] Implement Sort-Merge Outer Join algorithm. This closes #907
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/941ac6df Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/941ac6df Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/941ac6df Branch: refs/heads/master Commit: 941ac6dfd446d8e97e2fe2f589164978602adf94 Parents: df9f481 Author: r-pogalz <r.pog...@campus.tu-berlin.de> Authored: Mon Aug 3 12:59:48 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Aug 4 23:51:27 2015 +0200 ---------------------------------------------------------------------- .../operators/sort/AbstractMergeIterator.java | 58 +-- .../sort/AbstractMergeOuterJoinIterator.java | 189 ++++++++ .../sort/NonReusingMergeOuterJoinIterator.java | 60 +++ .../sort/ReusingMergeOuterJoinIterator.java | 63 +++ ...bstractSortMergeOuterJoinIteratorITCase.java | 462 +++++++++++++++++++ ...ReusingSortMergeInnerJoinIteratorITCase.java | 4 +- ...ReusingSortMergeOuterJoinIteratorITCase.java | 82 ++++ ...ReusingSortMergeInnerJoinIteratorITCase.java | 4 +- ...ReusingSortMergeOuterJoinIteratorITCase.java | 82 ++++ .../runtime/operators/testutils/Match.java | 2 +- .../testutils/MatchRemovingJoiner.java | 58 +++ .../testutils/MatchRemovingMatcher.java | 58 --- 12 files changed, 1030 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java index 9a61c14..c01afc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeIterator.java @@ -115,20 +115,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat } /** - * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come - * from different inputs. The output of the <code>match()</code> method is forwarded. + * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come + * from different inputs. The output of the <code>join()</code> method is forwarded. * <p> * This method first zig-zags between the two sorted inputs in order to find a common - * key, and then calls the match stub with the cross product of the values. + * key, and then calls the join stub with the cross product of the values. * * @throws Exception Forwards all exceptions from the user code and the I/O system. * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) */ @Override - public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + public abstract boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception; - protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception { + protected void crossMatchingGroup(Iterator<T1> values1, Iterator<T2> values2, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception { final T1 firstV1 = values1.next(); final T2 firstV2 = values2.next(); @@ -143,23 +143,23 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat if (v2HasNext) { // both sides contain more than one value // TODO: Decide which side to spill and which to block! 
- crossMwithNValues(firstV1, values1, firstV2, values2, matchFunction, collector); + crossMwithNValues(firstV1, values1, firstV2, values2, joinFunction, collector); } else { - crossSecond1withNValues(firstV2, firstV1, values1, matchFunction, collector); + crossSecond1withNValues(firstV2, firstV1, values1, joinFunction, collector); } } else { if (v2HasNext) { - crossFirst1withNValues(firstV1, firstV2, values2, matchFunction, collector); + crossFirst1withNValues(firstV1, firstV2, values2, joinFunction, collector); } else { // both sides contain only one value - matchFunction.join(firstV1, firstV2, collector); + joinFunction.join(firstV1, firstV2, collector); } } } /** * Crosses a single value from the first input with N values, all sharing a common key. - * Effectively realizes a <i>1:N</i> match (join). + * Effectively realizes a <i>1:N</i> join. * * @param val1 The value form the <i>1</i> side. * @param firstValN The first of the values from the <i>N</i> side. @@ -167,21 +167,21 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat * @throws Exception Forwards all exceptions thrown by the stub. 
*/ private void crossFirst1withNValues(final T1 val1, final T2 firstValN, - final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) + final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception { T1 copy1 = createCopy(serializer1, val1, this.copy1); - matchFunction.join(copy1, firstValN, collector); + joinFunction.join(copy1, firstValN, collector); - // set copy and match first element + // set copy and join first element boolean more = true; do { final T2 nRec = valsN.next(); if (valsN.hasNext()) { copy1 = createCopy(serializer1, val1, this.copy1); - matchFunction.join(copy1, nRec, collector); + joinFunction.join(copy1, nRec, collector); } else { - matchFunction.join(val1, nRec, collector); + joinFunction.join(val1, nRec, collector); more = false; } } @@ -190,7 +190,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat /** * Crosses a single value from the second side with N values, all sharing a common key. - * Effectively realizes a <i>N:1</i> match (join). + * Effectively realizes a <i>N:1</i> join. * * @param val1 The value form the <i>1</i> side. * @param firstValN The first of the values from the <i>N</i> side. @@ -198,20 +198,20 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat * @throws Exception Forwards all exceptions thrown by the stub. 
*/ private void crossSecond1withNValues(T2 val1, T1 firstValN, - Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception { + Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception { T2 copy2 = createCopy(serializer2, val1, this.copy2); - matchFunction.join(firstValN, copy2, collector); + joinFunction.join(firstValN, copy2, collector); - // set copy and match first element + // set copy and join first element boolean more = true; do { final T1 nRec = valsN.next(); if (valsN.hasNext()) { copy2 = createCopy(serializer2, val1, this.copy2); - matchFunction.join(nRec, copy2, collector); + joinFunction.join(nRec, copy2, collector); } else { - matchFunction.join(nRec, val1, collector); + joinFunction.join(nRec, val1, collector); more = false; } } @@ -220,7 +220,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals, final T2 firstV2, final Iterator<T2> blockVals, - final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception { + final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception { // ================================================== // We have one first (head) element from both inputs (firstV1 and firstV2) // We have an iterator for both inputs. @@ -237,13 +237,13 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat // 5) cross the head of the spilling side with the next block // 6) cross the spilling iterator with the next block. 
- // match the first values first + // join the first values first T1 copy1 = this.createCopy(serializer1, firstV1, this.copy1); T2 blockHeadCopy = this.createCopy(serializer2, firstV2, this.blockHeadCopy); T1 spillHeadCopy = null; // --------------- 1) Cross the heads ------------------- - matchFunction.join(copy1, firstV2, collector); + joinFunction.join(copy1, firstV2, collector); // for the remaining values, we do a block-nested-loops join SpillingResettableIterator<T1> spillIt = null; @@ -256,7 +256,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat while (this.blockIt.hasNext()) { final T2 nextBlockRec = this.blockIt.next(); copy1 = this.createCopy(serializer1, firstV1, this.copy1); - matchFunction.join(copy1, nextBlockRec, collector); + joinFunction.join(copy1, nextBlockRec, collector); } this.blockIt.reset(); @@ -286,7 +286,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat // -------- 3) cross the iterator of the spilling side with the head of the block side -------- T2 copy2 = this.createCopy(serializer2, blockHeadCopy, this.copy2); - matchFunction.join(copy1, copy2, collector); + joinFunction.join(copy1, copy2, collector); // -------- 4) cross the iterator of the spilling side with the first block -------- while (this.blockIt.hasNext()) { @@ -294,7 +294,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat // get instances of key and block value copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1); - matchFunction.join(copy1, nextBlockRec, collector); + joinFunction.join(copy1, nextBlockRec, collector); } // reset block iterator this.blockIt.reset(); @@ -316,7 +316,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat while (this.blockIt.hasNext()) { copy1 = this.createCopy(serializer1, spillHeadCopy, this.copy1); final T2 nextBlockVal = blockIt.next(); - matchFunction.join(copy1, nextBlockVal, collector); + 
joinFunction.join(copy1, nextBlockVal, collector); } this.blockIt.reset(); @@ -329,7 +329,7 @@ public abstract class AbstractMergeIterator<T1, T2, O> implements JoinTaskIterat // get instances of key and block value final T2 nextBlockVal = this.blockIt.next(); copy1 = this.createCopy(serializer1, nextSpillVal, this.copy1); - matchFunction.join(copy1, nextBlockVal, collector); + joinFunction.join(copy1, nextBlockVal, collector); } // reset block iterator http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java new file mode 100644 index 0000000..01b371e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * outer join through a sort-merge join strategy. Both inputs are expected to be sorted on the join key. + */ +public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> { + /** Selects which input(s) also emit records without a join partner; such records are joined with null. */ + public static enum OuterJoinType {LEFT, RIGHT, FULL} + + private final OuterJoinType outerJoinType; + + private boolean initialized = false; /* set by the first callWithNextKey(), after both inputs were moved to their first key */ + private boolean it1Empty = false; /* true once input 1 has no further key groups */ + private boolean it2Empty = false; /* true once input 2 has no further key groups */ + + /** Creates the iterator; every argument except outerJoinType is forwarded unchanged to AbstractMergeIterator. */ + public AbstractMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.outerJoinType = outerJoinType; + } + + /** + * Calls the <code>JoinFunction#join()</code> method for all pairs of records that share the same key and come + * from
different inputs. Furthermore, depending on the outer join type (LEFT, RIGHT, FULL), all records for which no + * matching partner from the other input exists are joined with null. + * The output of the <code>join()</code> method is forwarded. + * @return true if a key group was passed to the join function, false once both inputs are exhausted. + * @throws Exception Forwards all exceptions from the user code and the I/O system. + * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) + */ + @Override + public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) throws Exception { + if (!initialized) { + //first run, set iterators to first elements + it1Empty = !this.iterator1.nextKey(); + it2Empty = !this.iterator2.nextKey(); + initialized = true; + } + + if (it1Empty && it2Empty) { + return false; + } else if (it2Empty) { /* right input exhausted: only left key groups remain */ + if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) { + joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector); + it1Empty = !iterator1.nextKey(); + return true; + } else { + //consume rest of left side; a right outer join does not emit unmatched left records + while (iterator1.nextKey()) ; + it1Empty = true; + return false; + } + } else if (it1Empty) { /* left input exhausted: only right key groups remain */ + if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) { + joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector); + it2Empty = !iterator2.nextKey(); + return true; + } else { + //consume rest of right side; a left outer join does not emit unmatched right records + while (iterator2.nextKey()) ; + it2Empty = true; + return false; + } + } else { + final TypePairComparator<T1, T2> comparator = super.pairComparator; + comparator.setReference(this.iterator1.getCurrent()); + T2 current2 = this.iterator2.getCurrent(); + + // zig zag: advance the side with the smaller key until the keys match or an unmatched key group is emitted + while (true) { + // determine the relation between the (possibly composite) keys + final int comp = comparator.compareToReference(current2); + + if (comp == 0) { + break; + } + + if (comp < 0) { + //right key < left key + 
if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) { + //join right key values with null in case of right or full outer join + joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector); + it2Empty = !iterator2.nextKey(); + return true; + } else { + //skip this right key if it is a left outer join + if (!this.iterator2.nextKey()) { + //if right side is empty, join current left key values with null + joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector); + it1Empty = !iterator1.nextKey(); + it2Empty = true; + return true; + } + current2 = this.iterator2.getCurrent(); + } + } else { + //right key > left key + if (outerJoinType == OuterJoinType.LEFT || outerJoinType == OuterJoinType.FULL) { + //join left key values with null in case of left or full outer join + joinLeftKeyValuesWithNull(iterator1.getValues(), joinFunction, collector); + it1Empty = !iterator1.nextKey(); + return true; + } else { + //skip this left key if it is a right outer join + if (!this.iterator1.nextKey()) { + //if left side is empty, join current right key values with null + joinRightKeyValuesWithNull(iterator2.getValues(), joinFunction, collector); + it1Empty = true; + it2Empty = !iterator2.nextKey(); + return true; + } + comparator.setReference(this.iterator1.getCurrent()); + } + } + } + + // here, we have a common key!
call the join function with the cross product of the + // values + final Iterator<T1> values1 = this.iterator1.getValues(); + final Iterator<T2> values2 = this.iterator2.getValues(); + + crossMatchingGroup(values1, values2, joinFunction, collector); + it1Empty = !iterator1.nextKey(); + it2Empty = !iterator2.nextKey(); + return true; + } + } + + private void joinLeftKeyValuesWithNull(Iterator<T1> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception { /* calls join(value, null) for every value produced by 'values' */ + while (values.hasNext()) { + T1 next = values.next(); + this.copy1 = createCopy(serializer1, next, copy1); /* copy1 is passed as the reuse target */ + joinFunction.join(copy1, null, collector); + } + } + + private void joinRightKeyValuesWithNull(Iterator<T2> values, FlatJoinFunction<T1, T2, O> joinFunction, Collector<O> collector) throws Exception { /* calls join(null, value) for every value produced by 'values' */ + while (values.hasNext()) { + T2 next = values.next(); + this.copy2 = createCopy(serializer2, next, copy2); /* copy2 is passed as the reuse target */ + joinFunction.join(null, copy2, collector); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java new file mode 100644 index 0000000..ac49ece --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; +import org.apache.flink.util.MutableObjectIterator; +/** Sort-merge outer join iterator that ignores the 'reuse' argument of createCopy (copies come from serializer.copy(value)) and reads key groups with a NonReusingKeyGroupedIterator. */ +public class NonReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> { + /** All arguments are forwarded unchanged to AbstractMergeOuterJoinIterator. */ + public NonReusingMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager,
numMemoryPages, parentTask); + } + + @Override + protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) { + return new NonReusingKeyGroupedIterator<T>(input, comparator); /* the serializer argument is not used here */ + } + + @Override + protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { + return serializer.copy(value); /* 'reuse' is ignored; a new copy is returned */ + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java new file mode 100644 index 0000000..0cefbc5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; +import org.apache.flink.util.MutableObjectIterator; +/** Sort-merge outer join iterator that passes pre-allocated instances as 'reuse' targets to serializer.copy(value, reuse) and reads key groups with a ReusingKeyGroupedIterator. */ +public class ReusingMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeOuterJoinIterator<T1, T2, O> { + /** All arguments are forwarded to AbstractMergeOuterJoinIterator; the reusable copy instances are then pre-allocated. */ + public ReusingMergeOuterJoinIterator( + OuterJoinType outerJoinType, + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + /* pre-allocate the instances that are later passed to createCopy() as 'reuse' targets */ + this.copy1 = serializer1.createInstance(); + this.spillHeadCopy = serializer1.createInstance(); + this.copy2 = serializer2.createInstance(); + this.blockHeadCopy = serializer2.createInstance(); + } + + @Override + protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) { + return new ReusingKeyGroupedIterator<T>(input, serializer, comparator); + } + + @Override + protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { return
serializer.copy(value, reuse); } /* may copy into 'reuse' and return it, depending on the serializer */ + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java new file mode 100644 index 0000000..1fbe025 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; +import org.apache.flink.runtime.operators.testutils.*; +import org.apache.flink.runtime.operators.testutils.TestData.TupleConstantValueIterator; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator; +import org.apache.flink.runtime.util.ResettableMutableObjectIterator; +import 
org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.util.*; +import java.util.Map.Entry; + +public abstract class AbstractSortMergeOuterJoinIteratorITCase { + + // total memory + private static final int MEMORY_SIZE = 1024 * 1024 * 16; + private static final int PAGES_FOR_BNLJN = 2; + + // the size of the left and right inputs + private static final int INPUT_1_SIZE = 20000; + + private static final int INPUT_2_SIZE = 1000; + + // random seeds for the left and right input data generators + private static final long SEED1 = 561349061987311L; + + private static final long SEED2 = 231434613412342L; + + // dummy abstract task + private final AbstractInvokable parentTask = new DummyInvokable(); + + private IOManager ioManager; + private MemoryManager memoryManager; + + private TupleTypeInfo<Tuple2<String, String>> typeInfo1; + private TupleTypeInfo<Tuple2<String, Integer>> typeInfo2; + private TupleSerializer<Tuple2<String, String>> serializer1; + private TupleSerializer<Tuple2<String, Integer>> serializer2; + private TypeComparator<Tuple2<String, String>> comparator1; + private TypeComparator<Tuple2<String, Integer>> comparator2; + private TypePairComparator<Tuple2<String, String>, Tuple2<String, Integer>> pairComp; + + + @Before + public void beforeTest() { + ExecutionConfig config = new ExecutionConfig(); + config.disableObjectReuse(); + + typeInfo1 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class); + typeInfo2 = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class); + serializer1 = typeInfo1.createSerializer(config); + serializer2 = typeInfo2.createSerializer(config); + comparator1 = typeInfo1.createComparator(new int[]{0}, new boolean[]{true}, 0, config); + comparator2 = typeInfo2.createComparator(new int[]{0}, new boolean[]{true}, 0, config); + pairComp = new GenericPairComparator<Tuple2<String, String>, 
Tuple2<String, Integer>>(comparator1, comparator2); + + this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + if (this.ioManager != null) { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O manager failed to properly shut down."); + } + this.ioManager = null; + } + + if (this.memoryManager != null) { + Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + protected void testFullOuterWithSample() throws Exception { + CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of( + new Tuple2<String, String>("Jack", "Engineering"), + new Tuple2<String, String>("Tim", "Sales"), + new Tuple2<String, String>("Zed", "HR") + ); + CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of( + new Tuple2<String, Integer>("Allison", 100), + new Tuple2<String, Integer>("Jack", 200), + new Tuple2<String, Integer>("Zed", 150), + new Tuple2<String, Integer>("Zed", 250) + ); + + OuterJoinType outerJoinType = OuterJoinType.FULL; + List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, outerJoinType); + + List<Tuple4<String, String, String, Object>> expected = Arrays.asList( + new Tuple4<String, String, String, Object>(null, null, "Allison", 100), + new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200), + new Tuple4<String, String, String, Object>("Tim", "Sales", null, null), + new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150), + new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250) + ); + + Assert.assertEquals(expected, actual); + } + + protected void testLeftOuterWithSample() throws Exception { + CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of( + new 
Tuple2<String, String>("Jack", "Engineering"), + new Tuple2<String, String>("Tim", "Sales"), + new Tuple2<String, String>("Zed", "HR") + ); + CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of( + new Tuple2<String, Integer>("Allison", 100), + new Tuple2<String, Integer>("Jack", 200), + new Tuple2<String, Integer>("Zed", 150), + new Tuple2<String, Integer>("Zed", 250) + ); + + List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.LEFT); + + List<Tuple4<String, String, String, Object>> expected = Arrays.asList( + new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200), + new Tuple4<String, String, String, Object>("Tim", "Sales", null, null), + new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150), + new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250) + ); + + Assert.assertEquals(expected, actual); + } + + protected void testRightOuterWithSample() throws Exception { + CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of( + new Tuple2<String, String>("Jack", "Engineering"), + new Tuple2<String, String>("Tim", "Sales"), + new Tuple2<String, String>("Zed", "HR") + ); + CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of( + new Tuple2<String, Integer>("Allison", 100), + new Tuple2<String, Integer>("Jack", 200), + new Tuple2<String, Integer>("Zed", 150), + new Tuple2<String, Integer>("Zed", 250) + ); + + List<Tuple4<String, String, String, Object>> actual = computeOuterJoin(input1, input2, OuterJoinType.RIGHT); + + List<Tuple4<String, String, String, Object>> expected = Arrays.asList( + new Tuple4<String, String, String, Object>(null, null, "Allison", 100), + new Tuple4<String, String, String, Object>("Jack", "Engineering", "Jack", 200), + new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 150), + new Tuple4<String, String, String, Object>("Zed", "HR", "Zed", 250) + ); + + 
Assert.assertEquals(expected, actual); + } + + protected void testRightSideEmpty() throws Exception { + CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of( + new Tuple2<String, String>("Jack", "Engineering"), + new Tuple2<String, String>("Tim", "Sales"), + new Tuple2<String, String>("Zed", "HR") + ); + CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of(); + + List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT); + List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT); + List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL); + + List<Tuple4<String, String, String, Object>> expected = Arrays.asList( + new Tuple4<String, String, String, Object>("Jack", "Engineering", null, null), + new Tuple4<String, String, String, Object>("Tim", "Sales", null, null), + new Tuple4<String, String, String, Object>("Zed", "HR", null, null) + ); + + Assert.assertEquals(expected, actualLeft); + Assert.assertEquals(expected, actualFull); + Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualRight); + } + + protected void testLeftSideEmpty() throws Exception { + CollectionIterator<Tuple2<String, String>> input1 = CollectionIterator.of(); + CollectionIterator<Tuple2<String, Integer>> input2 = CollectionIterator.of( + new Tuple2<String, Integer>("Allison", 100), + new Tuple2<String, Integer>("Jack", 200), + new Tuple2<String, Integer>("Zed", 150), + new Tuple2<String, Integer>("Zed", 250) + ); + + List<Tuple4<String, String, String, Object>> actualLeft = computeOuterJoin(input1, input2, OuterJoinType.LEFT); + List<Tuple4<String, String, String, Object>> actualRight = computeOuterJoin(input1, input2, OuterJoinType.RIGHT); + List<Tuple4<String, String, String, Object>> actualFull = computeOuterJoin(input1, input2, OuterJoinType.FULL); + + 
List<Tuple4<String, String, String, Object>> expected = Arrays.asList( + new Tuple4<String, String, String, Object>(null, null, "Allison", 100), + new Tuple4<String, String, String, Object>(null, null, "Jack", 200), + new Tuple4<String, String, String, Object>(null, null, "Zed", 150), + new Tuple4<String, String, String, Object>(null, null, "Zed", 250) + ); + + Assert.assertEquals(Collections.<Tuple4<String,String,String,Object>>emptyList(), actualLeft); + Assert.assertEquals(expected, actualRight); + Assert.assertEquals(expected, actualFull); + } + + private List<Tuple4<String, String, String, Object>> computeOuterJoin(ResettableMutableObjectIterator<Tuple2<String, String>> input1, + ResettableMutableObjectIterator<Tuple2<String, Integer>> input2, + OuterJoinType outerJoinType) throws Exception { + input1.reset(); + input2.reset(); + AbstractMergeOuterJoinIterator<Tuple2<String, String>, Tuple2<String, Integer>, Tuple4<String, String, String, Object>> iterator = + createOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, + pairComp, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); + + List<Tuple4<String, String, String, Object>> actual = new ArrayList<Tuple4<String, String, String, Object>>(); + ListCollector<Tuple4<String, String, String, Object>> collector = new ListCollector<Tuple4<String, String, String, Object>>(actual); + while (iterator.callWithNextKey(new SimpleTupleJoinFunction(), collector)) ; + iterator.close(); + + return actual; + } + + protected void testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType outerJoinType, int input1Size, int input1Duplicates, int input1ValueLength, + float input1KeyDensity, int input2Size, int input2Duplicates, int input2ValueLength, float input2KeyDensity) { + TypeSerializer<Tuple2<Integer, String>> serializer1 = new TupleSerializer<Tuple2<Integer, String>>( + (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class, + new TypeSerializer<?>[] { 
IntSerializer.INSTANCE, StringSerializer.INSTANCE }); + TypeSerializer<Tuple2<Integer, String>> serializer2 = new TupleSerializer<Tuple2<Integer, String>>( + (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE }); + TypeComparator<Tuple2<Integer, String>> comparator1 = new TupleComparator<Tuple2<Integer, String>>( + new int[]{0}, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + TypeComparator<Tuple2<Integer, String>> comparator2 = new TupleComparator<Tuple2<Integer, String>>( + new int[]{0}, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + + TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator = + new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2); + + this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); + this.ioManager = new IOManagerAsync(); + + final int DUPLICATE_KEY = 13; + + try { + final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, input1KeyDensity, input1ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null); + final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, input2KeyDensity, input2ValueLength, KeyMode.SORTED_SPARSE, ValueMode.RANDOM_LENGTH, null); + + final TupleGeneratorIterator gen1Iter = new TupleGeneratorIterator(generator1, input1Size); + final TupleGeneratorIterator gen2Iter = new TupleGeneratorIterator(generator2, input2Size); + + final TupleConstantValueIterator const1Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", input1Duplicates); + final TupleConstantValueIterator const2Iter = new TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", input2Duplicates); + + final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new 
ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); + MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate()); + + // collect expected data + final Map<Integer, Collection<Match>> expectedMatchesMap = joinValues( + collectData(input1), + collectData(input2), + outerJoinType); + + // re-create the whole thing for actual processing + + // reset the generators and iterators + generator1.reset(); + generator2.reset(); + const1Iter.reset(); + const2Iter.reset(); + gen1Iter.reset(); + gen2Iter.reset(); + + inList1.clear(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + inList2.clear(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); + input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate()); + + final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = + new MatchRemovingJoiner(expectedMatchesMap); + + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); + + + // we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it + // needs to spill for the duplicate keys + AbstractMergeOuterJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + createOuterJoinIterator( + outerJoinType, input1, input2, serializer1, comparator1, serializer2, comparator2, + pairComparator, 
this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); + + iterator.open(); + + while (iterator.callWithNextKey(joinFunction, collector)) ; + + iterator.close(); + + // assert that each expected match was seen + for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + protected abstract <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType, + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) throws Exception; + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + + private Map<Integer, Collection<Match>> joinValues( + Map<Integer, Collection<String>> leftMap, + Map<Integer, Collection<String>> rightMap, + OuterJoinType outerJoinType) { + Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>(); + + for (Integer key : leftMap.keySet()) { + Collection<String> leftValues = leftMap.get(key); + Collection<String> rightValues = rightMap.get(key); + + if (outerJoinType == OuterJoinType.RIGHT && rightValues == null) { + continue; + } + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<Match>()); + } + + Collection<Match> joinedValues = map.get(key); + + for (String leftValue : leftValues) { + if (rightValues != null) { + for (String rightValue : rightValues) 
{ + joinedValues.add(new Match(leftValue, rightValue)); + } + } else { + joinedValues.add(new Match(leftValue, null)); + } + } + } + + if (outerJoinType == OuterJoinType.RIGHT || outerJoinType == OuterJoinType.FULL) { + for (Integer key : rightMap.keySet()) { + Collection<String> leftValues = leftMap.get(key); + Collection<String> rightValues = rightMap.get(key); + + if (leftValues != null) { + continue; + } + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<Match>()); + } + + Collection<Match> joinedValues = map.get(key); + + for (String rightValue : rightValues) { + joinedValues.add(new Match(null, rightValue)); + } + } + } + + return map; + } + + + private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter) + throws Exception { + final Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>(); + Tuple2<Integer, String> pair = new Tuple2<Integer, String>(); + + while ((pair = iter.next(pair)) != null) { + final Integer key = pair.getField(0); + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<String>()); + } + + Collection<String> values = map.get(key); + final String value = pair.getField(1); + values.add(value); + } + + return map; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java index 7fc3734..6548052 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java @@ -135,7 +135,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase { collectData(input2)); final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = - new MatchRemovingMatcher(expectedMatchesMap); + new MatchRemovingJoiner(expectedMatchesMap); final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); @@ -226,7 +226,7 @@ public class NonReusingSortMergeInnerJoinIteratorITCase { input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate()); - final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingMatcher(expectedMatchesMap); + final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingJoiner(expectedMatchesMap); final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java new file mode 100644 index 0000000..1205bc1 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.Test; + +public class NonReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase { + + @Override + protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType, MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, TypeSerializer<T1> serializer1, + TypeComparator<T1> comparator1, TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager, + int numMemoryPages, AbstractInvokable parentTask) throws Exception { + return new NonReusingMergeOuterJoinIterator(outerJoinType, 
input1, input2, serializer1, comparator1, + serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + } + + @Test + public void testFullOuterWithSample() throws Exception { + super.testFullOuterWithSample(); + } + + @Test + public void testLeftOuterWithSample() throws Exception { + super.testLeftOuterWithSample(); + } + + @Test + public void testRightOuterWithSample() throws Exception { + super.testRightOuterWithSample(); + } + + @Test + public void testRightSideEmpty() throws Exception { + super.testRightSideEmpty(); + } + + @Test + public void testLeftSideEmpty() throws Exception { + super.testLeftSideEmpty(); + } + + @Test + public void testFullOuterJoinWithHighNumberOfCommonKeys() { + testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f); + } + + @Test + public void testLeftOuterJoinWithHighNumberOfCommonKeys() { + testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f); + } + + @Test + public void testRightOuterJoinWithHighNumberOfCommonKeys() { + testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java index e4eec86..39316e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java @@ -135,7 +135,7 @@ 
public class ReusingSortMergeInnerJoinIteratorITCase { collectData(input2)); final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = - new MatchRemovingMatcher(expectedMatchesMap); + new MatchRemovingJoiner(expectedMatchesMap); final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); @@ -226,7 +226,7 @@ public class ReusingSortMergeInnerJoinIteratorITCase { input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate()); - final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingMatcher(expectedMatchesMap); + final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingJoiner(expectedMatchesMap); final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java new file mode 100644 index 0000000..b4fbd80 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.Test; + +public class ReusingSortMergeOuterJoinIteratorITCase extends AbstractSortMergeOuterJoinIteratorITCase { + + @Override + protected <T1, T2> AbstractMergeOuterJoinIterator createOuterJoinIterator(OuterJoinType outerJoinType, MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, TypeSerializer<T1> serializer1, + TypeComparator<T1> comparator1, TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, MemoryManager memoryManager, IOManager ioManager, + int numMemoryPages, AbstractInvokable parentTask) throws Exception { + return new ReusingMergeOuterJoinIterator(outerJoinType, input1, input2, serializer1, comparator1, + serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + } + + 
@Test + public void testFullOuterWithSample() throws Exception { + super.testFullOuterWithSample(); + } + + @Test + public void testLeftOuterWithSample() throws Exception { + super.testLeftOuterWithSample(); + } + + @Test + public void testRightOuterWithSample() throws Exception { + super.testRightOuterWithSample(); + } + + @Test + public void testRightSideEmpty() throws Exception { + super.testRightSideEmpty(); + } + + @Test + public void testLeftSideEmpty() throws Exception { + super.testLeftSideEmpty(); + } + + @Test + public void testFullOuterJoinWithHighNumberOfCommonKeys() { + testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.FULL, 200, 500, 2048, 0.02f, 200, 500, 2048, 0.02f); + } + + @Test + public void testLeftOuterJoinWithHighNumberOfCommonKeys() { + testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.LEFT, 200, 10, 4096, 0.02f, 100, 4000, 2048, 0.02f); + } + + @Test + public void testRightOuterJoinWithHighNumberOfCommonKeys() { + testOuterJoinWithHighNumberOfCommonKeys(OuterJoinType.RIGHT, 100, 10, 2048, 0.02f, 200, 4000, 4096, 0.02f); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java index 539d864..4ac9093 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators.testutils; /** * Utility class for keeping track of matches in join operator tests. 
* - * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher + * @see MatchRemovingJoiner */ public class Match { private final String left; http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java new file mode 100644 index 0000000..e588d92 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingJoiner.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.junit.Assert; + +import java.util.Collection; +import java.util.Map; + + +public final class MatchRemovingJoiner implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> { + private static final long serialVersionUID = 1L; + + private final Map<Integer, Collection<Match>> toRemoveFrom; + + public MatchRemovingJoiner(Map<Integer, Collection<Match>> map) { + this.toRemoveFrom = map; + } + + @Override + public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception { + final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0); + final String value1 = rec1 != null ? (String) rec1.getField(1) : null; + final String value2 = rec2 != null ? 
(String) rec2.getField(1) : null; + + Collection<Match> matches = this.toRemoveFrom.get(key); + if (matches == null) { + Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected."); + } + + boolean contained = matches.remove(new Match(value1, value2)); + if (!contained) { + Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2); + } + if (matches.isEmpty()) { + this.toRemoveFrom.remove(key); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/941ac6df/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java deleted file mode 100644 index f69b4d7..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MatchRemovingMatcher.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators.testutils; - -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; -import org.junit.Assert; - -import java.util.Collection; -import java.util.Map; - - - /** Test join function (removed here; the same logic lives on as MatchRemovingJoiner): removes each produced (left, right) pair from the per-key expected matches, fails the test on pairs that are not expected, and drops keys whose matches are exhausted. */ -public final class MatchRemovingMatcher implements FlatJoinFunction<Tuple2<Integer,String>,Tuple2<Integer,String>,Tuple2<Integer,String>> { - private static final long serialVersionUID = 1L; - - private final Map<Integer, Collection<Match>> toRemoveFrom; - - public MatchRemovingMatcher(Map<Integer, Collection<Match>> map) { - this.toRemoveFrom = map; - } - - @Override - public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception { - /* key is taken from whichever record is non-null */ - final Integer key = rec1 != null ? (Integer) rec1.getField(0) : (Integer) rec2.getField(0); - final String value1 = rec1 != null ? (String) rec1.getField(1) : null; - final String value2 = rec2 != null ? (String) rec2.getField(1) : null; - - Collection<Match> matches = this.toRemoveFrom.get(key); - if (matches == null) { - Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected."); - } - - boolean contained = matches.remove(new Match(value1, value2)); - if (!contained) { - Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2); - } - if (matches.isEmpty()) { - this.toRemoveFrom.remove(key); - } - } -}