Repository: flink
Updated Branches:
  refs/heads/master 1c1562ab7 -> d03dd63b7


[FLINK-1795] [runtime] Add test for duplicate elimination in the solution set.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/923a2ae2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/923a2ae2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/923a2ae2

Branch: refs/heads/master
Commit: 923a2ae259bd72a2d48639ae0e64db0a04a4aa91
Parents: 1c1562a
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 17:44:21 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:22 2015 +0200

----------------------------------------------------------------------
 .../iterative/task/IterationHeadPactTask.java   | 36 +++-----
 .../operators/hash/CompactingHashTable.java     | 10 +++
 .../iterative/SolutionSetDuplicatesITCase.java  | 87 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index abaf311..cf02bdf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -85,20 +85,14 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
        private Collector<X> finalOutputCollector;
 
-       private List<RecordWriter<?>> finalOutputWriters;
-
        private TypeSerializerFactory<Y> feedbackTypeSerializer;
 
        private TypeSerializerFactory<X> solutionTypeSerializer;
 
        private ResultPartitionWriter toSync;
 
-       private int initialSolutionSetInput; // undefined for bulk iterations
-
        private int feedbackDataInput; // workset or bulk partial solution
 
-       private RuntimeAggregatorRegistry aggregatorRegistry;
-
        // 
--------------------------------------------------------------------------------------------
 
        @Override
@@ -115,15 +109,15 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
                // at this time, the outputs to the step function are created
                // add the outputs for the final solution
-               this.finalOutputWriters = new ArrayList<RecordWriter<?>>();
+               List<RecordWriter<?>> finalOutputWriters = new 
ArrayList<RecordWriter<?>>();
                final TaskConfig finalOutConfig = 
this.config.getIterationHeadFinalOutputConfig();
                final ClassLoader userCodeClassLoader = 
getUserCodeClassLoader();
                this.finalOutputCollector = 
RegularPactTask.getOutputCollector(this, finalOutConfig,
-                       userCodeClassLoader, this.finalOutputWriters, 
config.getNumOutputs(), finalOutConfig.getNumOutputs());
+                       userCodeClassLoader, finalOutputWriters, 
config.getNumOutputs(), finalOutConfig.getNumOutputs());
 
                // sanity check the setup
                final int writersIntoStepFunction = this.eventualOutputs.size();
-               final int writersIntoFinalResult = 
this.finalOutputWriters.size();
+               final int writersIntoFinalResult = finalOutputWriters.size();
                final int syncGateIndex = 
this.config.getIterationHeadIndexOfSyncOutput();
 
                if (writersIntoStepFunction + writersIntoFinalResult != 
syncGateIndex) {
@@ -207,13 +201,12 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
                TypeSerializer<BT> solutionTypeSerializer = 
solutionTypeSerializerFactory.getSerializer();
                TypeComparator<BT> solutionTypeComparator = 
solutionTypeComparatorFactory.createComparator();
                
-               JoinHashMap<BT> map = new 
JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
-               return map;
+               return new JoinHashMap<BT>(solutionTypeSerializer, 
solutionTypeComparator);
        }
        
        private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, 
MutableObjectIterator<X> solutionSetInput) throws IOException {
                solutionSet.open();
-               solutionSet.buildTable(solutionSetInput);
+               solutionSet.buildTableWithUniqueKey(solutionSetInput);
        }
        
        private void readInitialSolutionSet(JoinHashMap<X> solutionSet, 
MutableObjectIterator<X> solutionSetInput) throws IOException {
@@ -255,14 +248,13 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
                        SolutionSetUpdateBarrier solutionSetUpdateBarrier = 
null;
 
                        feedbackDataInput = 
config.getIterationHeadPartialSolutionOrWorksetInputIndex();
-                       feedbackTypeSerializer = 
this.<Y>getInputSerializer(feedbackDataInput);
+                       feedbackTypeSerializer = 
this.getInputSerializer(feedbackDataInput);
                        excludeFromReset(feedbackDataInput);
 
+                       int initialSolutionSetInput;
                        if (isWorksetIteration) {
                                initialSolutionSetInput = 
config.getIterationHeadSolutionSetInputIndex();
-                               TypeSerializerFactory<X> 
solutionTypeSerializerFactory = config
-                                               
.getSolutionSetSerializer(getUserCodeClassLoader());
-                               solutionTypeSerializer = 
solutionTypeSerializerFactory;
+                               solutionTypeSerializer = 
config.getSolutionSetSerializer(getUserCodeClassLoader());
 
                                // setup the index for the solution set
                                @SuppressWarnings("unchecked")
@@ -283,10 +275,9 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
                                        solutionSetUpdateBarrier = new 
SolutionSetUpdateBarrier();
                                        
SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, 
solutionSetUpdateBarrier);
                                }
