Repository: flink Updated Branches: refs/heads/master 7ff071f66 -> bbb75c599
http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java deleted file mode 100644 index 6c4659d..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java +++ /dev/null @@ -1,1019 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.operators; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.typeutils.record.RecordComparator; -import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory; -import org.apache.flink.api.java.record.functions.JoinFunction; -import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator; -import org.apache.flink.runtime.operators.testutils.DriverTestBase; -import org.apache.flink.runtime.operators.testutils.ExpectedTestException; -import org.apache.flink.runtime.operators.testutils.NirvanaOutputList; -import org.apache.flink.runtime.operators.testutils.TaskCancelThread; -import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Key; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -import org.junit.Assert; -import org.junit.Test; - -import static org.junit.Assert.*; - -@SuppressWarnings("deprecation") -public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> { - - private static final long HASH_MEM = 6*1024*1024; - - private static final long SORT_MEM = 3*1024*1024; - - private static final int NUM_SORTER = 2; - - private static final long BNLJN_MEM = 10 * PAGE_SIZE; - - private final double bnljn_frac; - - private final double hash_frac; - - @SuppressWarnings("unchecked") - private final RecordComparator comparator1 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); - - @SuppressWarnings("unchecked") - private final RecordComparator comparator2 = new RecordComparator( - new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class }); - - private final List<Record> outList = new ArrayList<Record>(); - - - public MatchTaskTest(ExecutionConfig config) { - super(config, HASH_MEM, NUM_SORTER, SORT_MEM); - bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize(); - hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize(); - } - - - @Test - public void testSortBoth1MatchTask() { - final int keyCnt1 = 20; - final int valCnt1 = 1; - - final int keyCnt2 = 10; - final int valCnt2 = 2; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); - addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - } - - @Test - public void testSortBoth2MatchTask() { - - int keyCnt1 = 20; - int valCnt1 = 1; - - int keyCnt2 = 20; - int valCnt2 = 1; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); - addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - - } - - @Test - public void testSortBoth3MatchTask() { - - int keyCnt1 = 20; - int valCnt1 = 1; - - int keyCnt2 = 20; - int valCnt2 = 20; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); - addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - - } - - @Test - public void testSortBoth4MatchTask() { - - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 1; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); - addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - - } - - @Test - public void testSortBoth5MatchTask() { - - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); - addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - - } - - @Test - public void testSortFirstMatchTask() { - - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate()); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true)); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - - } - - @Test - public void testSortSecondMatchTask() { - - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true)); - addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate()); - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - - } - - @Test - public void testMergeMatchTask() { - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - setOutput(this.outList); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true)); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - - Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt); - - this.outList.clear(); - - } - - @Test - public void testFailingMatchTask() { - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - setOutput(new NirvanaOutputList()); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true)); - - try { - testDriver(testTask, MockFailingMatchStub.class); - Assert.fail("Driver did not forward Exception."); - } catch (ExpectedTestException e) { - // good! - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - } - - @Test - public void testCancelMatchTaskWhileSort1() { - final int keyCnt = 20; - final int valCnt = 20; - - try { - setOutput(new NirvanaOutputList()); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - final AtomicReference<Throwable> error = new AtomicReference<>(); - - Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - } - catch (Throwable t) { - error.set(t); - } - } - }; - taskRunner.start(); - - Thread.sleep(1000); - - cancel(); - taskRunner.interrupt(); - - taskRunner.join(60000); - - assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); - - Throwable taskError = error.get(); - if (taskError != null) { - taskError.printStackTrace(); - fail("Error in task while canceling: " + taskError.getMessage()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCancelMatchTaskWhileSort2() { - final int keyCnt = 20; - final int valCnt = 20; - - try { - setOutput(new NirvanaOutputList()); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate()); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("The test caused an exception."); - } - - final AtomicReference<Throwable> error = new AtomicReference<>(); - - Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - } - catch (Throwable t) { - error.set(t); - } - } - }; - taskRunner.start(); - - Thread.sleep(1000); - - cancel(); - taskRunner.interrupt(); - - taskRunner.join(60000); - - assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); - - Throwable taskError = error.get(); - if (taskError != null) { - taskError.printStackTrace(); - fail("Error in task while canceling: " + taskError.getMessage()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCancelMatchTaskWhileMatching() { - final int keyCnt = 20; - final int valCnt = 20; - - try { - setOutput(new NirvanaOutputList()); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE); - getTaskConfig().setRelativeMemoryDriver(bnljn_frac); - setNumFileHandlesForSort(4); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - addInput(new UniformRecordGenerator(keyCnt, valCnt, true)); - - final AtomicReference<Throwable> error = new AtomicReference<>(); - - Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") { - @Override - public void run() { - try { - testDriver(testTask, MockDelayingMatchStub.class); - } - catch (Throwable t) { - error.set(t); - } - } - }; - taskRunner.start(); - - Thread.sleep(1000); - - cancel(); - taskRunner.interrupt(); - - taskRunner.join(60000); - - assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive()); - - Throwable taskError = error.get(); - if (taskError != null) { - taskError.printStackTrace(); - fail("Error in task while canceling: " + taskError.getMessage()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testHash1MatchTask() { - int keyCnt1 = 20; - int valCnt1 = 1; - - int keyCnt2 = 10; - int valCnt2 = 2; - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(this.outList); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); - this.outList.clear(); - } - - @Test - public void testHash2MatchTask() { - int keyCnt1 = 20; - int valCnt1 = 1; - - int keyCnt2 = 20; - int valCnt2 = 1; - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(this.outList); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); - this.outList.clear(); - } - - @Test - public void testHash3MatchTask() { - int keyCnt1 = 20; - int valCnt1 = 1; - - int keyCnt2 = 20; - int valCnt2 = 20; - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(this.outList); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); - this.outList.clear(); - } - - @Test - public void testHash4MatchTask() { - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 1; - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(this.outList); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); - this.outList.clear(); - } - - @Test - public void testHash5MatchTask() { - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(this.outList); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockMatchStub.class); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - - final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2); - Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size()); - this.outList.clear(); - } - - @Test - public void testFailingHashFirstMatchTask() { - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(new NirvanaOutputList()); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockFailingMatchStub.class); - Assert.fail("Function exception was not forwarded."); - } catch (ExpectedTestException etex) { - // good! - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - } - - @Test - public void testFailingHashSecondMatchTask() { - int keyCnt1 = 20; - int valCnt1 = 20; - - int keyCnt2 = 20; - int valCnt2 = 20; - - addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false)); - addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(new NirvanaOutputList()); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - try { - testDriver(testTask, MockFailingMatchStub.class); - Assert.fail("Function exception was not forwarded."); - } catch (ExpectedTestException etex) { - // good! - } catch (Exception e) { - e.printStackTrace(); - Assert.fail("Test caused an exception."); - } - } - - @Test - public void testCancelHashMatchTaskWhileBuildFirst() { - final int keyCnt = 20; - final int valCnt = 20; - - try { - addInput(new DelayingInfinitiveInputIterator(100)); - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - - setOutput(new NirvanaOutputList()); - - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - final AtomicBoolean success = new AtomicBoolean(false); - - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); - } - } - }; - taskRunner.start(); - - Thread.sleep(1000); - cancel(); - - try { - taskRunner.join(); - } - catch (InterruptedException ie) { - Assert.fail("Joining threads failed"); - } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - @Test - public void testHashCancelMatchTaskWhileBuildSecond() { - final int keyCnt = 20; - final int valCnt = 20; - - try { - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - addInput(new DelayingInfinitiveInputIterator(100)); - - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - - setOutput(new NirvanaOutputList()); - - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - final AtomicBoolean success = new AtomicBoolean(false); - - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); - } - } - }; - taskRunner.start(); - - Thread.sleep(1000); - cancel(); - - try { - taskRunner.join(); - } - catch (InterruptedException ie) { - Assert.fail("Joining threads failed"); - } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - @Test - public void testHashFirstCancelMatchTaskWhileMatching() { - int keyCnt = 20; - int valCnt = 20; - - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(new NirvanaOutputList()); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - final AtomicBoolean success = new AtomicBoolean(false); - - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); - } - } - }; - taskRunner.start(); - - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); - tct.start(); - - try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); - } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); - } - - @Test - public void testHashSecondCancelMatchTaskWhileMatching() { - int keyCnt = 20; - int valCnt = 20; - - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - addInput(new UniformRecordGenerator(keyCnt, valCnt, false)); - addDriverComparator(this.comparator1); - addDriverComparator(this.comparator2); - getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get()); - setOutput(new NirvanaOutputList()); - getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); - getTaskConfig().setRelativeMemoryDriver(hash_frac); - - final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>(); - - final AtomicBoolean success = new AtomicBoolean(false); - - Thread taskRunner = new Thread() { - @Override - public void run() { - try { - testDriver(testTask, MockMatchStub.class); - success.set(true); - } catch (Exception ie) { - ie.printStackTrace(); - } - } - }; - taskRunner.start(); - - TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this); - tct.start(); - - try { - tct.join(); - taskRunner.join(); - } catch(InterruptedException ie) { - Assert.fail("Joining threads failed"); - } - - Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get()); - } - - // ================================================================================================= - - public static final class MockMatchStub extends JoinFunction { - private static final long serialVersionUID = 1L; - - @Override - public void join(Record record1, Record record2, Collector<Record> out) throws Exception { - out.collect(record1); - } - } - - public static final class MockFailingMatchStub extends JoinFunction { - private static final long serialVersionUID = 1L; - - private int cnt = 0; - - @Override - public void join(Record record1, Record record2, Collector<Record> out) throws Exception { - if (++this.cnt >= 10) { - throw new ExpectedTestException(); - } - out.collect(record1); - } - } - - public static final class MockDelayingMatchStub extends JoinFunction { - private static final long serialVersionUID = 1L; - - @Override - public void join(Record record1, Record record2, Collector<Record> out) throws Exception { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java index f59c4a3..415b6bc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java @@ -26,9 +26,9 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.common.typeutils.record.RecordComparator; import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; @@ -46,7 +46,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc private final RecordComparator comparator = new RecordComparator( new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); - private final List<Record> outList = new ArrayList<Record>(); + private final List<Record> outList = new ArrayList<>(); public ReduceTaskExternalITCase(ExecutionConfig config) { @@ -68,7 +68,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc try { addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate()); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); testDriver(testTask, MockReduceStub.class); } catch (Exception e) { @@ -100,7 +100,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc try { addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate()); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); testDriver(testTask, MockReduceStub.class); } catch (Exception e) { @@ -130,14 +130,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc CombiningUnilateralSortMerger<Record> sorter = null; try { - sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), + sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, 2, 0.8f, true); addInput(sorter.getIterator()); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); testDriver(testTask, MockCombiningReduceStub.class); } catch (Exception e) { @@ -176,14 +176,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc CombiningUnilateralSortMerger<Record> sorter = null; try { - sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), + sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, 2, 0.8f, false); addInput(sorter.getIterator()); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); testDriver(testTask, MockCombiningReduceStub.class); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java index cc25c99..8bc7fe5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java @@ -27,9 +27,9 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable; import org.apache.flink.api.common.typeutils.record.RecordComparator; import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator; import org.apache.flink.runtime.operators.testutils.DriverTestBase; @@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.NirvanaOutputList; import org.apache.flink.runtime.operators.testutils.TaskCancelThread; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; + import org.apache.flink.types.IntValue; import org.apache.flink.types.Key; import org.apache.flink.types.Record; @@ -51,7 +52,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor private final RecordComparator comparator = new RecordComparator( new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class }); - private final List<Record> outList = new ArrayList<Record>(); + private final List<Record> outList = new ArrayList<>(); public ReduceTaskTest(ExecutionConfig config) { super(config, 0, 1, 3*1024*1024); @@ -69,7 +70,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor try { addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate()); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); testDriver(testTask, MockReduceStub.class); } catch (Exception e) { @@ -96,7 +97,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor setOutput(this.outList); getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); try { testDriver(testTask, MockReduceStub.class); @@ -125,13 +126,13 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor CombiningUnilateralSortMerger<Record> sorter = null; try { - sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), + sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(), getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem, 4, 0.8f, true); addInput(sorter.getIterator()); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); testDriver(testTask, MockCombiningReduceStub.class); } catch (Exception e) { @@ -168,7 +169,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor setOutput(this.outList); getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); try { testDriver(testTask, MockFailingReduceStub.class); @@ -190,7 +191,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor setOutput(new NirvanaOutputList()); getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); try { addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator.duplicate()); @@ -238,7 +239,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor setOutput(new NirvanaOutputList()); getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE); - final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>(); + final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>(); final AtomicBoolean success = new AtomicBoolean(false); http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java index 1f19699..542812c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java @@ -21,17 +21,17 @@ package org.apache.flink.runtime.operators.chaining; import java.util.ArrayList; import java.util.List; -import org.apache.flink.api.common.functions.GenericCollectorMap; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory; import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.operators.CollectorMapDriver; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.BatchTask; -import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub; +import org.apache.flink.runtime.operators.FlatMapDriver; +import org.apache.flink.runtime.operators.FlatMapTaskTest.MockMapStub; import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.testutils.TaskTestBase; @@ -49,14 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner; @RunWith(PowerMockRunner.class) @PrepareForTest({Task.class, ResultPartitionWriter.class}) -@SuppressWarnings("deprecation") public class ChainTaskTest extends TaskTestBase { private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3; private static final int NETWORK_BUFFER_SIZE = 1024; - private final List<Record> outList = new ArrayList<Record>(); + private final List<Record> outList = new ArrayList<>(); @SuppressWarnings("unchecked") private final RecordComparatorFactory compFact = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}, new boolean[] {true}); @@ -95,16 +94,16 @@ public class ChainTaskTest extends TaskTestBase { combineConfig.setRelativeMemoryDriver(memoryFraction); // udf - combineConfig.setStubWrapper(new UserCodeClassWrapper<MockReduceStub>(MockReduceStub.class)); + combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockReduceStub.class)); getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine"); } // chained map+combine { - BatchTask<GenericCollectorMap<Record, Record>, Record> testTask = - new BatchTask<GenericCollectorMap<Record, Record>, Record>(); - registerTask(testTask, CollectorMapDriver.class, MockMapStub.class); + BatchTask<FlatMapFunction<Record, Record>, Record> testTask = + new BatchTask<>(); + registerTask(testTask, FlatMapDriver.class, MockMapStub.class); try { testTask.invoke(); @@ -156,17 +155,17 @@ public class ChainTaskTest extends TaskTestBase { combineConfig.setRelativeMemoryDriver(memoryFraction); // udf - combineConfig.setStubWrapper(new UserCodeClassWrapper<MockFailingCombineStub>(MockFailingCombineStub.class)); + combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockFailingCombineStub.class)); getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine"); } // chained map+combine { - final BatchTask<GenericCollectorMap<Record, Record>, Record> testTask = - new BatchTask<GenericCollectorMap<Record, Record>, Record>(); + final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = + new BatchTask<>(); - super.registerTask(testTask, CollectorMapDriver.class, MockMapStub.class); + super.registerTask(testTask, FlatMapDriver.class, MockMapStub.class); boolean stubFailed = false; http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index 777bfc8..63f54ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.operators.testutils; import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.io.DelimitedInputFormat; +import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory; -import org.apache.flink.api.java.record.io.DelimitedInputFormat; -import org.apache.flink.api.java.record.io.FileOutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; @@ -94,7 +94,7 @@ public abstract class TaskTestBase extends TestLogger { final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration()); config.setDriver(driver); - config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass)); + config.setStubWrapper(new UserCodeClassWrapper<>(stubClass)); task.setEnvironment(this.mockEnv); @@ -116,17 +116,17 @@ public abstract class TaskTestBase extends TestLogger { } } - public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) { + public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat<Record>> stubClass, String outPath) { registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath); } - public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat outputFormat, String outPath) { + public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat<Record> outputFormat, String outPath) { TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration()); outputFormat.setOutputFilePath(new Path(outPath)); outputFormat.setWriteMode(WriteMode.OVERWRITE); - dsConfig.setStubWrapper(new UserCodeObjectWrapper<FileOutputFormat>(outputFormat)); + dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(outputFormat)); outTask.setEnvironment(this.mockEnv); @@ -139,9 +139,9 @@ public abstract class TaskTestBase extends TestLogger { } public void registerFileInputTask(AbstractInvokable inTask, - Class<? extends DelimitedInputFormat> stubClass, String inPath, String delimiter) + Class<? extends DelimitedInputFormat<Record>> stubClass, String inPath, String delimiter) { - DelimitedInputFormat format; + DelimitedInputFormat<Record> format; try { format = stubClass.newInstance(); } @@ -153,7 +153,7 @@ public abstract class TaskTestBase extends TestLogger { format.setDelimiter(delimiter); TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration()); - dsConfig.setStubWrapper(new UserCodeObjectWrapper<DelimitedInputFormat>(format)); + dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(format)); this.inputSplitProvider.addInputSplits(inPath, 5);