[FLINK-1085] [runtime] Combiner forwards oversized records, rather than failing on them.
This closes #854 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72718811 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72718811 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72718811 Branch: refs/heads/master Commit: 7271881163d240ad1106a77036dce981dafb82f3 Parents: 7761ddb Author: dabaitu <[email protected]> Authored: Sat Jun 20 15:35:48 2015 -0700 Committer: Stephan Ewen <[email protected]> Committed: Mon Jul 13 16:29:38 2015 +0200 ---------------------------------------------------------------------- .../operators/GroupReduceCombineDriver.java | 53 +++++++++----------- .../runtime/operators/CombineTaskTest.java | 48 ++++++++++++++---- .../java/org/apache/flink/yarn/UtilsTest.java | 2 +- 3 files changed, 63 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index 493eb4f..c426295 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -import java.io.IOException; import java.util.List; /** @@ -79,6 +78,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin private Collector<OUT> output; + private long oversizedRecordCount = 0L; + private volatile boolean running = true; private boolean objectReuseEnabled = false; @@ -142,7 +143,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); if (LOG.isDebugEnabled()) { - LOG.debug("GroupReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); + LOG.debug("GroupReduceCombineDriver object reuse: {}.", (this.objectReuseEnabled ? "ENABLED" : "DISABLED")); } } @@ -170,7 +171,10 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin // write the value again if (!this.sorter.write(value)) { - throw new IOException("Cannot write record to fresh sort buffer. Record too large."); + ++oversizedRecordCount; + LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount); + // simply forward the record + this.output.collect((OUT)value); } } @@ -179,39 +183,28 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin } private void sortAndCombine() throws Exception { + if (sorter.isEmpty()) { + return; + } + final InMemorySorter<IN> sorter = this.sorter; + this.sortAlgo.sort(sorter); + final GroupCombineFunction<IN, OUT> combiner = this.combiner; + final Collector<OUT> output = this.output; + // iterate over key groups if (objectReuseEnabled) { - if (!sorter.isEmpty()) { - this.sortAlgo.sort(sorter); - - final ReusingKeyGroupedIterator<IN> keyIter = - new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator); - - final GroupCombineFunction<IN, OUT> combiner = this.combiner; - final Collector<OUT> output = this.output; - - // iterate over key groups - while (this.running && keyIter.nextKey()) { - combiner.combine(keyIter.getValues(), output); - } + final ReusingKeyGroupedIterator<IN> keyIter = + new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator); + while (this.running && keyIter.nextKey()) { + combiner.combine(keyIter.getValues(), output); } } else { - if (!sorter.isEmpty()) { - this.sortAlgo.sort(sorter); - - final NonReusingKeyGroupedIterator<IN> keyIter = - new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator); - - final GroupCombineFunction<IN, OUT> combiner = this.combiner; - final Collector<OUT> output = this.output; - - // iterate over key groups - while (this.running && keyIter.nextKey()) { - combiner.combine(keyIter.getValues(), output); - } + final NonReusingKeyGroupedIterator<IN> keyIter = + new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator); + while (this.running && keyIter.nextKey()) { + combiner.combine(keyIter.getValues(), output); } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java index 3d9e991..7772151 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java @@ -19,23 +19,21 @@ package org.apache.flink.runtime.operators; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.operators.testutils.*; import org.junit.Assert; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeutils.record.RecordComparator; import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator; -import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; -import org.apache.flink.runtime.operators.testutils.DriverTestBase; -import org.apache.flink.runtime.operators.testutils.ExpectedTestException; -import org.apache.flink.runtime.operators.testutils.TaskCancelThread; -import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; import org.apache.flink.types.Key; import org.apache.flink.types.Record; import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.runtime.operators.testutils.TestData.Generator; import org.junit.Test; public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>> @@ -65,7 +63,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco addDriverComparator(this.comparator); addDriverComparator(this.comparator); setOutput(this.outList); - + getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); getTaskConfig().setRelativeMemoryDriver(combine_frac); getTaskConfig().setFilehandlesDriver(2); @@ -92,7 +90,39 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco this.outList.clear(); } - + + @Test + public void testOversizedRecordCombineTask() { + int tenMil = 10000000; + Generator g = new Generator(561349061987311L, 1, tenMil); + //generate 10 records each of size 10MB + final TestData.GeneratorIterator gi = new TestData.GeneratorIterator(g, 10); + List<MutableObjectIterator<Record>> inputs = new ArrayList<MutableObjectIterator<Record>>(); + inputs.add(gi); + + addInput(new UnionIterator<Record>(inputs)); + addDriverComparator(this.comparator); + addDriverComparator(this.comparator); + setOutput(this.outList); + + getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE); + getTaskConfig().setRelativeMemoryDriver(combine_frac); + getTaskConfig().setFilehandlesDriver(2); + + final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>(); + + try { + testDriver(testTask, MockCombiningReduceStub.class); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Invoke method caused exception."); + } + + Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+10, this.outList.size() == 10); + + this.outList.clear(); + } + @Test public void testFailingCombineTask() { int keyCnt = 100; @@ -119,7 +149,7 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco Assert.fail("Test failed due to an exception."); } } - + @Test public void testCancelCombineTaskSorting() { http://git-wip-us.apache.org/repos/asf/flink/blob/72718811/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java index 25a1413..9ee60a5 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java @@ -36,8 +36,8 @@ public class UtilsTest { @Test public void testUberjarLocator() { File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter()); - Assert.assertTrue(dir.getName().endsWith(".jar")); Assert.assertNotNull(dir); + Assert.assertTrue(dir.getName().endsWith(".jar")); dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root Assert.assertTrue(dir.exists()); Assert.assertTrue(dir.isDirectory());