-                       } else {
+                       }
+                       else {
                                // bulk iteration case
-                               initialSolutionSetInput = -1;
-
                                @SuppressWarnings("unchecked")
                                TypeSerializerFactory<X> solSer = 
(TypeSerializerFactory<X>) feedbackTypeSerializer;
                                solutionTypeSerializer = solSer;
@@ -299,7 +290,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
                        }
 
                        // instantiate all aggregators and register them at the 
iteration global registry
-                       aggregatorRegistry = new 
RuntimeAggregatorRegistry(config.getIterationAggregators
+                       RuntimeAggregatorRegistry aggregatorRegistry = new 
RuntimeAggregatorRegistry(config.getIterationAggregators
                                        (getUserCodeClassLoader()));
                        IterationAggregatorBroker.instance().handIn(brokerKey, 
aggregatorRegistry);
 
@@ -392,7 +383,6 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
                        if (solutionSet != null) {
                                solutionSet.close();
-                               solutionSet = null;
                        }
                }
        }
@@ -434,8 +424,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
                        log.debug(formatLogString("Sending end-of-superstep to 
all iteration outputs."));
                }
 
-               for (int outputIndex = 0; outputIndex < 
this.eventualOutputs.size(); outputIndex++) {
-                       
this.eventualOutputs.get(outputIndex).sendEndOfSuperstep();
+               for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
+                       eventualOutput.sendEndOfSuperstep();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 7107972..6533e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -337,6 +337,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
                        insert(record);
                }
        }
+
+       public void buildTableWithUniqueKey(final MutableObjectIterator<T> 
input) throws IOException {
+               T record = this.buildSideSerializer.createInstance();
+               T tmp = this.buildSideSerializer.createInstance();
+
+               // go over the complete input and insert every element into the 
hash table
+               while (this.running && ((record = input.next(record)) != null)) 
{
+                       insertOrReplaceRecord(record, tmp);
+               }
+       }
        
        public final void insert(T record) throws IOException {
                if(this.closed.get()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/923a2ae2/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
new file mode 100644
index 0000000..c987dfd
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/SolutionSetDuplicatesITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class SolutionSetDuplicatesITCase extends MultipleProgramsTestBase {
+
+       public SolutionSetDuplicatesITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Test
+       public void testProgram() {
+               try {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                       DataSet<Tuple2<Long, Long>> data = env
+                               .generateSequence(0, 10)
+                               .flatMap(new FlatMapFunction<Long, Tuple2<Long, 
Long>>() {
+                                       @Override
+                                       public void flatMap(Long value, 
Collector<Tuple2<Long, Long>> out) {
+                                               out.collect(new Tuple2<Long, 
Long>(value, value));
+                                               out.collect(new Tuple2<Long, 
Long>(value, value));
+                                               out.collect(new Tuple2<Long, 
Long>(value, value));
+                                       }
+                               })
+                               .rebalance();
+
+                       DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> 
iter = data.iterateDelta(data, 10, 0);
+
+                       List<Integer> result = iter
+                                       .closeWith(iter.getWorkset(), 
iter.getWorkset())
+                                       .map(new MapFunction<Tuple2<Long,Long>, 
Integer>() {
+                                               @Override
+                                               public Integer map(Tuple2<Long, 
Long> value) {
+                                                       return 
value.f0.intValue();
+                                               }
+                                       })
+                                       .collect();
+
+                       assertEquals(11, result.size());
+
+                       Collections.sort(result);
+                       assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 
9, 10), result);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

Reply via email to