[FLINK-1285] Make execution mode configurable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7b32a05 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7b32a05 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7b32a05 Branch: refs/heads/master Commit: b7b32a050e4b77012b9b70bb98b2a22d293dfbad Parents: 3832d7b Author: Aljoscha Krettek <[email protected]> Authored: Mon Dec 15 18:40:11 2014 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Jan 7 19:16:10 2015 +0100 ---------------------------------------------------------------------- .../plantranslate/NepheleJobGraphGenerator.java | 14 +- .../flink/api/common/ExecutionConfig.java | 41 +- .../java/org/apache/flink/api/common/Plan.java | 42 +- .../flink/api/java/CollectionEnvironment.java | 7 +- .../flink/api/java/ExecutionEnvironment.java | 1 + .../java/typeutils/runtime/TupleSerializer.java | 1 + .../AbstractCachedBuildSideMatchDriver.java | 141 ++-- .../runtime/operators/AllGroupReduceDriver.java | 56 +- .../runtime/operators/AllReduceDriver.java | 50 +- .../flink/runtime/operators/CoGroupDriver.java | 34 +- .../CoGroupWithSolutionSetFirstDriver.java | 121 ++- .../CoGroupWithSolutionSetSecondDriver.java | 122 ++- .../runtime/operators/CollectorMapDriver.java | 34 +- .../flink/runtime/operators/CrossDriver.java | 211 +++-- .../flink/runtime/operators/DataSinkTask.java | 44 +- .../flink/runtime/operators/DataSourceTask.java | 88 ++- .../flink/runtime/operators/FlatMapDriver.java | 35 +- .../operators/GroupReduceCombineDriver.java | 49 +- .../runtime/operators/GroupReduceDriver.java | 26 +- .../JoinWithSolutionSetFirstDriver.java | 93 ++- .../JoinWithSolutionSetSecondDriver.java | 90 ++- .../flink/runtime/operators/MapDriver.java | 6 + .../runtime/operators/MapPartitionDriver.java | 28 +- .../flink/runtime/operators/MatchDriver.java | 68 +- .../flink/runtime/operators/NoOpDriver.java | 38 +- .../runtime/operators/PactTaskContext.java | 5 +- 
.../runtime/operators/ReduceCombineDriver.java | 85 +- .../flink/runtime/operators/ReduceDriver.java | 80 +- .../runtime/operators/RegularPactTask.java | 26 +- .../operators/chaining/ChainedDriver.java | 10 +- .../SynchronousChainedCombineDriver.java | 52 +- .../hash/BuildFirstHashMatchIterator.java | 168 ---- .../BuildFirstReOpenableHashMatchIterator.java | 78 -- .../hash/BuildSecondHashMatchIterator.java | 166 ---- .../BuildSecondReOpenableHashMatchIterator.java | 77 -- .../operators/hash/CompactingHashTable.java | 83 +- .../operators/hash/HashMatchIteratorBase.java | 51 ++ .../operators/hash/InMemoryPartition.java | 5 + .../NonReusingBuildFirstHashMatchIterator.java | 152 ++++ ...ngBuildFirstReOpenableHashMatchIterator.java | 80 ++ .../NonReusingBuildSecondHashMatchIterator.java | 150 ++++ ...gBuildSecondReOpenableHashMatchIterator.java | 81 ++ .../ReusingBuildFirstHashMatchIterator.java | 154 ++++ ...ngBuildFirstReOpenableHashMatchIterator.java | 80 ++ .../ReusingBuildSecondHashMatchIterator.java | 152 ++++ ...gBuildSecondReOpenableHashMatchIterator.java | 79 ++ .../sort/CombiningUnilateralSortMerger.java | 4 +- .../operators/sort/FixedLengthRecordSorter.java | 4 +- .../operators/sort/MergeMatchIterator.java | 14 +- .../NonReusingSortMergeCoGroupIterator.java | 157 ++++ .../sort/ReusingSortMergeCoGroupIterator.java | 158 ++++ .../sort/SortMergeCoGroupIterator.java | 155 ---- .../flink/runtime/util/KeyGroupedIterator.java | 239 ------ .../util/KeyGroupedIteratorImmutable.java | 222 ------ .../util/MutableToRegularIteratorWrapper.java | 102 --- .../util/NonReusingKeyGroupedIterator.java | 222 ++++++ ...nReusingMutableToRegularIteratorWrapper.java | 96 +++ .../runtime/util/ReusingKeyGroupedIterator.java | 242 ++++++ .../ReusingMutableToRegularIteratorWrapper.java | 103 +++ .../runtime/operators/CachedMatchTaskTest.java | 5 +- .../operators/CoGroupTaskExternalITCase.java | 5 +- .../runtime/operators/CoGroupTaskTest.java | 5 +- 
.../operators/CombineTaskExternalITCase.java | 5 +- .../runtime/operators/CombineTaskTest.java | 5 +- .../operators/CrossTaskExternalITCase.java | 5 +- .../flink/runtime/operators/CrossTaskTest.java | 5 +- .../flink/runtime/operators/MapTaskTest.java | 5 +- .../operators/MatchTaskExternalITCase.java | 5 +- .../flink/runtime/operators/MatchTaskTest.java | 5 +- .../operators/ReduceTaskExternalITCase.java | 5 +- .../flink/runtime/operators/ReduceTaskTest.java | 5 +- .../operators/drivers/TestTaskContext.java | 8 + .../operators/hash/HashMatchIteratorITCase.java | 778 ------------------- .../hash/NonReusingHashMatchIteratorITCase.java | 778 +++++++++++++++++++ .../NonReusingReOpenableHashTableITCase.java | 533 +++++++++++++ .../hash/ReOpenableHashTableITCase.java | 534 ------------- .../hash/ReusingHashMatchIteratorITCase.java | 778 +++++++++++++++++++ .../hash/ReusingReOpenableHashTableITCase.java | 532 +++++++++++++ .../CombiningUnilateralSortMergerITCase.java | 4 +- .../sort/MassiveStringValueSortingITCase.java | 10 + ...onReusingSortMergeCoGroupIteratorITCase.java | 227 ++++++ .../ReusingSortMergeCoGroupIteratorITCase.java | 227 ++++++ .../sort/SortMergeCoGroupIteratorITCase.java | 227 ------ .../operators/testutils/DriverTestBase.java | 73 +- .../operators/testutils/MockEnvironment.java | 10 +- .../operators/util/HashVsSortMiniBenchmark.java | 12 +- .../util/KeyGroupedIteratorImmutableTest.java | 371 --------- .../runtime/util/KeyGroupedIteratorTest.java | 401 ---------- .../util/NonReusingKeyGroupedIteratorTest.java | 371 +++++++++ .../util/ReusingKeyGroupedIteratorTest.java | 400 ++++++++++ .../flink/test/util/JavaProgramTestBase.java | 51 +- .../CoGroupConnectedComponentsITCase.java | 4 +- .../DependencyConnectedComponentsITCase.java | 3 + .../test/javaApiOperators/ReduceITCase.java | 37 - .../flink/test/operators/ObjectReuseITCase.java | 247 ++++++ 95 files changed, 7532 insertions(+), 4116 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java index 9c2efb3..260b30c 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java @@ -18,6 +18,7 @@ package org.apache.flink.compiler.plantranslate; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.aggregators.AggregatorRegistry; import org.apache.flink.api.common.aggregators.AggregatorWithName; import org.apache.flink.api.common.aggregators.ConvergenceCriterion; @@ -82,6 +84,7 @@ import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.runtime.operators.util.TaskConfig; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Visitor; /** @@ -209,7 +212,16 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> { for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) { DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration()); } - + + try { + InstantiationUtil.writeObjectToConfig( + 
program.getOriginalPactPlan().getExecutionConfig(), + graph.getJobConfiguration(), + ExecutionConfig.CONFIG_KEY); + } catch (IOException e) { + throw new RuntimeException("Config object could not be written to Job Configuration: " + e); + } + // release all references again this.vertices = null; this.chainedTasks = null; http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 03d5e3a..17e683f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -37,7 +37,7 @@ public class ExecutionConfig implements Serializable { // For future use... // private boolean forceGenericSerializer = false; -// private boolean objectReuse = false; + private boolean objectReuse = false; /** * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null @@ -143,17 +143,30 @@ public class ExecutionConfig implements Serializable { // return forceGenericSerializer; // } // -// public ExecutionConfig enableObjectReuse() { -// objectReuse = true; -// return this; -// } -// -// public ExecutionConfig disableObjectReuse() { -// objectReuse = false; -// return this; -// } -// -// public boolean isObjectReuseEnabled() { -// return objectReuse; -// } + + /** + * Enables reusing objects that Flink internally uses for deserialization and passing + * data to user-code functions. Keep in mind that this can lead to bugs when the + * user-code function of an operation is not aware of this behaviour. 
+ */ + public ExecutionConfig enableObjectReuse() { + objectReuse = true; + return this; + } + + /** + * Disables reusing objects that Flink internally uses for deserialization and passing + * data to user-code functions. @see #enableObjectReuse() + */ + public ExecutionConfig disableObjectReuse() { + objectReuse = false; + return this; + } + + /** + * Returns whether object reuse has been enabled or disabled. @see #enableObjectReuse() + */ + public boolean isObjectReuseEnabled() { + return objectReuse; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-core/src/main/java/org/apache/flink/api/common/Plan.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java index f299ef4..4a975d2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java @@ -69,14 +69,14 @@ public class Plan implements Visitable<Operator<?>> { protected int defaultParallelism = DEFAULT_PARALELLISM; /** - * The number of times failed tasks are re-executed. + * Hash map for files in the distributed cache: registered name to cache entry. */ - protected int numberOfExecutionRetries; + protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<String, DistributedCacheEntry>(); /** - * Hash map for files in the distributed cache: registered name to cache entry. + * Config object for runtime execution parameters. */ - protected HashMap<String, DistributedCacheEntry> cacheFile = new HashMap<String, DistributedCacheEntry>(); + protected ExecutionConfig executionConfig = new ExecutionConfig(); // ------------------------------------------------------------------------ @@ -264,20 +264,6 @@ public class Plan implements Visitable<Operator<?>> { } /** - * Sets the number of times that failed tasks are re-executed. 
A value of zero - * effectively disables fault tolerance. A value of {@code -1} indicates that the system - * default value (as defined in the configuration) should be used. - * - * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks. - */ - public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { - if (numberOfExecutionRetries < -1) { - throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)"); - } - this.numberOfExecutionRetries = numberOfExecutionRetries; - } - - /** * Gets the number of times the system will try to re-execute failed tasks. A value * of {@code -1} indicates that the system default value (as defined in the configuration) * should be used. @@ -285,7 +271,7 @@ public class Plan implements Visitable<Operator<?>> { * @return The number of times the system will try to re-execute failed tasks. */ public int getNumberOfExecutionRetries() { - return numberOfExecutionRetries; + return executionConfig.getNumberOfExecutionRetries(); } /** @@ -297,7 +283,23 @@ public class Plan implements Visitable<Operator<?>> { public String getPostPassClassName() { return "org.apache.flink.compiler.postpass.RecordModelPostPass"; } - + + /** + * Gets the runtime config object. + * @return + */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + /** + * Sets the runtime config object. 
+ * @param executionConfig + */ + public void setExecutionConfig(ExecutionConfig executionConfig) { + this.executionConfig = executionConfig; + } + // ------------------------------------------------------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java index 3e9ff66..2d57490 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java @@ -24,13 +24,12 @@ import org.apache.flink.api.common.operators.CollectionExecutor; public class CollectionEnvironment extends ExecutionEnvironment { - private boolean mutableObjectSafeMode = true; - @Override public JobExecutionResult execute(String jobName) throws Exception { Plan p = createProgramPlan(jobName); - - CollectionExecutor exec = new CollectionExecutor(mutableObjectSafeMode); + + // We need to reverse here. Object-Reuse enabled, means safe mode is disabled. 
+ CollectionExecutor exec = new CollectionExecutor(!getConfig().isObjectReuseEnabled()); return exec.execute(p); } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index c19e9aa..f7057ef 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -749,6 +749,7 @@ public abstract class ExecutionEnvironment { if (getDegreeOfParallelism() > 0) { plan.setDefaultParallelism(getDegreeOfParallelism()); } + plan.setExecutionConfig(getConfig()); try { registerCachedFilesWithPlan(plan); http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index ae429a7..9564c01 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -53,6 +53,7 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase< @Override public T createInstance(Object[] fields) { + try { T t = tupleClass.newInstance(); http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java index 286d830..f3b2dfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java @@ -19,12 +19,15 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.operators.hash.BuildFirstReOpenableHashMatchIterator; -import org.apache.flink.runtime.operators.hash.BuildSecondReOpenableHashMatchIterator; +import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashMatchIterator; +import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashMatchIterator; +import org.apache.flink.runtime.operators.hash.ReusingBuildFirstReOpenableHashMatchIterator; +import org.apache.flink.runtime.operators.hash.ReusingBuildSecondReOpenableHashMatchIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; @@ -37,7 +40,10 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M private final int buildSideIndex; private final int probeSideIndex; - + + private boolean objectReuseEnabled = false; + + protected AbstractCachedBuildSideMatchDriver(int buildSideIndex, int probeSideIndex) { this.buildSideIndex = buildSideIndex; this.probeSideIndex = probeSideIndex; @@ -69,34 +75,67 @@ public abstract class 
AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M double availableMemory = config.getRelativeMemoryDriver(); - if (buildSideIndex == 0 && probeSideIndex == 1) { - - matchIterator = - new BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>(input1, input2, - serializer1, comparator1, - serializer2, comparator2, - pairComparatorFactory.createComparator21(comparator1, comparator2), - this.taskContext.getMemoryManager(), - this.taskContext.getIOManager(), - this.taskContext.getOwningNepheleTask(), - availableMemory - ); - - } else if (buildSideIndex == 1 && probeSideIndex == 0) { - - matchIterator = - new BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>(input1, input2, - serializer1, comparator1, - serializer2, comparator2, - pairComparatorFactory.createComparator12(comparator1, comparator2), - this.taskContext.getMemoryManager(), - this.taskContext.getIOManager(), - this.taskContext.getOwningNepheleTask(), - availableMemory - ); - + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (objectReuseEnabled) { + if (buildSideIndex == 0 && probeSideIndex == 1) { + + matchIterator = new ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>( + input1, input2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator21(comparator1, comparator2), + this.taskContext.getMemoryManager(), + this.taskContext.getIOManager(), + this.taskContext.getOwningNepheleTask(), + availableMemory); + + + } else if (buildSideIndex == 1 && probeSideIndex == 0) { + + matchIterator = new ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>( + input1, input2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator12(comparator1, comparator2), + this.taskContext.getMemoryManager(), + this.taskContext.getIOManager(), + this.taskContext.getOwningNepheleTask(), + availableMemory); + + } else { + throw new 
Exception("Error: Inconsistent setup for repeatable hash join driver."); + } } else { - throw new Exception("Error: Inconcistent setup for repeatable hash join driver."); + if (buildSideIndex == 0 && probeSideIndex == 1) { + + matchIterator = new NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>( + input1, input2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator21(comparator1, comparator2), + this.taskContext.getMemoryManager(), + this.taskContext.getIOManager(), + this.taskContext.getOwningNepheleTask(), + availableMemory); + + + } else if (buildSideIndex == 1 && probeSideIndex == 0) { + + matchIterator = new NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>( + input1, input2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator12(comparator1, comparator2), + this.taskContext.getMemoryManager(), + this.taskContext.getIOManager(), + this.taskContext.getOwningNepheleTask(), + availableMemory); + + } else { + throw new Exception("Error: Inconsistent setup for repeatable hash join driver."); + } } this.matchIterator.open(); @@ -113,21 +152,8 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); - if (buildSideIndex == 0) { - - final BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; - - while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector)); - - } else if (buildSideIndex == 1) { + while (this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector)); - final BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; - - while 
(this.running && matchIterator != null && matchIterator.callWithNextKey(matchStub, collector)); - - } else { - throw new Exception(); - } } @Override @@ -138,14 +164,25 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0); MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1); - - if (buildSideIndex == 0 && probeSideIndex == 1) { - final BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; - matchIterator.reopenProbe(input2); - } - else { - final BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (BuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; - matchIterator.reopenProbe(input1); + + if (objectReuseEnabled) { + if (buildSideIndex == 0 && probeSideIndex == 1) { + final ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; + + matchIterator.reopenProbe(input2); + } else { + final ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (ReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; + matchIterator.reopenProbe(input1); + } + } else { + if (buildSideIndex == 0 && probeSideIndex == 1) { + final NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildFirstReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; + + matchIterator.reopenProbe(input2); + } else { + final NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT> matchIterator = (NonReusingBuildSecondReOpenableHashMatchIterator<IT1, IT2, OT>) this.matchIterator; + matchIterator.reopenProbe(input1); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java index 5e68bab..854dbd5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java @@ -19,13 +19,15 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.util.NonReusingMutableToRegularIteratorWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper; +import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -51,6 +53,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct private DriverStrategy strategy; + private boolean objectReuseEnabled = false; + // ------------------------------------------------------------------------ @Override @@ -92,6 +96,13 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct } this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer(); this.input = this.taskContext.getInput(0); + + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("AllGroupReduceDriver object reuse: " + (this.objectReuseEnabled ? 
"ENABLED" : "DISABLED") + "."); + } } @Override @@ -100,21 +111,36 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct LOG.debug(this.taskContext.formatLogString("AllGroupReduce preprocessing done. Running Reducer code.")); } - final MutableToRegularIteratorWrapper<IT> inIter = new MutableToRegularIteratorWrapper<IT>(this.input, this.serializer); - - // single UDF call with the single group - if (inIter.hasNext()) { - if (strategy == DriverStrategy.ALL_GROUP_REDUCE) { - final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub(); - final Collector<OT> output = this.taskContext.getOutputCollector(); - reducer.reduce(inIter, output); + if (objectReuseEnabled) { + final ReusingMutableToRegularIteratorWrapper<IT> inIter = new ReusingMutableToRegularIteratorWrapper<IT>(this.input, this.serializer); + + // single UDF call with the single group + if (inIter.hasNext()) { + if (strategy == DriverStrategy.ALL_GROUP_REDUCE) { + final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub(); + final Collector<OT> output = this.taskContext.getOutputCollector(); + reducer.reduce(inIter, output); + } else { + @SuppressWarnings("unchecked") final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub(); + @SuppressWarnings("unchecked") final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector(); + combiner.combine(inIter, output); + } } - else { - @SuppressWarnings("unchecked") - final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub(); - @SuppressWarnings("unchecked") - final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector(); - combiner.combine(inIter, output); + + } else { + final NonReusingMutableToRegularIteratorWrapper<IT> inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(this.input, this.serializer); + + // single UDF call with the single group + if (inIter.hasNext()) { + if (strategy == 
DriverStrategy.ALL_GROUP_REDUCE) { + final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub(); + final Collector<OT> output = this.taskContext.getOutputCollector(); + reducer.reduce(inIter, output); + } else { + @SuppressWarnings("unchecked") final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub(); + @SuppressWarnings("unchecked") final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector(); + combiner.combine(inIter, output); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java index e26f4eb..dff2dbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.ReduceFunction; @@ -49,6 +50,8 @@ public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { private boolean running; + private boolean objectReuseEnabled = false; + // ------------------------------------------------------------------------ @Override @@ -86,6 +89,13 @@ public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0); this.serializer = serializerFactory.getSerializer(); this.input = this.taskContext.getInput(0); + + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("AllReduceDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); + } } @Override @@ -97,19 +107,35 @@ public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { final ReduceFunction<T> stub = this.taskContext.getStub(); final MutableObjectIterator<T> input = this.input; final TypeSerializer<T> serializer = this.serializer; - - T val1 = serializer.createInstance(); - - if ((val1 = input.next(val1)) == null) { - return; - } - - T val2; - while (running && (val2 = input.next(serializer.createInstance())) != null) { - val1 = stub.reduce(val1, val2); + + + if (objectReuseEnabled) { + T val1 = serializer.createInstance(); + + if ((val1 = input.next(val1)) == null) { + return; + } + + T val2 = serializer.createInstance(); + while (running && (val2 = input.next(val2)) != null) { + val1 = stub.reduce(val1, val2); + } + + this.taskContext.getOutputCollector().collect(val1); + } else { + T val1 = serializer.createInstance(); + + if ((val1 = input.next(val1)) == null) { + return; + } + + T val2; + while (running && (val2 = input.next(serializer.createInstance())) != null) { + val1 = stub.reduce(val1, val2); + } + + this.taskContext.getOutputCollector().collect(val1); } - - this.taskContext.getOutputCollector().collect(val1); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java index 55498fb..6ace918 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java @@ -19,13 +19,15 @@ 
package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.operators.sort.SortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator; import org.apache.flink.runtime.operators.util.CoGroupTaskIterator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; @@ -39,7 +41,7 @@ import org.apache.flink.util.MutableObjectIterator; * The CoGroupTask group all pairs that share the same key from both inputs. Each for each key, the sets of values that * were pair with that key of both inputs are handed to the <code>coGroup()</code> method of the CoGroupFunction. * - * @see org.apache.flink.api.java.record.functions.CoGroupFunction + * @see org.apache.flink.api.common.functions.CoGroupFunction */ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> { @@ -52,6 +54,8 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<I private volatile boolean running; + private boolean objectReuseEnabled = false; + // ------------------------------------------------------------------------ @@ -105,10 +109,28 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<I throw new Exception("Missing pair comparator factory for CoGroup driver"); } - // create CoGropuTaskIterator according to provided local strategy. 
- this.coGroupIterator = new SortMergeCoGroupIterator<IT1, IT2>(in1, in2, - serializer1, groupComparator1, serializer2, groupComparator2, - pairComparatorFactory.createComparator12(groupComparator1, groupComparator2)); + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("CoGroupDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); + } + + if (objectReuseEnabled) { + // create CoGropuTaskIterator according to provided local strategy. + this.coGroupIterator = new ReusingSortMergeCoGroupIterator<IT1, IT2>( + in1, in2, + serializer1, groupComparator1, + serializer2, groupComparator2, + pairComparatorFactory.createComparator12(groupComparator1, groupComparator2)); + } else { + // create CoGropuTaskIterator according to provided local strategy. + this.coGroupIterator = new NonReusingSortMergeCoGroupIterator<IT1, IT2>( + in1, in2, + serializer1, groupComparator1, + serializer2, groupComparator2, + pairComparatorFactory.createComparator12(groupComparator1, groupComparator2)); + } // open CoGroupTaskIterator - this triggers the sorting and blocks until the iterator is ready this.coGroupIterator.open(); http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java index 5e0ca6d..a3c69a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java @@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.operators; import java.util.Collections; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.util.JoinHashMap; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -31,7 +32,8 @@ import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker; import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask; import org.apache.flink.runtime.operators.hash.CompactingHashTable; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.runtime.util.SingleElementIterator; import org.apache.flink.util.Collector; @@ -46,13 +48,18 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab private TypeSerializer<IT2> probeSideSerializer; private TypeComparator<IT2> probeSideComparator; - + + private TypeSerializer<IT1> solutionSetSerializer; + + private TypePairComparator<IT2, IT1> pairComparator; private IT1 solutionSideRecord; protected volatile boolean running; + private boolean objectReuseEnabled = false; + // -------------------------------------------------------------------------------------------- @Override @@ -96,7 +103,6 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab @SuppressWarnings("unchecked") public void initialize() { - final TypeSerializer<IT1> solutionSetSerializer; final TypeComparator<IT1> solutionSetComparator; // grab a handle to the hash table from the iteration broker @@ -130,7 +136,12 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab this.probeSideSerializer = taskContext.<IT2>getInputSerializer(0).getSerializer(); this.probeSideComparator = probeSideComparatorFactory.createComparator(); 
- solutionSideRecord = solutionSetSerializer.createInstance(); + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (objectReuseEnabled) { + solutionSideRecord = solutionSetSerializer.createInstance(); + } TypePairComparatorFactory<IT1, IT2> factory = taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader()); pairComparator = factory.createComparator21(solutionSetComparator, this.probeSideComparator); @@ -149,46 +160,84 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub(); final Collector<OT> collector = taskContext.getOutputCollector(); - final KeyGroupedIterator<IT2> probeSideInput = new KeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, probeSideComparator); final SingleElementIterator<IT1> siIter = new SingleElementIterator<IT1>(); final Iterable<IT1> emptySolutionSide = Collections.emptySet(); - - if (this.hashTable != null) { - final CompactingHashTable<IT1> join = hashTable; - final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this.probeSideComparator, this.pairComparator); - - IT1 buildSideRecord = solutionSideRecord; - - while (this.running && probeSideInput.nextKey()) { - IT2 current = probeSideInput.getCurrent(); - - buildSideRecord = prober.getMatchFor(current, buildSideRecord); - if (buildSideRecord != null) { - siIter.set(buildSideRecord); - coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector); + + if (objectReuseEnabled) { + final ReusingKeyGroupedIterator<IT2> probeSideInput = new ReusingKeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, probeSideComparator); + if (this.hashTable != null) { + final CompactingHashTable<IT1> join = hashTable; + final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this.probeSideComparator, 
this.pairComparator); + + + IT1 buildSideRecord = solutionSideRecord; + + while (this.running && probeSideInput.nextKey()) { + IT2 current = probeSideInput.getCurrent(); + + buildSideRecord = prober.getMatchFor(current, buildSideRecord); + if (buildSideRecord != null) { + siIter.set(buildSideRecord); + coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector); + } else { + coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector); + } } - else { - coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector); + } else { + final JoinHashMap<IT1> join = this.objectMap; + final JoinHashMap<IT1>.Prober<IT2> prober = join.createProber(this.probeSideComparator, this.pairComparator); + final TypeSerializer<IT1> serializer = join.getBuildSerializer(); + + while (this.running && probeSideInput.nextKey()) { + IT2 current = probeSideInput.getCurrent(); + + IT1 buildSideRecord = prober.lookupMatch(current); + if (buildSideRecord != null) { + siIter.set(serializer.copy(buildSideRecord)); + coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector); + } else { + coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector); + } } } - } - else { - final JoinHashMap<IT1> join = this.objectMap; - final JoinHashMap<IT1>.Prober<IT2> prober = join.createProber(this.probeSideComparator, this.pairComparator); - final TypeSerializer<IT1> serializer = join.getBuildSerializer(); - - while (this.running && probeSideInput.nextKey()) { - IT2 current = probeSideInput.getCurrent(); - - IT1 buildSideRecord = prober.lookupMatch(current); - if (buildSideRecord != null) { - siIter.set(serializer.copy(buildSideRecord)); - coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector); + } else { + final NonReusingKeyGroupedIterator<IT2> probeSideInput = new NonReusingKeyGroupedIterator<IT2>(taskContext.<IT2>getInput(0), probeSideSerializer, probeSideComparator); + if (this.hashTable != null) { + final CompactingHashTable<IT1> 
join = hashTable; + final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(this + .probeSideComparator, this.pairComparator); + + IT1 buildSideRecord; + + while (this.running && probeSideInput.nextKey()) { + IT2 current = probeSideInput.getCurrent(); + + buildSideRecord = prober.getMatchFor(current); + if (buildSideRecord != null) { + siIter.set(solutionSetSerializer.copy(buildSideRecord)); + coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector); + } else { + coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector); + } } - else { - coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector); + } else { + final JoinHashMap<IT1> join = this.objectMap; + final JoinHashMap<IT1>.Prober<IT2> prober = join.createProber(this.probeSideComparator, this.pairComparator); + final TypeSerializer<IT1> serializer = join.getBuildSerializer(); + + while (this.running && probeSideInput.nextKey()) { + IT2 current = probeSideInput.getCurrent(); + + IT1 buildSideRecord = prober.lookupMatch(current); + if (buildSideRecord != null) { + siIter.set(serializer.copy(buildSideRecord)); + coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector); + } else { + coGroupStub.coGroup(emptySolutionSide, probeSideInput.getValues(), collector); + } } } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java index fb88505..17fc471 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.operators.util.JoinHashMap; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -30,7 +31,8 @@ import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask; import org.apache.flink.runtime.operators.hash.CompactingHashTable; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.util.EmptyIterator; -import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.runtime.util.SingleElementIterator; import org.apache.flink.util.Collector; @@ -45,13 +47,17 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta private TypeSerializer<IT1> probeSideSerializer; private TypeComparator<IT1> probeSideComparator; - + + private TypeSerializer<IT2> solutionSetSerializer; + private TypePairComparator<IT1, IT2> pairComparator; private IT2 solutionSideRecord; protected volatile boolean running; + private boolean objectReuseEnabled = false; + // -------------------------------------------------------------------------------------------- @Override @@ -95,7 +101,6 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta @SuppressWarnings("unchecked") public void initialize() throws Exception { - final TypeSerializer<IT2> solutionSetSerializer; final TypeComparator<IT2> solutionSetComparator; // grab a handle to the hash table from the iteration broker @@ -129,8 +134,13 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta this.probeSideSerializer = 
taskContext.<IT1>getInputSerializer(0).getSerializer(); this.probeSideComparator = probeSideComparatorFactory.createComparator(); - - solutionSideRecord = solutionSetSerializer.createInstance(); + + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (objectReuseEnabled) { + solutionSideRecord = solutionSetSerializer.createInstance(); + }; TypePairComparatorFactory<IT1, IT2> factory = taskContext.getTaskConfig().getPairComparatorFactory(taskContext.getUserCodeClassLoader()); pairComparator = factory.createComparator12(this.probeSideComparator, solutionSetComparator); @@ -149,46 +159,84 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub(); final Collector<OT> collector = taskContext.getOutputCollector(); - final KeyGroupedIterator<IT1> probeSideInput = new KeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, probeSideComparator); final SingleElementIterator<IT2> siIter = new SingleElementIterator<IT2>(); final Iterable<IT2> emptySolutionSide = EmptyIterator.<IT2>get(); - - if (this.hashTable != null) { - final CompactingHashTable<IT2> join = hashTable; - final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(this.probeSideComparator, this.pairComparator); - - IT2 buildSideRecord = solutionSideRecord; - - while (this.running && probeSideInput.nextKey()) { - IT1 current = probeSideInput.getCurrent(); - - buildSideRecord = prober.getMatchFor(current, buildSideRecord); - if (buildSideRecord != null) { - siIter.set(buildSideRecord); - coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector); + + if (objectReuseEnabled) { + final ReusingKeyGroupedIterator<IT1> probeSideInput = new ReusingKeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, probeSideComparator); + + if (this.hashTable != null) { + final 
CompactingHashTable<IT2> join = hashTable; + final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(this.probeSideComparator, this.pairComparator); + + IT2 buildSideRecord = solutionSideRecord; + + while (this.running && probeSideInput.nextKey()) { + IT1 current = probeSideInput.getCurrent(); + + buildSideRecord = prober.getMatchFor(current, buildSideRecord); + if (buildSideRecord != null) { + siIter.set(buildSideRecord); + coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector); + } else { + coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector); + } } - else { - coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector); + } else { + final JoinHashMap<IT2> join = this.objectMap; + final JoinHashMap<IT2>.Prober<IT1> prober = join.createProber(this.probeSideComparator, this.pairComparator); + final TypeSerializer<IT2> serializer = join.getBuildSerializer(); + + while (this.running && probeSideInput.nextKey()) { + IT1 current = probeSideInput.getCurrent(); + + IT2 buildSideRecord = prober.lookupMatch(current); + if (buildSideRecord != null) { + siIter.set(serializer.copy(buildSideRecord)); + coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector); + } else { + coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector); + } } } - } - else { - final JoinHashMap<IT2> join = this.objectMap; - final JoinHashMap<IT2>.Prober<IT1> prober = join.createProber(this.probeSideComparator, this.pairComparator); - final TypeSerializer<IT2> serializer = join.getBuildSerializer(); - - while (this.running && probeSideInput.nextKey()) { - IT1 current = probeSideInput.getCurrent(); - - IT2 buildSideRecord = prober.lookupMatch(current); - if (buildSideRecord != null) { - siIter.set(serializer.copy(buildSideRecord)); - coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector); + } else { + final NonReusingKeyGroupedIterator<IT1> probeSideInput = new 
NonReusingKeyGroupedIterator<IT1>(taskContext.<IT1>getInput(0), probeSideSerializer, probeSideComparator); + + if (this.hashTable != null) { + final CompactingHashTable<IT2> join = hashTable; + final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(this.probeSideComparator, this.pairComparator); + + IT2 buildSideRecord; + + while (this.running && probeSideInput.nextKey()) { + IT1 current = probeSideInput.getCurrent(); + + buildSideRecord = prober.getMatchFor(current); + if (buildSideRecord != null) { + siIter.set(solutionSetSerializer.copy(buildSideRecord)); + coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector); + } else { + coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector); + } } - else { - coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector); + } else { + final JoinHashMap<IT2> join = this.objectMap; + final JoinHashMap<IT2>.Prober<IT1> prober = join.createProber(this.probeSideComparator, this.pairComparator); + final TypeSerializer<IT2> serializer = join.getBuildSerializer(); + + while (this.running && probeSideInput.nextKey()) { + IT1 current = probeSideInput.getCurrent(); + + IT2 buildSideRecord = prober.lookupMatch(current); + if (buildSideRecord != null) { + siIter.set(serializer.copy(buildSideRecord)); + coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector); + } else { + coGroupStub.coGroup(probeSideInput.getValues(), emptySolutionSide, collector); + } } } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java index 3858864..766a9d9 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java @@ -19,9 +19,12 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.GenericCollectorMap; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Map task which is executed by a Nephele task manager. The task has a single @@ -38,12 +41,16 @@ import org.apache.flink.util.MutableObjectIterator; */ @SuppressWarnings("deprecation") public class CollectorMapDriver<IT, OT> implements PactDriver<GenericCollectorMap<IT, OT>, OT> { - + + private static final Logger LOG = LoggerFactory.getLogger(CollectorMapDriver.class); + + private PactTaskContext<GenericCollectorMap<IT, OT>, OT> taskContext; private volatile boolean running; - - + + private boolean objectReuseEnabled = false; + @Override public void setup(PactTaskContext<GenericCollectorMap<IT, OT>, OT> context) { this.taskContext = context; @@ -69,7 +76,12 @@ public class CollectorMapDriver<IT, OT> implements PactDriver<GenericCollectorMa @Override public void prepare() { - // nothing, since a mapper does not need any preparation + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("CollectorMapDriver object reuse: " + (this.objectReuseEnabled ? 
"ENABLED" : "DISABLED") + "."); + } } @Override @@ -79,10 +91,18 @@ public class CollectorMapDriver<IT, OT> implements PactDriver<GenericCollectorMa final GenericCollectorMap<IT, OT> stub = this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); - IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); + if (objectReuseEnabled) { + IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); + - while (this.running && ((record = input.next(record)) != null)) { - stub.map(record, output); + while (this.running && ((record = input.next(record)) != null)) { + stub.map(record, output); + } + } else { + IT record; + while (this.running && ((record = input.next()) != null)) { + stub.map(record, output); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java index 4e6745a..197c08d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.CrossFunction; @@ -62,7 +63,9 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, private boolean firstIsOuter; private volatile boolean running; - + + private boolean objectReuseEnabled = false; + // ------------------------------------------------------------------------ @@ -140,6 +143,13 @@ public class CrossDriver<T1, T2, OT> implements 
PactDriver<CrossFunction<T1, T2, } this.memPagesForBlockSide = numPages - this.memPagesForSpillingSide; } + + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("CrossDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); + } } @@ -201,30 +211,47 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, this.taskContext.getOwningNepheleTask()); this.spillIter = spillVals; - T1 val1; - final T1 val1Reuse = serializer1.createInstance(); - T2 val2; - final T2 val2Reuse = serializer2.createInstance(); - T2 val2Copy = serializer2.createInstance(); - + final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); - - // for all blocks - do { - // for all values from the spilling side - while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) { - // for all values in the block - while ((val1 = blockVals.next(val1Reuse)) != null) { - val2Copy = serializer2.copy(val2, val2Copy); - collector.collect(crosser.cross(val1,val2Copy)); - //crosser.cross(val1, val2Copy, collector); + + + if (objectReuseEnabled) { + final T1 val1Reuse = serializer1.createInstance(); + final T2 val2Reuse = serializer2.createInstance(); + T1 val1; + T2 val2; + + // for all blocks + do { + // for all values from the spilling side + while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) { + // for all values in the block + while ((val1 = blockVals.next(val1Reuse)) != null) { + collector.collect(crosser.cross(val1, val2)); + } + blockVals.reset(); } - blockVals.reset(); - } - spillVals.reset(); + spillVals.reset(); + } while (this.running && blockVals.nextBlock()); + } else { + T1 val1; + T2 val2; + + // for all blocks + do { + // for all values from the spilling side + while (this.running && ((val2 = spillVals.next()) 
!= null)) { + // for all values in the block + while ((val1 = blockVals.next()) != null) { + collector.collect(crosser.cross(val1, serializer2.copy(val2))); + } + blockVals.reset(); + } + spillVals.reset(); + } while (this.running && blockVals.nextBlock()); + } - while (this.running && blockVals.nextBlock()); } private void runBlockedOuterSecond() throws Exception { @@ -249,30 +276,45 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, this.taskContext.getOwningNepheleTask()); this.blockIter = blockVals; - T1 val1; - final T1 val1Reuse = serializer1.createInstance(); - T1 val1Copy = serializer1.createInstance(); - T2 val2; - final T2 val2Reuse = serializer2.createInstance(); - final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); - - // for all blocks - do { - // for all values from the spilling side - while (this.running && ((val1 = spillVals.next(val1Reuse)) != null)) { - // for all values in the block - while (this.running && ((val2 = blockVals.next(val2Reuse)) != null)) { - val1Copy = serializer1.copy(val1, val1Copy); - collector.collect(crosser.cross(val1Copy, val2)); - //crosser.cross(val1Copy, val2, collector); + + if (objectReuseEnabled) { + final T1 val1Reuse = serializer1.createInstance(); + final T2 val2Reuse = serializer2.createInstance(); + T1 val1; + T2 val2; + + // for all blocks + do { + // for all values from the spilling side + while (this.running && ((val1 = spillVals.next(val1Reuse)) != null)) { + // for all values in the block + while (this.running && ((val2 = blockVals.next(val2Reuse)) != null)) { + collector.collect(crosser.cross(val1, val2)); + } + blockVals.reset(); } - blockVals.reset(); - } - spillVals.reset(); + spillVals.reset(); + } while (this.running && blockVals.nextBlock()); + } else { + T1 val1; + T2 val2; + + // for all blocks + do { + // for all values from the spilling side + while (this.running && ((val1 = 
spillVals.next()) != null)) { + // for all values in the block + while (this.running && ((val2 = blockVals.next()) != null)) { + collector.collect(crosser.cross(serializer1.copy(val1), val2)); + } + blockVals.reset(); + } + spillVals.reset(); + } while (this.running && blockVals.nextBlock()); + } - while (this.running && blockVals.nextBlock()); } private void runStreamedOuterFirst() throws Exception { @@ -292,24 +334,36 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, this.taskContext.getOwningNepheleTask()); this.spillIter = spillVals; - T1 val1; - final T1 val1Reuse = serializer1.createInstance(); - T1 val1Copy = serializer1.createInstance(); - T2 val2; - final T2 val2Reuse = serializer2.createInstance(); - final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); - - // for all blocks - while (this.running && ((val1 = in1.next(val1Reuse)) != null)) { - // for all values from the spilling side - while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) { - val1Copy = serializer1.copy(val1, val1Copy); - collector.collect(crosser.cross(val1Copy, val2)); - //crosser.cross(val1Copy, val2, collector); + + if (objectReuseEnabled) { + final T1 val1Reuse = serializer1.createInstance(); + final T2 val2Reuse = serializer2.createInstance(); + T1 val1; + T2 val2; + + // for all blocks + while (this.running && ((val1 = in1.next(val1Reuse)) != null)) { + // for all values from the spilling side + while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) { + collector.collect(crosser.cross(val1, val2)); + } + spillVals.reset(); } - spillVals.reset(); + } else { + T1 val1; + T2 val2; + + // for all blocks + while (this.running && ((val1 = in1.next()) != null)) { + // for all values from the spilling side + while (this.running && ((val2 = spillVals.next()) != null)) { + collector.collect(crosser.cross(serializer1.copy(val1), val2)); + } + 
spillVals.reset(); + } + } } @@ -328,25 +382,38 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask()); this.spillIter = spillVals; - - T1 val1; - final T1 val1Reuse = serializer1.createInstance(); - T2 val2; - final T2 val2Reuse = serializer2.createInstance(); - T2 val2Copy = serializer2.createInstance(); - + final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); - - // for all blocks - while (this.running && (val2 = in2.next(val2Reuse)) != null) { - // for all values from the spilling side - while (this.running && (val1 = spillVals.next(val1Reuse)) != null) { - val2Copy = serializer2.copy(val2, val2Copy); - collector.collect(crosser.cross(val1, val2Copy)); - //crosser.cross(val1, val2Copy, collector); + + if (objectReuseEnabled) { + final T1 val1Reuse = serializer1.createInstance(); + final T2 val2Reuse = serializer2.createInstance(); + T1 val1; + T2 val2; + + // for all blocks + while (this.running && (val2 = in2.next(val2Reuse)) != null) { + // for all values from the spilling side + while (this.running && (val1 = spillVals.next(val1Reuse)) != null) { + collector.collect(crosser.cross(val1, val2)); + //crosser.cross(val1, val2Copy, collector); + } + spillVals.reset(); + } + } else { + T1 val1; + T2 val2; + + // for all blocks + while (this.running && (val2 = in2.next()) != null) { + // for all values from the spilling side + while (this.running && (val1 = spillVals.next()) != null) { + collector.collect(crosser.cross(val1, serializer2.copy(val2))); + } + spillVals.reset(); } - spillVals.reset(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java ---------------------------------------------------------------------- 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index 610ab06..e3ffdba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.runtime.plugable.DeserializationDelegate; +import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.CleanupWhenUnsuccessful; @@ -41,6 +43,8 @@ import org.apache.flink.runtime.operators.util.ReaderIterator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.MutableObjectIterator; +import java.io.IOException; + /** * DataSinkTask which is executed by a task manager. The task hands the data to an output format. 
* @@ -75,7 +79,6 @@ public class DataSinkTask<IT> extends AbstractInvokable { private volatile boolean taskCanceled; private volatile boolean cleanupCalled; - @Override public void registerInputOutput() { @@ -106,6 +109,22 @@ public class DataSinkTask<IT> extends AbstractInvokable { if (LOG.isDebugEnabled()) { LOG.debug(getLogString("Starting data sink operator")); } + + ExecutionConfig executionConfig = new ExecutionConfig(); + try { + ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( + getJobConfiguration(), + ExecutionConfig.CONFIG_KEY, + this.getClass().getClassLoader()); + if (c != null) { + executionConfig = c; + } + } catch (IOException e) { + throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); + } + boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled(); try { @@ -150,10 +169,8 @@ public class DataSinkTask<IT> extends AbstractInvokable { final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer(); final MutableObjectIterator<IT> input = this.input; final OutputFormat<IT> format = this.format; - - - IT record = serializer.createInstance(); - + + // check if task has been canceled if (this.taskCanceled) { return; @@ -166,9 +183,20 @@ public class DataSinkTask<IT> extends AbstractInvokable { // open format.open(this.getEnvironment().getIndexInSubtaskGroup(), this.getEnvironment().getCurrentNumberOfSubtasks()); - // work! - while (!this.taskCanceled && ((record = input.next(record)) != null)) { - format.writeRecord(record); + if (objectReuseEnabled) { + IT record = serializer.createInstance(); + + // work! + while (!this.taskCanceled && ((record = input.next(record)) != null)) { + format.writeRecord(record); + } + } else { + IT record; + + // work! 
+ while (!this.taskCanceled && ((record = input.next()) != null)) { + format.writeRecord(record); + } } // close. We close here such that a regular close throwing an exception marks a task as failed. http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 2db652f..b50bf36 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -18,12 +18,15 @@ package org.apache.flink.runtime.operators; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.accumulators.Accumulator; @@ -36,10 +39,8 @@ import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.BufferWriter; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver; import org.apache.flink.runtime.operators.chaining.ChainedDriver; import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException; -import org.apache.flink.runtime.operators.shipping.OutputCollector; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; @@ -74,6 +75,23 @@ public class DataSourceTask<OT> extends 
AbstractInvokable { // cancel flag private volatile boolean taskCanceled = false; + private ExecutionConfig getExecutionConfig() { + ExecutionConfig executionConfig = new ExecutionConfig(); + try { + ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( + getJobConfiguration(), + ExecutionConfig.CONFIG_KEY, + this.getClass().getClassLoader()); + if (c != null) { + executionConfig = c; + } + } catch (IOException e) { + throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); + } + return executionConfig; + } @Override public void registerInputOutput() { @@ -102,6 +120,27 @@ public class DataSourceTask<OT> extends AbstractInvokable { if (LOG.isDebugEnabled()) { LOG.debug(getLogString("Starting data source operator")); } + + ExecutionConfig executionConfig = new ExecutionConfig(); + try { + ExecutionConfig c = (ExecutionConfig) InstantiationUtil.readObjectFromConfig( + getJobConfiguration(), + ExecutionConfig.CONFIG_KEY, + this.getClass().getClassLoader()); + if (c != null) { + executionConfig = c; + } + } catch (IOException e) { + throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load ExecutionConfig from Job Configuration: " + e); + } + + boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? 
"ENABLED" : "DISABLED") + "."); + } final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer(); @@ -121,8 +160,6 @@ public class DataSourceTask<OT> extends AbstractInvokable { // get start and end final InputSplit split = splitIterator.next(); - OT record = serializer.createInstance(); - if (LOG.isDebugEnabled()) { LOG.debug(getLogString("Opening input split " + split.toString())); } @@ -137,48 +174,31 @@ public class DataSourceTask<OT> extends AbstractInvokable { } try { - // special case to make the loops tighter - if (this.output instanceof OutputCollector) { - final OutputCollector<OT> output = (OutputCollector<OT>) this.output; - - // as long as there is data to read - while (!this.taskCanceled && !format.reachedEnd()) { - - OT returned; - if ((returned = format.nextRecord(record)) != null) { - output.collect(returned); - record = returned; - } - } - } - else if (this.output instanceof ChainedCollectorMapDriver) { - @SuppressWarnings("unchecked") - final ChainedCollectorMapDriver<OT, ?> output = (ChainedCollectorMapDriver<OT, ?>) this.output; - + + final Collector<OT> output = this.output; + + if (objectReuseEnabled) { + OT reuse = serializer.createInstance(); + // as long as there is data to read while (!this.taskCanceled && !format.reachedEnd()) { - + OT returned; - if ((returned = format.nextRecord(record)) != null) { + if ((returned = format.nextRecord(reuse)) != null) { output.collect(returned); - record = returned; } } - } - else { - final Collector<OT> output = this.output; - + } else { // as long as there is data to read while (!this.taskCanceled && !format.reachedEnd()) { - + OT returned; - if ((returned = format.nextRecord(record)) != null) { + if ((returned = format.nextRecord(serializer.createInstance())) != null) { output.collect(returned); - record = returned; } } } - + if (LOG.isDebugEnabled() && !this.taskCanceled) { LOG.debug(getLogString("Closing input split " + split.toString())); } @@ -285,7 +305,7 @@ public class 
DataSourceTask<OT> extends AbstractInvokable { private void initOutputs(ClassLoader cl) throws Exception { this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>(); this.eventualOutputs = new ArrayList<BufferWriter>(); - this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs); + this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs, getExecutionConfig()); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java index f4ae62d..d63a3e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java @@ -19,9 +19,12 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Map task which is executed by a Nephele task manager. The task has a single @@ -37,12 +40,15 @@ import org.apache.flink.util.MutableObjectIterator; * @param <OT> The mapper's output data type. 
*/ public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT>, OT> { - + + private static final Logger LOG = LoggerFactory.getLogger(FlatMapDriver.class); + private PactTaskContext<FlatMapFunction<IT, OT>, OT> taskContext; private volatile boolean running; - - + + private boolean objectReuseEnabled = false; + @Override public void setup(PactTaskContext<FlatMapFunction<IT, OT>, OT> context) { this.taskContext = context; @@ -68,8 +74,12 @@ public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT> @Override public void prepare() { - // nothing, since a mapper does not need any preparation - } + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("FlatMapDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); + } } @Override public void run() throws Exception { @@ -78,10 +88,19 @@ public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT> final FlatMapFunction<IT, OT> function = this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); - IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); + if (objectReuseEnabled) { + IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); + + + while (this.running && ((record = input.next(record)) != null)) { + function.flatMap(record, output); + } + } else { + IT record; - while (this.running && ((record = input.next(record)) != null)) { - function.flatMap(record, output); + while (this.running && ((record = input.next()) != null)) { + function.flatMap(record, output); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index 4323eae..8d8d5dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.FlatCombineFunction; @@ -31,7 +33,7 @@ import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; import org.apache.flink.runtime.operators.sort.InMemorySorter; import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; import org.apache.flink.runtime.operators.sort.QuickSort; -import org.apache.flink.runtime.util.KeyGroupedIterator; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -72,6 +74,8 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti private volatile boolean running = true; + private boolean objectReuseEnabled = false; + // ------------------------------------------------------------------------ @Override @@ -125,6 +129,13 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti } else { this.sorter = new NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), memory); } + + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + + if (LOG.isDebugEnabled()) { + LOG.debug("GroupReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? 
"ENABLED" : "DISABLED") + "."); + } } @Override @@ -162,19 +173,37 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti private void sortAndCombine() throws Exception { final InMemorySorter<T> sorter = this.sorter; - if (!sorter.isEmpty()) { - this.sortAlgo.sort(sorter); + if (objectReuseEnabled) { + if (!sorter.isEmpty()) { + this.sortAlgo.sort(sorter); + + final ReusingKeyGroupedIterator<T> keyIter = new ReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator); - final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer, - this.groupingComparator); - final FlatCombineFunction<T> combiner = this.combiner; - final Collector<T> output = this.output; + final FlatCombineFunction<T> combiner = this.combiner; + final Collector<T> output = this.output; - // iterate over key groups - while (this.running && keyIter.nextKey()) { - combiner.combine(keyIter.getValues(), output); + // iterate over key groups + while (this.running && keyIter.nextKey()) { + combiner.combine(keyIter.getValues(), output); + } } + } else { + if (!sorter.isEmpty()) { + this.sortAlgo.sort(sorter); + + final NonReusingKeyGroupedIterator<T> keyIter = new NonReusingKeyGroupedIterator<T>(sorter.getIterator(), this.serializer, this.groupingComparator); + + + final FlatCombineFunction<T> combiner = this.combiner; + final Collector<T> output = this.output; + + // iterate over key groups + while (this.running && keyIter.nextKey()) { + combiner.combine(keyIter.getValues(), output); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7b32a05/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java index 
2ec9873..9d9f994 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java @@ -19,14 +19,15 @@ package org.apache.flink.runtime.operators; +import org.apache.flink.api.common.ExecutionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.runtime.util.KeyGroupedIterator; -import org.apache.flink.runtime.util.KeyGroupedIteratorImmutable; +import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; +import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -51,11 +52,11 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction private TypeSerializer<IT> serializer; private TypeComparator<IT> comparator; - - private boolean mutableObjectMode = false; - + private volatile boolean running; + private boolean objectReuseEnabled = false; + // ------------------------------------------------------------------------ @Override @@ -92,11 +93,12 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer(); this.comparator = this.taskContext.getDriverComparator(0); this.input = this.taskContext.getInput(0); - - this.mutableObjectMode = config.getMutableObjectMode(); - + + ExecutionConfig executionConfig = taskContext.getExecutionConfig(); + this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); + if (LOG.isDebugEnabled()) { - LOG.debug("GroupReduceDriver uses " + (this.mutableObjectMode ? 
"MUTABLE" : "IMMUTABLE") + " object mode."); + LOG.debug("GroupReduceDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); } } @@ -110,15 +112,15 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); - if (mutableObjectMode) { - final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator); + if (objectReuseEnabled) { + final ReusingKeyGroupedIterator<IT> iter = new ReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator); // run stub implementation while (this.running && iter.nextKey()) { stub.reduce(iter.getValues(), output); } } else { - final KeyGroupedIteratorImmutable<IT> iter = new KeyGroupedIteratorImmutable<IT>(this.input, this.serializer, this.comparator); + final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.serializer, this.comparator); // run stub implementation while (this.running && iter.nextKey()) { stub.reduce(iter.getValues(), output);
