Repository: flink
Updated Branches:
  refs/heads/master 1c1562ab7 -> d03dd63b7

[FLINK-1795] [runtime] Add test for duplicate elimination in the solution set.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/923a2ae2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/923a2ae2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/923a2ae2

Branch: refs/heads/master
Commit: 923a2ae259bd72a2d48639ae0e64db0a04a4aa91
Parents: 1c1562a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 17:44:21 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:22 2015 +0200

----------------------------------------------------------------------
 .../iterative/task/IterationHeadPactTask.java   | 36 +++-----
 .../operators/hash/CompactingHashTable.java     | 10 +++
 .../iterative/SolutionSetDuplicatesITCase.java  | 87 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index abaf311..cf02bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -85,20 +85,14 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 	private Collector<X> finalOutputCollector;
 
-	private List<RecordWriter<?>> finalOutputWriters;
-
 	private TypeSerializerFactory<Y> feedbackTypeSerializer;
 
 	private TypeSerializerFactory<X> solutionTypeSerializer;
 
 	private ResultPartitionWriter toSync;
 
-	private int initialSolutionSetInput; // undefined for bulk iterations
-
 	private int feedbackDataInput; // workset or bulk partial solution
 
-	private RuntimeAggregatorRegistry aggregatorRegistry;
-
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -115,15 +109,15 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 		// at this time, the outputs to the step function are created
 		// add the outputs for the final solution
-		this.finalOutputWriters = new ArrayList<RecordWriter<?>>();
+		List<RecordWriter<?>> finalOutputWriters = new ArrayList<RecordWriter<?>>();
 		final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
 		final ClassLoader userCodeClassLoader = getUserCodeClassLoader();
 		this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
-				userCodeClassLoader, this.finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
+				userCodeClassLoader, finalOutputWriters, config.getNumOutputs(), finalOutConfig.getNumOutputs());
 
 		// sanity check the setup
 		final int writersIntoStepFunction = this.eventualOutputs.size();
-		final int writersIntoFinalResult = this.finalOutputWriters.size();
+		final int writersIntoFinalResult = finalOutputWriters.size();
 		final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
 
 		if (writersIntoStepFunction + writersIntoFinalResult != syncGateIndex) {
@@ -207,13 +201,12 @@
 
 		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
 		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-		JoinHashMap<BT> map = new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
-		return map;
+		return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
 	}
 
 	private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
 		solutionSet.open();
-		solutionSet.buildTable(solutionSetInput);
+		solutionSet.buildTableWithUniqueKey(solutionSetInput);
 	}
 
 	private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
@@ -255,14 +248,13 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 		SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
 
 		feedbackDataInput = config.getIterationHeadPartialSolutionOrWorksetInputIndex();
-		feedbackTypeSerializer = this.<Y>getInputSerializer(feedbackDataInput);
+		feedbackTypeSerializer = this.getInputSerializer(feedbackDataInput);
 		excludeFromReset(feedbackDataInput);
 
+		int initialSolutionSetInput;
 		if (isWorksetIteration) {
 			initialSolutionSetInput = config.getIterationHeadSolutionSetInputIndex();
-			TypeSerializerFactory<X> solutionTypeSerializerFactory = config
-					.getSolutionSetSerializer(getUserCodeClassLoader());
-			solutionTypeSerializer = solutionTypeSerializerFactory;
+			solutionTypeSerializer = config.getSolutionSetSerializer(getUserCodeClassLoader());
 
 			// setup the index for the solution set
 			@SuppressWarnings("unchecked")
@@ -283,10 +275,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 				solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
 				SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
 			}
-		} else {
+		}
+		else {
 			// bulk iteration case
-			initialSolutionSetInput = -1;
-
 			@SuppressWarnings("unchecked")
 			TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
 			solutionTypeSerializer = solSer;
@@ -299,7 +290,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 		}
 
 		// instantiate all aggregators and register them at the iteration global registry
-		aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
+		RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators
 				(getUserCodeClassLoader()));
 		IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
 
@@ -392,7 +383,6 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 			if (solutionSet != null) {
 				solutionSet.close();
-				solutionSet = null;
 			}
 		}
 	}
@@ -434,8 +424,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			log.debug(formatLogString("Sending end-of-superstep to all iteration outputs."));
 		}
 
-		for (int outputIndex = 0; outputIndex < this.eventualOutputs.size(); outputIndex++) {
-			this.eventualOutputs.get(outputIndex).sendEndOfSuperstep();
+		for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
+			eventualOutput.sendEndOfSuperstep();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 7107972..6533e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -337,6 +337,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			insert(record);
 		}
 	}
+	
+	public void buildTableWithUniqueKey(final MutableObjectIterator<T> input) throws IOException {
+		T record = this.buildSideSerializer.createInstance();
+		T tmp = this.buildSideSerializer.createInstance();
+
+		// go over the complete input and insert every element into the hash table
+		while (this.running && ((record = input.next(record)) != null)) {
+			insertOrReplaceRecord(record, tmp);
+		}
+	}
 	
 	public final void insert(T record) throws IOException {
 		if(this.closed.get()) {


http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
new file mode 100644
index 0000000..c987dfd
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
+
+	public SolutionSetDuplicatesITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testProgram() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			DataSet<Tuple2<Long, Long>> data = env
+					.generateSequence(0, 10)
+					.flatMap(new FlatMapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public void flatMap(Long value, Collector<Tuple2<Long, Long>> out) {
+							out.collect(new Tuple2<Long, Long>(value, value));
+							out.collect(new Tuple2<Long, Long>(value, value));
+							out.collect(new Tuple2<Long, Long>(value, value));
+						}
+					})
+					.rebalance();
+
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter = data.iterateDelta(data, 10, 0);
+
+			List<Integer> result = iter
+					.closeWith(iter.getWorkset(), iter.getWorkset())
+					.map(new MapFunction<Tuple2<Long,Long>, Integer>() {
+						@Override
+						public Integer map(Tuple2<Long, Long> value) {
+							return value.f0.intValue();
+						}
+					})
+					.collect();
+
+			assertEquals(11, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10), result);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
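
For context on what the new code path does: buildTable() inserts every record of the initial solution-set input, so duplicate keys would end up as duplicate entries, while the new buildTableWithUniqueKey() routes each record through insertOrReplaceRecord(), so a later record with an already-present key replaces the earlier entry and the solution set keeps exactly one record per key. Below is a minimal, self-contained sketch of that insert-or-replace behavior in plain Java; it uses a java.util.Map instead of Flink's CompactingHashTable, and the class and method names here exist only for this illustration.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: models the insert-or-replace semantics that de-duplicates the solution set. */
public class UniqueKeyBuildSketch {

	/** Builds a key-indexed table; a later record with the same key replaces the earlier entry. */
	static Map<Long, Long> buildTableWithUniqueKey(Iterator<long[]> input) {
		Map<Long, Long> table = new LinkedHashMap<Long, Long>();
		while (input.hasNext()) {
			long[] record = input.next();     // record[0] = key, record[1] = value
			table.put(record[0], record[1]);  // insert or replace, never a duplicate per key
		}
		return table;
	}

	public static void main(String[] args) {
		// three copies of every key 0..10, like the flatMap in the ITCase above
		List<long[]> data = new ArrayList<long[]>();
		for (long i = 0; i <= 10; i++) {
			for (int copy = 0; copy < 3; copy++) {
				data.add(new long[] { i, i });
			}
		}
		Map<Long, Long> solutionSet = buildTableWithUniqueKey(data.iterator());
		System.out.println(solutionSet.size());  // prints 11, one entry per distinct key
	}
}

Fed the same input as the ITCase (three copies of every key 0 to 10), the table ends up with 11 entries, which is exactly what the test asserts after the delta iteration completes.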