[FLINK-2105] [tests] Move duplicate utility classes to testutils package
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df9f4819 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df9f4819 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df9f4819 Branch: refs/heads/master Commit: df9f4819b9368600c7531dbf4d4ec42c1cddea8f Parents: db0b008 Author: r-pogalz <r.pog...@campus.tu-berlin.de> Authored: Mon Aug 3 12:59:01 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Aug 4 21:35:27 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/operators/MatchDriver.java | 8 +- .../sort/AbstractMergeInnerJoinIterator.java | 108 ++++++ .../sort/AbstractMergeMatchIterator.java | 107 ------ .../sort/NonReusingMergeInnerJoinIterator.java | 59 +++ .../sort/NonReusingMergeMatchIterator.java | 59 --- .../sort/ReusingMergeInnerJoinIterator.java | 64 ++++ .../sort/ReusingMergeMatchIterator.java | 64 ---- ...ReusingSortMergeInnerJoinIteratorITCase.java | 318 ++++++++++++++++ .../NonReusingSortMergeMatchIteratorITCase.java | 371 ------------------- ...ReusingSortMergeInnerJoinIteratorITCase.java | 318 ++++++++++++++++ .../ReusingSortMergeMatchIteratorITCase.java | 371 ------------------- .../operators/testutils/CollectionIterator.java | 61 +++ .../runtime/operators/testutils/Match.java | 63 ++++ .../testutils/MatchRemovingMatcher.java | 58 +++ .../testutils/SimpleTupleJoinFunction.java | 41 ++ .../operators/util/HashVsSortMiniBenchmark.java | 6 +- 16 files changed, 1097 insertions(+), 979 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java index 0381aab..e54fca5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java @@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator; import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashMatchIterator; -import org.apache.flink.runtime.operators.sort.NonReusingMergeMatchIterator; +import org.apache.flink.runtime.operators.sort.NonReusingMergeInnerJoinIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.FlatJoinFunction; @@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashMatchIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashMatchIterator; -import org.apache.flink.runtime.operators.sort.ReusingMergeMatchIterator; +import org.apache.flink.runtime.operators.sort.ReusingMergeInnerJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; @@ -126,7 +126,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT if (this.objectReuseEnabled) { switch (ls) { case MERGE: - this.matchIterator = new ReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); + this.matchIterator = new ReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, 
serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); break; case HYBRIDHASH_BUILD_FIRST: @@ -141,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT } else { switch (ls) { case MERGE: - this.matchIterator = new NonReusingMergeMatchIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); + this.matchIterator = new NonReusingMergeInnerJoinIterator<IT1, IT2, OT>(in1, in2, serializer1, comparator1, serializer2, comparator2, pairComparatorFactory.createComparator12(comparator1, comparator2), memoryManager, ioManager, numPages, this.taskContext.getOwningNepheleTask()); break; case HYBRIDHASH_BUILD_FIRST: http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java new file mode 100644 index 0000000..e9ccf52 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeInnerJoinIterator.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; + +import java.util.Iterator; + +/** + * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the + * joining through a sort-merge join strategy. 
+ */ +public abstract class AbstractMergeInnerJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> { + + public AbstractMergeInnerJoinIterator( + MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + } + + /** + * Calls the <code>JoinFunction#join()</code> method for all two key-value pairs that share the same key and come + * from different inputs. The output of the <code>join()</code> method is forwarded. + * <p/> + * This method first zig-zags between the two sorted inputs in order to find a common + * key, and then calls the join stub with the cross product of the values. + * + * @throws Exception Forwards all exceptions from the user code and the I/O system. 
+ * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) + */ + @Override + public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> joinFunction, final Collector<O> collector) + throws Exception { + if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) { + // consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon) + while (this.iterator1.nextKey()) ; + while (this.iterator2.nextKey()) ; + + return false; + } + + final TypePairComparator<T1, T2> comparator = this.pairComparator; + comparator.setReference(this.iterator1.getCurrent()); + T2 current2 = this.iterator2.getCurrent(); + + // zig zag + while (true) { + // determine the relation between the (possibly composite) keys + final int comp = comparator.compareToReference(current2); + + if (comp == 0) { + break; + } + + if (comp < 0) { + if (!this.iterator2.nextKey()) { + return false; + } + current2 = this.iterator2.getCurrent(); + } else { + if (!this.iterator1.nextKey()) { + return false; + } + comparator.setReference(this.iterator1.getCurrent()); + } + } + + // here, we have a common key! 
call the join function with the cross product of the + // values + final Iterator<T1> values1 = this.iterator1.getValues(); + final Iterator<T2> values2 = this.iterator2.getValues(); + + crossMatchingGroup(values1, values2, joinFunction, collector); + return true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java deleted file mode 100644 index 791494d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeMatchIterator.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators.sort; - -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.MemoryAllocationException; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; - -import java.util.Iterator; - -/** - * An implementation of the {@link org.apache.flink.runtime.operators.util.JoinTaskIterator} that realizes the - * matching through a sort-merge join strategy. - */ -public abstract class AbstractMergeMatchIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> { - - public AbstractMergeMatchIterator(MutableObjectIterator<T1> input1, MutableObjectIterator<T2> input2, - TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, - TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, - TypePairComparator<T1, T2> pairComparator, - MemoryManager memoryManager, - IOManager ioManager, - int numMemoryPages, - AbstractInvokable parentTask) - throws MemoryAllocationException { - super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); - } - - /** - * Calls the <code>JoinFunction#match()</code> method for all two key-value pairs that share the same key and come - * from different inputs. The output of the <code>match()</code> method is forwarded. - * <p> - * This method first zig-zags between the two sorted inputs in order to find a common - * key, and then calls the match stub with the cross product of the values. 
- * - * @throws Exception Forwards all exceptions from the user code and the I/O system. - * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey(org.apache.flink.api.common.functions.FlatJoinFunction, org.apache.flink.util.Collector) - */ - @Override - public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) - throws Exception { - if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) { - // consume all remaining keys (hack to prevent remaining inputs during iterations, lets get rid of this soon) - while (this.iterator1.nextKey()) ; - while (this.iterator2.nextKey()) ; - - return false; - } - - final TypePairComparator<T1, T2> comparator = this.pairComparator; - comparator.setReference(this.iterator1.getCurrent()); - T2 current2 = this.iterator2.getCurrent(); - - // zig zag - while (true) { - // determine the relation between the (possibly composite) keys - final int comp = comparator.compareToReference(current2); - - if (comp == 0) { - break; - } - - if (comp < 0) { - if (!this.iterator2.nextKey()) { - return false; - } - current2 = this.iterator2.getCurrent(); - } else { - if (!this.iterator1.nextKey()) { - return false; - } - comparator.setReference(this.iterator1.getCurrent()); - } - } - - // here, we have a common key! 
call the match function with the cross product of the - // values - final Iterator<T1> values1 = this.iterator1.getValues(); - final Iterator<T2> values2 = this.iterator2.getValues(); - - crossMatchingGroup(values1, values2, matchFunction, collector); - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java new file mode 100644 index 0000000..644084c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeInnerJoinIterator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; +import org.apache.flink.util.MutableObjectIterator; + + +public class NonReusingMergeInnerJoinIterator<T1, T2, O> extends AbstractMergeInnerJoinIterator<T1, T2, O> { + + public NonReusingMergeInnerJoinIterator( + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + } + + @Override + protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) { + return new NonReusingKeyGroupedIterator<T>(input, comparator); + } + + @Override + protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { + return serializer.copy(value); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java deleted file mode 100644 index 9705778..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeMatchIterator.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators.sort; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.MemoryAllocationException; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.util.KeyGroupedIterator; -import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; -import org.apache.flink.util.MutableObjectIterator; - - -public class NonReusingMergeMatchIterator<T1, T2, O> extends AbstractMergeMatchIterator<T1, T2, O> { - - public NonReusingMergeMatchIterator( - MutableObjectIterator<T1> input1, - MutableObjectIterator<T2> input2, - TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, - TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, - TypePairComparator<T1, T2> pairComparator, - MemoryManager memoryManager, - IOManager ioManager, - int numMemoryPages, - AbstractInvokable parentTask) - throws MemoryAllocationException { - super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); - } - - @Override - protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) { - return new NonReusingKeyGroupedIterator<T>(input, comparator); - } - - @Override - protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { - return serializer.copy(value); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java new file mode 100644 index 0000000..3a1a17a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeInnerJoinIterator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.MemoryAllocationException; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; +import org.apache.flink.util.MutableObjectIterator; + + +public class ReusingMergeInnerJoinIterator<T1, T2, O> extends AbstractMergeInnerJoinIterator<T1, T2, O> { + + public ReusingMergeInnerJoinIterator( + MutableObjectIterator<T1> input1, + MutableObjectIterator<T2> input2, + TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, + TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, + TypePairComparator<T1, T2> pairComparator, + MemoryManager memoryManager, + IOManager ioManager, + int numMemoryPages, + AbstractInvokable parentTask) + throws MemoryAllocationException { + super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); + + this.copy1 = serializer1.createInstance(); + this.spillHeadCopy = serializer1.createInstance(); + this.copy2 = serializer2.createInstance(); + this.blockHeadCopy = serializer2.createInstance(); + } + + @Override + protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) { + return new ReusingKeyGroupedIterator<T>(input, serializer, comparator); + } + + @Override + protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { + return serializer.copy(value, reuse); + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java deleted file mode 100644 index c9cf5a2..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeMatchIterator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators.sort; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.MemoryAllocationException; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.util.KeyGroupedIterator; -import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; -import org.apache.flink.util.MutableObjectIterator; - - -public class ReusingMergeMatchIterator<T1, T2, O> extends AbstractMergeMatchIterator<T1, T2, O> { - - public ReusingMergeMatchIterator( - MutableObjectIterator<T1> input1, - MutableObjectIterator<T2> input2, - TypeSerializer<T1> serializer1, TypeComparator<T1> comparator1, - TypeSerializer<T2> serializer2, TypeComparator<T2> comparator2, - TypePairComparator<T1, T2> pairComparator, - MemoryManager memoryManager, - IOManager ioManager, - int numMemoryPages, - AbstractInvokable parentTask) - throws MemoryAllocationException { - super(input1, input2, serializer1, comparator1, serializer2, comparator2, pairComparator, memoryManager, ioManager, numMemoryPages, parentTask); - - this.copy1 = serializer1.createInstance(); - this.spillHeadCopy = serializer1.createInstance(); - this.copy2 = serializer2.createInstance(); - this.blockHeadCopy = serializer2.createInstance(); - } - - @Override - protected <T> KeyGroupedIterator<T> createKeyGroupedIterator(MutableObjectIterator<T> input, TypeSerializer<T> serializer, TypeComparator<T> comparator) { - return new ReusingKeyGroupedIterator<T>(input, serializer, comparator); - } - - @Override - protected <T> T createCopy(TypeSerializer<T> serializer, T value, T reuse) { - return serializer.copy(value, reuse); - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java new file mode 100644 index 0000000..7fc3734 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeInnerJoinIteratorITCase.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.*; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.*; +import java.util.Map.Entry; + +@SuppressWarnings("deprecation") +public class NonReusingSortMergeInnerJoinIteratorITCase { + + // total memory + private static final int MEMORY_SIZE = 1024 * 1024 * 16; + private static final int PAGES_FOR_BNLJN = 2; + + // the size of the left and right inputs + private static final int INPUT_1_SIZE = 20000; + + private static final int 
INPUT_2_SIZE = 1000; + + // random seeds for the left and right input data generators + private static final long SEED1 = 561349061987311L; + + private static final long SEED2 = 231434613412342L; + + // dummy abstract task + private final AbstractInvokable parentTask = new DummyInvokable(); + + private IOManager ioManager; + private MemoryManager memoryManager; + + private TypeSerializer<Tuple2<Integer, String>> serializer1; + private TypeSerializer<Tuple2<Integer, String>> serializer2; + private TypeComparator<Tuple2<Integer, String>> comparator1; + private TypeComparator<Tuple2<Integer, String>> comparator2; + private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator; + + + @SuppressWarnings("unchecked") + @Before + public void beforeTest() { + serializer1 = new TupleSerializer<Tuple2<Integer, String>>( + (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE }); + serializer2 = new TupleSerializer<Tuple2<Integer, String>>( + (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE }); + comparator1 = new TupleComparator<Tuple2<Integer, String>>( + new int[]{0}, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + comparator2 = new TupleComparator<Tuple2<Integer, String>>( + new int[]{0}, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + pairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2); + + this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + if (this.ioManager != null) { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O manager failed to properly 
shut down."); + } + this.ioManager = null; + } + + if (this.memoryManager != null) { + Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + @Test + public void testMerge() { + try { + + final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues( + collectData(input1), + collectData(input2)); + + final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = + new MatchRemovingMatcher(expectedMatchesMap); + + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // compare with iterator values + NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + new NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>( + input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, + this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); + + iterator.open(); + + while (iterator.callWithNextKey(joinFunction, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<Integer, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) { + Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty()); + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testMergeWithHighNumberOfCommonKeys() + { + // the size of the left and right inputs + final int INPUT_1_SIZE = 200; + final int INPUT_2_SIZE = 100; + + final int INPUT_1_DUPLICATES = 10; + final int INPUT_2_DUPLICATES = 4000; + final int DUPLICATE_KEY = 13; + + try { + final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES); + final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES); + + final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); + MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, 
comparator2.duplicate()); + + // collect expected data + final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues( + collectData(input1), + collectData(input2)); + + // re-create the whole thing for actual processing + + // reset the generators and iterators + generator1.reset(); + generator2.reset(); + const1Iter.reset(); + const2Iter.reset(); + gen1Iter.reset(); + gen2Iter.reset(); + + inList1.clear(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + inList2.clear(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); + input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate()); + + final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = new MatchRemovingMatcher(expectedMatchesMap); + + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); + + + // we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it + // needs to spill for the duplicate keys + NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + new NonReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>( + input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, + this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); + + iterator.open(); + + while (iterator.callWithNextKey(joinFunction, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + 
Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + private Map<Integer, Collection<Match>> matchValues( + Map<Integer, Collection<String>> leftMap, + Map<Integer, Collection<String>> rightMap) + { + Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>(); + + for (Integer key : leftMap.keySet()) { + Collection<String> leftValues = leftMap.get(key); + Collection<String> rightValues = rightMap.get(key); + + if (rightValues == null) { + continue; + } + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<Match>()); + } + + Collection<Match> matchedValues = map.get(key); + + for (String leftValue : leftValues) { + for (String rightValue : rightValues) { + matchedValues.add(new Match(leftValue, rightValue)); + } + } + } + + return map; + } + + + private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter) + throws Exception + { + Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>(); + Tuple2<Integer, String> pair = new Tuple2<Integer, String>(); + + while ((pair = iter.next(pair)) != null) { + final Integer key = pair.getField(0); + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<String>()); + } + + Collection<String> values = map.get(key); + final String value = pair.getField(1); + values.add(value); + } + + return map; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java deleted file mode 100644 index 757b2e7..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeMatchIteratorITCase.java +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators.sort; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.api.common.typeutils.record.RecordPairComparator; -import org.apache.flink.api.common.typeutils.record.RecordSerializer; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.operators.testutils.TestData; -import org.apache.flink.runtime.operators.testutils.TestData.Generator; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode; -import org.apache.flink.types.Record; -import org.apache.flink.types.Value; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -@SuppressWarnings("deprecation") -public class NonReusingSortMergeMatchIteratorITCase { - - // total memory - private static final int MEMORY_SIZE = 1024 * 1024 * 16; - private static final int PAGES_FOR_BNLJN = 2; - - // the size of the left and right 
inputs - private static final int INPUT_1_SIZE = 20000; - - private static final int INPUT_2_SIZE = 1000; - - // random seeds for the left and right input data generators - private static final long SEED1 = 561349061987311L; - - private static final long SEED2 = 231434613412342L; - - // dummy abstract task - private final AbstractInvokable parentTask = new DummyInvokable(); - - private IOManager ioManager; - private MemoryManager memoryManager; - - private TypeSerializer<Record> serializer1; - private TypeSerializer<Record> serializer2; - private TypeComparator<Record> comparator1; - private TypeComparator<Record> comparator2; - private TypePairComparator<Record, Record> pairComparator; - - - @SuppressWarnings("unchecked") - @Before - public void beforeTest() { - this.serializer1 = RecordSerializer.get(); - this.serializer2 = RecordSerializer.get(); - this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class}); - this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class}); - this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class}); - - this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); - this.ioManager = new IOManagerAsync(); - } - - @After - public void afterTest() { - if (this.ioManager != null) { - this.ioManager.shutdown(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } - this.ioManager = null; - } - - if (this.memoryManager != null) { - Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", - this.memoryManager.verifyEmpty()); - this.memoryManager.shutdown(); - this.memoryManager = null; - } - } - - - - @Test - public void testMerge() { - try { - - final TestData.Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); - final TestData.Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, 
ValueMode.RANDOM_LENGTH); - - final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); - final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); - - // collect expected data - final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues( - collectData(input1), - collectData(input2)); - - final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap); - - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); - - // reset the generators - generator1.reset(); - generator2.reset(); - input1.reset(); - input2.reset(); - - // compare with iterator values - NonReusingMergeMatchIterator<Record, Record, Record> iterator = - new NonReusingMergeMatchIterator<Record, Record, Record>( - input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, - this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); - - iterator.open(); - - while (iterator.callWithNextKey(matcher, collector)); - - iterator.close(); - - // assert that each expected match was seen - for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) { - Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty()); - } - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - @Test - public void testMergeWithHighNumberOfCommonKeys() - { - // the size of the left and right inputs - final int INPUT_1_SIZE = 200; - final int INPUT_2_SIZE = 100; - - final int INPUT_1_DUPLICATES = 10; - final int INPUT_2_DUPLICATES = 4000; - final int DUPLICATE_KEY = 13; - - try { - final TestData.Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); - final TestData.Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, 
ValueMode.RANDOM_LENGTH); - - final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); - final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); - - final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES); - final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES); - - final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>(); - inList1.add(gen1Iter); - inList1.add(const1Iter); - - final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>(); - inList2.add(gen2Iter); - inList2.add(const2Iter); - - MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, comparator1.duplicate()); - MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, comparator2.duplicate()); - - // collect expected data - final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues( - collectData(input1), - collectData(input2)); - - // re-create the whole thing for actual processing - - // reset the generators and iterators - generator1.reset(); - generator2.reset(); - const1Iter.reset(); - const2Iter.reset(); - gen1Iter.reset(); - gen2Iter.reset(); - - inList1.clear(); - inList1.add(gen1Iter); - inList1.add(const1Iter); - - inList2.clear(); - inList2.add(gen2Iter); - inList2.add(const2Iter); - - input1 = new MergeIterator<Record>(inList1, comparator1.duplicate()); - input2 = new MergeIterator<Record>(inList2, comparator2.duplicate()); - - final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap); - - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); - - - // we create this sort-merge iterator with little memory for the block-nested-loops 
fall-back to make sure it - // needs to spill for the duplicate keys - NonReusingMergeMatchIterator<Record, Record, Record> iterator = - new NonReusingMergeMatchIterator<Record, Record, Record>( - input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, - this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); - - iterator.open(); - - while (iterator.callWithNextKey(matcher, collector)); - - iterator.close(); - - // assert that each expected match was seen - for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - Assert.fail("Collection for key " + entry.getKey() + " is not empty"); - } - } - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - private Map<TestData.Key, Collection<Match>> matchValues( - Map<TestData.Key, Collection<TestData.Value>> leftMap, - Map<TestData.Key, Collection<TestData.Value>> rightMap) - { - Map<TestData.Key, Collection<Match>> map = new HashMap<TestData.Key, Collection<Match>>(); - - for (TestData.Key key : leftMap.keySet()) { - Collection<TestData.Value> leftValues = leftMap.get(key); - Collection<TestData.Value> rightValues = rightMap.get(key); - - if (rightValues == null) { - continue; - } - - if (!map.containsKey(key)) { - map.put(key, new ArrayList<Match>()); - } - - Collection<Match> matchedValues = map.get(key); - - for (TestData.Value leftValue : leftValues) { - for (TestData.Value rightValue : rightValues) { - matchedValues.add(new Match(leftValue, rightValue)); - } - } - } - - return map; - } - - - private Map<TestData.Key, Collection<TestData.Value>> collectData(MutableObjectIterator<Record> iter) 
- throws Exception - { - Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>(); - Record pair = new Record(); - - while ((pair = iter.next(pair)) != null) { - TestData.Key key = pair.getField(0, TestData.Key.class); - - if (!map.containsKey(key)) { - map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>()); - } - - Collection<TestData.Value> values = map.get(key); - values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue())); - } - - return map; - } - - /** - * Private class used for storage of the expected matches in a hashmap. - */ - private static class Match { - private final Value left; - - private final Value right; - - public Match(Value left, Value right) { - this.left = left; - this.right = right; - } - - @Override - public boolean equals(Object obj) { - Match o = (Match) obj; - return this.left.equals(o.left) && this.right.equals(o.right); - } - - @Override - public int hashCode() { - return this.left.hashCode() ^ this.right.hashCode(); - } - - @Override - public String toString() { - return left + ", " + right; - } - } - - private static final class MatchRemovingMatcher extends JoinFunction { - private static final long serialVersionUID = 1L; - - private final Map<TestData.Key, Collection<Match>> toRemoveFrom; - - protected MatchRemovingMatcher(Map<TestData.Key, Collection<Match>> map) { - this.toRemoveFrom = map; - } - - @Override - public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception { - TestData.Key key = rec1.getField(0, TestData.Key.class); - TestData.Value value1 = rec1.getField(1, TestData.Value.class); - TestData.Value value2 = rec2.getField(1, TestData.Value.class); - - Collection<Match> matches = this.toRemoveFrom.get(key); - if (matches == null) { - Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected."); - } - - boolean contained = matches.remove(new Match(value1, value2)); - if (!contained) { - 
Assert.fail("Produced match was not contained: " + key + " - " + value1 + ":" + value2); - } - if (matches.isEmpty()) { - this.toRemoveFrom.remove(key); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java new file mode 100644 index 0000000..e4eec86 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeInnerJoinIteratorITCase.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntComparator; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleComparator; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.*; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.*; +import java.util.Map.Entry; + +@SuppressWarnings("deprecation") +public class ReusingSortMergeInnerJoinIteratorITCase { + + // total memory + private static final int MEMORY_SIZE = 1024 * 1024 * 16; + private static final int PAGES_FOR_BNLJN = 2; + + // the size of the left and right inputs + private static final int INPUT_1_SIZE = 20000; + + private static final int 
INPUT_2_SIZE = 1000; + + // random seeds for the left and right input data generators + private static final long SEED1 = 561349061987311L; + + private static final long SEED2 = 231434613412342L; + + // dummy abstract task + private final AbstractInvokable parentTask = new DummyInvokable(); + + private IOManager ioManager; + private MemoryManager memoryManager; + + private TypeSerializer<Tuple2<Integer, String>> serializer1; + private TypeSerializer<Tuple2<Integer, String>> serializer2; + private TypeComparator<Tuple2<Integer, String>> comparator1; + private TypeComparator<Tuple2<Integer, String>> comparator2; + private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator; + + + @SuppressWarnings("unchecked") + @Before + public void beforeTest() { + serializer1 = new TupleSerializer<Tuple2<Integer, String>>( + (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE }); + serializer2 = new TupleSerializer<Tuple2<Integer, String>>( + (Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class, + new TypeSerializer<?>[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE }); + comparator1 = new TupleComparator<Tuple2<Integer, String>>( + new int[]{0}, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + comparator2 = new TupleComparator<Tuple2<Integer, String>>( + new int[]{0}, + new TypeComparator<?>[] { new IntComparator(true) }, + new TypeSerializer<?>[] { IntSerializer.INSTANCE }); + pairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(comparator1, comparator2); + + this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + if (this.ioManager != null) { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O manager failed to properly 
shut down."); + } + this.ioManager = null; + } + + if (this.memoryManager != null) { + Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + @Test + public void testMerge() { + try { + + final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues( + collectData(input1), + collectData(input2)); + + final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> joinFunction = + new MatchRemovingMatcher(expectedMatchesMap); + + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // compare with iterator values + ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + new ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>( + input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, + this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); + + iterator.open(); + + while (iterator.callWithNextKey(joinFunction, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<Integer, Collection<Match>> entry : 
expectedMatchesMap.entrySet()) { + Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty()); + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testMergeWithHighNumberOfCommonKeys() + { + // the size of the left and right inputs + final int INPUT_1_SIZE = 200; + final int INPUT_2_SIZE = 100; + + final int INPUT_1_DUPLICATES = 10; + final int INPUT_2_DUPLICATES = 4000; + final int DUPLICATE_KEY = 13; + + try { + final TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES); + final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES); + + final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<MutableObjectIterator<Tuple2<Integer, String>>>(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + MutableObjectIterator<Tuple2<Integer, String>> input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); + MutableObjectIterator<Tuple2<Integer, String>> input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, 
comparator2.duplicate()); + + // collect expected data + final Map<Integer, Collection<Match>> expectedMatchesMap = matchValues( + collectData(input1), + collectData(input2)); + + // re-create the whole thing for actual processing + + // reset the generators and iterators + generator1.reset(); + generator2.reset(); + const1Iter.reset(); + const2Iter.reset(); + gen1Iter.reset(); + gen2Iter.reset(); + + inList1.clear(); + inList1.add(gen1Iter); + inList1.add(const1Iter); + + inList2.clear(); + inList2.add(gen2Iter); + inList2.add(const2Iter); + + input1 = new MergeIterator<Tuple2<Integer, String>>(inList1, comparator1.duplicate()); + input2 = new MergeIterator<Tuple2<Integer, String>>(inList2, comparator2.duplicate()); + + final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new MatchRemovingMatcher(expectedMatchesMap); + + final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>(); + + + // we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it + // needs to spill for the duplicate keys + ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = + new ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>( + input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, + this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry<Integer, Collection<Match>> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An 
exception occurred during the test: " + e.getMessage()); + } + } + + + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + private Map<Integer, Collection<Match>> matchValues( + Map<Integer, Collection<String>> leftMap, + Map<Integer, Collection<String>> rightMap) + { + Map<Integer, Collection<Match>> map = new HashMap<Integer, Collection<Match>>(); + + for (Integer key : leftMap.keySet()) { + Collection<String> leftValues = leftMap.get(key); + Collection<String> rightValues = rightMap.get(key); + + if (rightValues == null) { + continue; + } + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<Match>()); + } + + Collection<Match> matchedValues = map.get(key); + + for (String leftValue : leftValues) { + for (String rightValue : rightValues) { + matchedValues.add(new Match(leftValue, rightValue)); + } + } + } + + return map; + } + + + private Map<Integer, Collection<String>> collectData(MutableObjectIterator<Tuple2<Integer, String>> iter) + throws Exception + { + Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>(); + Tuple2<Integer, String> pair = new Tuple2<Integer, String>(); + + while ((pair = iter.next(pair)) != null) { + final Integer key = pair.getField(0); + + if (!map.containsKey(key)) { + map.put(key, new ArrayList<String>()); + } + + Collection<String> values = map.get(key); + final String value = pair.getField(1); + values.add(value); + } + + return map; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java deleted file mode 100644 index 474fa3c..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeMatchIteratorITCase.java +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.operators.sort; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.api.common.typeutils.record.RecordPairComparator; -import org.apache.flink.api.common.typeutils.record.RecordSerializer; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; -import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.operators.testutils.TestData; -import org.apache.flink.runtime.operators.testutils.TestData.Generator; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode; -import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode; -import org.apache.flink.types.Record; -import org.apache.flink.types.Value; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -@SuppressWarnings("deprecation") -public class ReusingSortMergeMatchIteratorITCase { - - // total memory - private static final int MEMORY_SIZE = 1024 * 1024 * 16; - private static final int PAGES_FOR_BNLJN = 2; - - // the size of the left and right 
inputs - private static final int INPUT_1_SIZE = 20000; - - private static final int INPUT_2_SIZE = 1000; - - // random seeds for the left and right input data generators - private static final long SEED1 = 561349061987311L; - - private static final long SEED2 = 231434613412342L; - - // dummy abstract task - private final AbstractInvokable parentTask = new DummyInvokable(); - - private IOManager ioManager; - private MemoryManager memoryManager; - - private TypeSerializer<Record> serializer1; - private TypeSerializer<Record> serializer2; - private TypeComparator<Record> comparator1; - private TypeComparator<Record> comparator2; - private TypePairComparator<Record, Record> pairComparator; - - - @SuppressWarnings("unchecked") - @Before - public void beforeTest() { - this.serializer1 = RecordSerializer.get(); - this.serializer2 = RecordSerializer.get(); - this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class}); - this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class}); - this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class}); - - this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); - this.ioManager = new IOManagerAsync(); - } - - @After - public void afterTest() { - if (this.ioManager != null) { - this.ioManager.shutdown(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } - this.ioManager = null; - } - - if (this.memoryManager != null) { - Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", - this.memoryManager.verifyEmpty()); - this.memoryManager.shutdown(); - this.memoryManager = null; - } - } - - - - @Test - public void testMerge() { - try { - - final Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); - final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, 
ValueMode.RANDOM_LENGTH); - - final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); - final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); - - // collect expected data - final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues( - collectData(input1), - collectData(input2)); - - final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap); - - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); - - // reset the generators - generator1.reset(); - generator2.reset(); - input1.reset(); - input2.reset(); - - // compare with iterator values - ReusingMergeMatchIterator<Record, Record, Record> iterator = - new ReusingMergeMatchIterator<Record, Record, Record>( - input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, - this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); - - iterator.open(); - - while (iterator.callWithNextKey(matcher, collector)); - - iterator.close(); - - // assert that each expected match was seen - for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) { - Assert.assertTrue("Collection for key " + entry.getKey() + " is not empty", entry.getValue().isEmpty()); - } - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - @Test - public void testMergeWithHighNumberOfCommonKeys() - { - // the size of the left and right inputs - final int INPUT_1_SIZE = 200; - final int INPUT_2_SIZE = 100; - - final int INPUT_1_DUPLICATES = 10; - final int INPUT_2_DUPLICATES = 4000; - final int DUPLICATE_KEY = 13; - - try { - final Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); - final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH); - - final 
TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE); - final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE); - - final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES); - final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES); - - final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>(); - inList1.add(gen1Iter); - inList1.add(const1Iter); - - final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>(); - inList2.add(gen2Iter); - inList2.add(const2Iter); - - MutableObjectIterator<Record> input1 = new MergeIterator<Record>(inList1, comparator1.duplicate()); - MutableObjectIterator<Record> input2 = new MergeIterator<Record>(inList2, comparator2.duplicate()); - - // collect expected data - final Map<TestData.Key, Collection<Match>> expectedMatchesMap = matchValues( - collectData(input1), - collectData(input2)); - - // re-create the whole thing for actual processing - - // reset the generators and iterators - generator1.reset(); - generator2.reset(); - const1Iter.reset(); - const2Iter.reset(); - gen1Iter.reset(); - gen2Iter.reset(); - - inList1.clear(); - inList1.add(gen1Iter); - inList1.add(const1Iter); - - inList2.clear(); - inList2.add(gen2Iter); - inList2.add(const2Iter); - - input1 = new MergeIterator<Record>(inList1, comparator1.duplicate()); - input2 = new MergeIterator<Record>(inList2, comparator2.duplicate()); - - final JoinFunction matcher = new MatchRemovingMatcher(expectedMatchesMap); - - final Collector<Record> collector = new DiscardingOutputCollector<Record>(); - - - // we create this sort-merge iterator with little memory for the block-nested-loops fall-back to make sure it - // needs to 
spill for the duplicate keys - ReusingMergeMatchIterator<Record, Record, Record> iterator = - new ReusingMergeMatchIterator<Record, Record, Record>( - input1, input2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, - this.pairComparator, this.memoryManager, this.ioManager, PAGES_FOR_BNLJN, this.parentTask); - - iterator.open(); - - while (iterator.callWithNextKey(matcher, collector)); - - iterator.close(); - - // assert that each expected match was seen - for (Entry<TestData.Key, Collection<Match>> entry : expectedMatchesMap.entrySet()) { - if (!entry.getValue().isEmpty()) { - Assert.fail("Collection for key " + entry.getKey() + " is not empty"); - } - } - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - private Map<TestData.Key, Collection<Match>> matchValues( - Map<TestData.Key, Collection<TestData.Value>> leftMap, - Map<TestData.Key, Collection<TestData.Value>> rightMap) - { - Map<TestData.Key, Collection<Match>> map = new HashMap<TestData.Key, Collection<Match>>(); - - for (TestData.Key key : leftMap.keySet()) { - Collection<TestData.Value> leftValues = leftMap.get(key); - Collection<TestData.Value> rightValues = rightMap.get(key); - - if (rightValues == null) { - continue; - } - - if (!map.containsKey(key)) { - map.put(key, new ArrayList<Match>()); - } - - Collection<Match> matchedValues = map.get(key); - - for (TestData.Value leftValue : leftValues) { - for (TestData.Value rightValue : rightValues) { - matchedValues.add(new Match(leftValue, rightValue)); - } - } - } - - return map; - } - - - private Map<TestData.Key, Collection<TestData.Value>> collectData(MutableObjectIterator<Record> iter) - throws Exception - { - Map<TestData.Key, 
Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>(); - Record pair = new Record(); - - while ((pair = iter.next(pair)) != null) { - TestData.Key key = pair.getField(0, TestData.Key.class); - - if (!map.containsKey(key)) { - map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>()); - } - - Collection<TestData.Value> values = map.get(key); - values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue())); - } - - return map; - } - - /** - * Private class used for storage of the expected matches in a hashmap. - */ - private static class Match { - private final Value left; - - private final Value right; - - public Match(Value left, Value right) { - this.left = left; - this.right = right; - } - - @Override - public boolean equals(Object obj) { - Match o = (Match) obj; - return this.left.equals(o.left) && this.right.equals(o.right); - } - - @Override - public int hashCode() { - return this.left.hashCode() ^ this.right.hashCode(); - } - - @Override - public String toString() { - return left + ", " + right; - } - } - - private static final class MatchRemovingMatcher extends JoinFunction { - private static final long serialVersionUID = 1L; - - private final Map<TestData.Key, Collection<Match>> toRemoveFrom; - - protected MatchRemovingMatcher(Map<TestData.Key, Collection<Match>> map) { - this.toRemoveFrom = map; - } - - @Override - public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception { - TestData.Key key = rec1.getField(0, TestData.Key.class); - TestData.Value value1 = rec1.getField(1, TestData.Value.class); - TestData.Value value2 = rec2.getField(1, TestData.Value.class); - - Collection<Match> matches = this.toRemoveFrom.get(key); - if (matches == null) { - Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected."); - } - - boolean contained = matches.remove(new Match(value1, value2)); - if (!contained) { - Assert.fail("Produced match was not contained: 
" + key + " - " + value1 + ":" + value2); - } - if (matches.isEmpty()) { - this.toRemoveFrom.remove(key); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java new file mode 100644 index 0000000..7fd1b6c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/CollectionIterator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.runtime.util.ResettableMutableObjectIterator; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; + + +public class CollectionIterator<T> implements ResettableMutableObjectIterator<T> { + + private final Collection<T> collection; + private Iterator<T> iterator; + + public CollectionIterator(Collection<T> collection) { + this.collection = collection; + this.iterator = collection.iterator(); + } + + @Override + public T next(T reuse) throws IOException { + return next(); + } + + @Override + public T next() throws IOException { + if (!iterator.hasNext()) { + return null; + } else { + return iterator.next(); + } + } + + @Override + public void reset() throws IOException { + iterator = collection.iterator(); + } + + public static <T> CollectionIterator<T> of(T... values) { + return new CollectionIterator<T>(Arrays.asList(values)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df9f4819/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java new file mode 100644 index 0000000..539d864 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/Match.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +/** + * Utility class for keeping track of matches in join operator tests. + * + * @see org.apache.flink.runtime.operators.testutils.MatchRemovingMatcher + */ +public class Match { + private final String left; + + private final String right; + + public Match(String left, String right) { + this.left = left; + this.right = right; + } + + @Override + public boolean equals(Object obj) { + Match o = (Match) obj; + if (left == null && o.left == null && right.equals(o.right)) { + return true; + } else if (right == null && o.right == null && left.equals(o.left)) { + return true; + } else { + return this.left.equals(o.left) && this.right.equals(o.right); + } + } + + @Override + public int hashCode() { + if (left == null) { + return right.hashCode(); + } else if (right == null) { + return left.hashCode(); + } else { + return this.left.hashCode() ^ this.right.hashCode(); + } + } + + @Override + public String toString() { + return left + ", " + right; + } +}