http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java index da0f222..0727d63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java @@ -21,8 +21,8 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericCombine; -import org.apache.flink.api.common.functions.GenericGroupReduce; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.util.MutableToRegularIteratorWrapper; @@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator; * The GroupReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their * key. The iterator is handed to the <code>reduce()</code> method of the GroupReduceFunction. * - * @see GenericGroupReduce + * @see org.apache.flink.api.common.functions.GroupReduceFunction */ -public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<IT, OT>, OT> { +public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> { private static final Log LOG = LogFactory.getLog(AllGroupReduceDriver.class); - private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext; + private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext; private MutableObjectIterator<IT> input; @@ -54,7 +54,7 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> context) { + public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) { this.taskContext = context; } @@ -64,9 +64,9 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu } @Override - public Class<GenericGroupReduce<IT, OT>> getStubType() { + public Class<GroupReduceFunction<IT, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericGroupReduce<IT, OT>> clazz = (Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class; + final Class<GroupReduceFunction<IT, OT>> clazz = (Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class; return clazz; } @@ -83,8 +83,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu this.strategy = config.getDriverStrategy(); if (strategy == DriverStrategy.ALL_GROUP_COMBINE) { - if (!(this.taskContext.getStub() instanceof GenericCombine)) { - throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + GenericCombine.class.getName()); + if (!(this.taskContext.getStub() instanceof FlatCombineFunction)) { + throw new Exception("Using combiner on a UDF that does not implement the combiner interface " + FlatCombineFunction.class.getName()); } } else if (strategy != DriverStrategy.ALL_GROUP_REDUCE) { @@ -105,13 +105,13 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GenericGroupRedu // single UDF call with the single group if (inIter.hasNext()) { if (strategy == DriverStrategy.ALL_GROUP_REDUCE) { - final GenericGroupReduce<IT, OT> reducer = this.taskContext.getStub(); + final GroupReduceFunction<IT, OT> reducer = this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); reducer.reduce(inIter, output); } else { @SuppressWarnings("unchecked") - final GenericCombine<IT> combiner = (GenericCombine<IT>) this.taskContext.getStub(); + final FlatCombineFunction<IT> combiner = (FlatCombineFunction<IT>) this.taskContext.getStub(); @SuppressWarnings("unchecked") final Collector<IT> output = (Collector<IT>) this.taskContext.getOutputCollector(); combiner.combine(inIter, output);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java index 30bfae3..721f4f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericReduce; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.runtime.operators.util.TaskConfig; @@ -35,13 +35,13 @@ import org.apache.flink.util.MutableObjectIterator; * The ReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their * key. The iterator is handed to the <code>reduce()</code> method of the ReduceFunction. * - * @see GenericReduce + * @see org.apache.flink.api.common.functions.ReduceFunction */ -public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { +public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { private static final Log LOG = LogFactory.getLog(AllReduceDriver.class); - private PactTaskContext<GenericReduce<T>, T> taskContext; + private PactTaskContext<ReduceFunction<T>, T> taskContext; private MutableObjectIterator<T> input; @@ -52,7 +52,7 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GenericReduce<T>, T> context) { + public void setup(PactTaskContext<ReduceFunction<T>, T> context) { this.taskContext = context; this.running = true; } @@ -63,9 +63,9 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { } @Override - public Class<GenericReduce<T>> getStubType() { + public Class<ReduceFunction<T>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class; + final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class; return clazz; } @@ -94,7 +94,7 @@ public class AllReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { LOG.debug(this.taskContext.formatLogString("AllReduce preprocessing done. Running Reducer code.")); } - final GenericReduce<T> stub = this.taskContext.getStub(); + final ReduceFunction<T> stub = this.taskContext.getStub(); final MutableObjectIterator<T> input = this.input; final TypeSerializer<T> serializer = this.serializer; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java index 3da451a..8ff0262 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -41,12 +41,12 @@ import org.apache.flink.util.MutableObjectIterator; * * @see org.apache.flink.api.java.record.functions.CoGroupFunction */ -public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> { +public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<IT1, IT2, OT>, OT> { private static final Log LOG = LogFactory.getLog(CoGroupDriver.class); - private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext; + private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext; private CoGroupTaskIterator<IT1, IT2> coGroupIterator; // the iterator that does the actual cogroup @@ -56,7 +56,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper< @Override - public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) { + public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -69,9 +69,9 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper< @Override - public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() { + public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class; + final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class; return clazz; } @@ -122,7 +122,7 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<GenericCoGrouper< @Override public void run() throws Exception { - final GenericCoGrouper<IT1, IT2, OT> coGroupStub = this.taskContext.getStub(); + final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java index 9d06618..8761a2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import java.util.Iterator; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; @@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator; import org.apache.flink.runtime.util.SingleElementIterator; import org.apache.flink.util.Collector; -public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> { +public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> { - private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext; + private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext; private CompactingHashTable<IT1> hashTable; @@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab // -------------------------------------------------------------------------------------------- @Override - public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) { + public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab } @Override - public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() { + public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class; + final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class; return clazz; } @@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab @Override public void run() throws Exception { - final GenericCoGrouper<IT1, IT2, OT> coGroupStub = taskContext.getStub(); + final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub(); final Collector<OT> collector = taskContext.getOutputCollector(); IT1 buildSideRecord = solutionSideRecord; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java index 80fa855..f2020c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import java.util.Iterator; -import org.apache.flink.api.common.functions.GenericCoGrouper; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; @@ -34,9 +34,9 @@ import org.apache.flink.runtime.util.KeyGroupedIterator; import org.apache.flink.runtime.util.SingleElementIterator; import org.apache.flink.util.Collector; -public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericCoGrouper<IT1, IT2, OT>, OT> { +public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<CoGroupFunction<IT1, IT2, OT>, OT> { - private PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> taskContext; + private PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> taskContext; private CompactingHashTable<IT2> hashTable; @@ -53,7 +53,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta // -------------------------------------------------------------------------------------------- @Override - public void setup(PactTaskContext<GenericCoGrouper<IT1, IT2, OT>, OT> context) { + public void setup(PactTaskContext<CoGroupFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -64,9 +64,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta } @Override - public Class<GenericCoGrouper<IT1, IT2, OT>> getStubType() { + public Class<CoGroupFunction<IT1, IT2, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericCoGrouper<IT1, IT2, OT>> clazz = (Class<GenericCoGrouper<IT1, IT2, OT>>) (Class<?>) GenericCoGrouper.class; + final Class<CoGroupFunction<IT1, IT2, OT>> clazz = (Class<CoGroupFunction<IT1, IT2, OT>>) (Class<?>) CoGroupFunction.class; return clazz; } @@ -123,7 +123,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta @Override public void run() throws Exception { - final GenericCoGrouper<IT1, IT2, OT> coGroupStub = taskContext.getStub(); + final CoGroupFunction<IT1, IT2, OT> coGroupStub = taskContext.getStub(); final Collector<OT> collector = taskContext.getOutputCollector(); IT2 buildSideRecord = solutionSideRecord; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java index 7c311ed..b68f24d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericCrosser; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator; @@ -40,12 +40,12 @@ import org.apache.flink.util.MutableObjectIterator; * * @see org.apache.flink.api.java.functions.CrossFunction */ -public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2, OT>, OT> { +public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2, OT>, OT> { private static final Log LOG = LogFactory.getLog(CrossDriver.class); - private PactTaskContext<GenericCrosser<T1, T2, OT>, OT> taskContext; + private PactTaskContext<CrossFunction<T1, T2, OT>, OT> taskContext; private MemoryManager memManager; @@ -67,7 +67,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 @Override - public void setup(PactTaskContext<GenericCrosser<T1, T2, OT>, OT> context) { + public void setup(PactTaskContext<CrossFunction<T1, T2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -80,9 +80,9 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 @Override - public Class<GenericCrosser<T1, T2, OT>> getStubType() { + public Class<CrossFunction<T1, T2, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericCrosser<T1, T2, OT>> clazz = (Class<GenericCrosser<T1, T2, OT>>) (Class<?>) GenericCrosser.class; + final Class<CrossFunction<T1, T2, OT>> clazz = (Class<CrossFunction<T1, T2, OT>>) (Class<?>) CrossFunction.class; return clazz; } @@ -207,7 +207,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 final T2 val2Reuse = serializer2.createInstance(); T2 val2Copy = serializer2.createInstance(); - final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub(); + final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); // for all blocks @@ -217,7 +217,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 // for all values in the block while ((val1 = blockVals.next(val1Reuse)) != null) { val2Copy = serializer2.copy(val2, val2Copy); - crosser.cross(val1, val2Copy, collector); + collector.collect(crosser.cross(val1,val2Copy)); + //crosser.cross(val1, val2Copy, collector); } blockVals.reset(); } @@ -254,7 +255,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 T2 val2; final T2 val2Reuse = serializer2.createInstance(); - final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub(); + final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); // for all blocks @@ -264,7 +265,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 // for all values in the block while (this.running && ((val2 = blockVals.next(val2Reuse)) != null)) { val1Copy = serializer1.copy(val1, val1Copy); - crosser.cross(val1Copy, val2, collector); + collector.collect(crosser.cross(val1Copy, val2)); + //crosser.cross(val1Copy, val2, collector); } blockVals.reset(); } @@ -296,7 +298,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 T2 val2; final T2 val2Reuse = serializer2.createInstance(); - final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub(); + final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); // for all blocks @@ -304,7 +306,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 // for all values from the spilling side while (this.running && ((val2 = spillVals.next(val2Reuse)) != null)) { val1Copy = serializer1.copy(val1, val1Copy); - crosser.cross(val1Copy, val2, collector); + collector.collect(crosser.cross(val1Copy, val2)); + //crosser.cross(val1Copy, val2, collector); } spillVals.reset(); } @@ -332,7 +335,7 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 final T2 val2Reuse = serializer2.createInstance(); T2 val2Copy = serializer2.createInstance(); - final GenericCrosser<T1, T2, OT> crosser = this.taskContext.getStub(); + final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); // for all blocks @@ -340,7 +343,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<GenericCrosser<T1, T2 // for all values from the spilling side while (this.running && (val1 = spillVals.next(val1Reuse)) != null) { val2Copy = serializer2.copy(val2, val2Copy); - crosser.cross(val1, val2Copy, collector); + collector.collect(crosser.cross(val1, val2Copy)); + //crosser.cross(val1, val2Copy, collector); } spillVals.reset(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java index 44f22a0..130601b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators; -import org.apache.flink.api.common.functions.GenericFlatMap; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator; * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method * of the MapFunction. * - * @see GenericFlatMap + * @see org.apache.flink.api.common.functions.FlatMapFunction * * @param <IT> The mapper's input data type. * @param <OT> The mapper's output data type. */ -public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>, OT> { +public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT>, OT> { - private PactTaskContext<GenericFlatMap<IT, OT>, OT> taskContext; + private PactTaskContext<FlatMapFunction<IT, OT>, OT> taskContext; private volatile boolean running; @Override - public void setup(PactTaskContext<GenericFlatMap<IT, OT>, OT> context) { + public void setup(PactTaskContext<FlatMapFunction<IT, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -55,9 +55,9 @@ public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>, } @Override - public Class<GenericFlatMap<IT, OT>> getStubType() { + public Class<FlatMapFunction<IT, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericFlatMap<IT, OT>> clazz = (Class<GenericFlatMap<IT, OT>>) (Class<?>) GenericFlatMap.class; + final Class<FlatMapFunction<IT, OT>> clazz = (Class<FlatMapFunction<IT, OT>>) (Class<?>) FlatMapFunction.class; return clazz; } @@ -75,7 +75,7 @@ public class FlatMapDriver<IT, OT> implements PactDriver<GenericFlatMap<IT, OT>, public void run() throws Exception { // cache references on the stack final MutableObjectIterator<IT> input = this.taskContext.getInput(0); - final GenericFlatMap<IT, OT> function = this.taskContext.getStub(); + final FlatMapFunction<IT, OT> function = this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java index 1d0749c..f786c56 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericCombine; +import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.runtime.memorymanager.MemoryManager; @@ -39,12 +39,12 @@ import org.apache.flink.util.MutableObjectIterator; * * @param <T> The data type consumed and produced by the combiner. */ -public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T>, T> { +public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFunction<T>, T> { private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class); - private PactTaskContext<GenericCombine<T>, T> taskContext; + private PactTaskContext<FlatCombineFunction<T>, T> taskContext; private CloseableInputProvider<T> input; @@ -57,7 +57,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T> // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GenericCombine<T>, T> context) { + public void setup(PactTaskContext<FlatCombineFunction<T>, T> context) { this.taskContext = context; this.running = true; } @@ -68,9 +68,9 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T> } @Override - public Class<GenericCombine<T>> getStubType() { + public Class<FlatCombineFunction<T>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericCombine<T>> clazz = (Class<GenericCombine<T>>) (Class<?>) GenericCombine.class; + final Class<FlatCombineFunction<T>> clazz = (Class<FlatCombineFunction<T>>) (Class<?>) FlatCombineFunction.class; return clazz; } @@ -111,7 +111,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<GenericCombine<T> this.serializerFactory.getSerializer(), this.comparator); // cache references on the stack - final GenericCombine<T> stub = this.taskContext.getStub(); + final FlatCombineFunction<T> stub = this.taskContext.getStub(); final Collector<T> output = this.taskContext.getOutputCollector(); // run stub implementation http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java index 1ab080c..960143d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericGroupReduce; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.util.TaskConfig; @@ -37,13 +37,13 @@ import org.apache.flink.util.MutableObjectIterator; * The GroupReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their * key. The iterator is handed to the <code>reduce()</code> method of the GroupReduceFunction. * - * @see GenericGroupReduce + * @see org.apache.flink.api.common.functions.GroupReduceFunction */ -public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce<IT, OT>, OT> { +public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> { private static final Log LOG = LogFactory.getLog(GroupReduceDriver.class); - private PactTaskContext<GenericGroupReduce<IT, OT>, OT> taskContext; + private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext; private MutableObjectIterator<IT> input; @@ -56,7 +56,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce< // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GenericGroupReduce<IT, OT>, OT> context) { + public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -67,9 +67,9 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce< } @Override - public Class<GenericGroupReduce<IT, OT>> getStubType() { + public Class<GroupReduceFunction<IT, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericGroupReduce<IT, OT>> clazz = (Class<GenericGroupReduce<IT, OT>>) (Class<?>) GenericGroupReduce.class; + final Class<GroupReduceFunction<IT, OT>> clazz = (Class<GroupReduceFunction<IT, OT>>) (Class<?>) GroupReduceFunction.class; return clazz; } @@ -100,7 +100,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GenericGroupReduce< final KeyGroupedIterator<IT> iter = new KeyGroupedIterator<IT>(this.input, this.serializer, this.comparator); // cache references on the stack - final GenericGroupReduce<IT, OT> stub = this.taskContext.getStub(); + final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); // run stub implementation http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java index b23b0cd..342f307 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypePairComparator; @@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> { +public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { - private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext; + private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; private CompactingHashTable<IT1> hashTable; @@ -50,7 +50,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP // -------------------------------------------------------------------------------------------- @Override - public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) { + public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -61,9 +61,9 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP } @Override - public Class<GenericJoiner<IT1, IT2, OT>> getStubType() { + public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class; + final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class; return clazz; } @@ -126,7 +126,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP @Override public void run() throws Exception { - final GenericJoiner<IT1, IT2, OT> joinFunction = taskContext.getStub(); + final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub(); final Collector<OT> collector = taskContext.getOutputCollector(); IT1 buildSideRecord = this.solutionSideRecord; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java index 4fa5c5a..c38a81a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypePairComparator; @@ -32,9 +32,9 @@ import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; -public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<GenericJoiner<IT1, IT2, OT>, OT> { +public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { - private PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext; + private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; private CompactingHashTable<IT2> hashTable; @@ -50,7 +50,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable // -------------------------------------------------------------------------------------------- @Override - public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) { + public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -61,9 +61,9 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable } @Override - public Class<GenericJoiner<IT1, IT2, OT>> getStubType() { + public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class; + final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class; return clazz; } @@ -126,7 +126,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable @Override public void run() throws Exception { - final GenericJoiner<IT1, IT2, OT> joinFunction = taskContext.getStub(); + final FlatJoinFunction<IT1, IT2, OT> joinFunction = taskContext.getStub(); final Collector<OT> collector = taskContext.getOutputCollector(); IT2 buildSideRecord = this.solutionSideRecord; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java index fe1e0c1..89fbf4d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -31,20 +31,20 @@ import org.apache.flink.util.MutableObjectIterator; * The MapTask creates an iterator over all key-value pairs of its input and hands that to the <code>map()</code> method * of the MapFunction. * - * @see GenericMap + * @see org.apache.flink.api.common.functions.MapFunction * * @param <IT> The mapper's input data type. * @param <OT> The mapper's output data type. */ -public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> { +public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> { - private PactTaskContext<GenericMap<IT, OT>, OT> taskContext; + private PactTaskContext<MapFunction<IT, OT>, OT> taskContext; private volatile boolean running; @Override - public void setup(PactTaskContext<GenericMap<IT, OT>, OT> context) { + public void setup(PactTaskContext<MapFunction<IT, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -55,9 +55,9 @@ public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> { } @Override - public Class<GenericMap<IT, OT>> getStubType() { + public Class<MapFunction<IT, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericMap<IT, OT>> clazz = (Class<GenericMap<IT, OT>>) (Class<?>) GenericMap.class; + final Class<MapFunction<IT, OT>> clazz = (Class<MapFunction<IT, OT>>) (Class<?>) MapFunction.class; return clazz; } @@ -75,7 +75,7 @@ public class MapDriver<IT, OT> implements PactDriver<GenericMap<IT, OT>, OT> { public void run() throws Exception { // cache references on the stack final MutableObjectIterator<IT> input = this.taskContext.getInput(0); - final GenericMap<IT, OT> function = this.taskContext.getStub(); + final MapFunction<IT, OT> function = this.taskContext.getStub(); final Collector<OT> output = this.taskContext.getOutputCollector(); IT record = this.taskContext.<IT>getInputSerializer(0).getSerializer().createInstance(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java index a29afa2..e205f1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -42,13 +42,13 @@ import org.apache.flink.util.MutableObjectIterator; * The MatchTask matches all pairs of records that share the same key and come from different inputs. Each pair of * matching records is handed to the <code>match()</code> method of the JoinFunction. * - * @see GenericJoiner + * @see org.apache.flink.api.common.functions.FlatJoinFunction */ -public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1, IT2, OT>, OT> { +public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> { protected static final Log LOG = LogFactory.getLog(MatchDriver.class); - protected PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> taskContext; + protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext; private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator; // the iterator that does the actual matching @@ -57,7 +57,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1, // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GenericJoiner<IT1, IT2, OT>, OT> context) { + public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) { this.taskContext = context; this.running = true; } @@ -68,9 +68,9 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1, } @Override - public Class<GenericJoiner<IT1, IT2, OT>> getStubType() { + public Class<FlatJoinFunction<IT1, IT2, OT>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericJoiner<IT1, IT2, OT>> clazz = (Class<GenericJoiner<IT1, IT2, OT>>) (Class<?>) GenericJoiner.class; + final Class<FlatJoinFunction<IT1, IT2, OT>> clazz = (Class<FlatJoinFunction<IT1, IT2, OT>>) (Class<?>) FlatJoinFunction.class; return clazz; } @@ -141,7 +141,7 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<GenericJoiner<IT1, @Override public void run() throws Exception { - final GenericJoiner<IT1, IT2, OT> matchStub = this.taskContext.getStub(); + final FlatJoinFunction<IT1, IT2, OT> matchStub = this.taskContext.getStub(); final Collector<OT> collector = this.taskContext.getOutputCollector(); final JoinTaskIterator<IT1, IT2, OT> matchIterator = this.matchIterator; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java index ffe27e6..33d8a18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators; -import org.apache.flink.api.common.functions.AbstractFunction; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; @@ -28,15 +28,15 @@ import org.apache.flink.util.MutableObjectIterator; * * @param <T> The data type. */ -public class NoOpDriver<T> implements PactDriver<AbstractFunction, T> { +public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> { - private PactTaskContext<AbstractFunction, T> taskContext; + private PactTaskContext<AbstractRichFunction, T> taskContext; private volatile boolean running; @Override - public void setup(PactTaskContext<AbstractFunction, T> context) { + public void setup(PactTaskContext<AbstractRichFunction, T> context) { this.taskContext = context; this.running = true; } @@ -47,7 +47,7 @@ public class NoOpDriver<T> implements PactDriver<AbstractFunction, T> { } @Override - public Class<AbstractFunction> getStubType() { + public Class<AbstractRichFunction> getStubType() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java index 4d72085..87cea30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericReduce; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -44,7 +44,7 @@ import org.apache.flink.util.MutableObjectIterator; * * @param <T> The data type consumed and produced by the combiner. */ -public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> { +public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> { private static final Log LOG = LogFactory.getLog(ReduceCombineDriver.class); @@ -52,13 +52,13 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> { private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - private PactTaskContext<GenericReduce<T>, T> taskContext; + private PactTaskContext<ReduceFunction<T>, T> taskContext; private TypeSerializer<T> serializer; private TypeComparator<T> comparator; - private GenericReduce<T> reducer; + private ReduceFunction<T> reducer; private Collector<T> output; @@ -75,7 +75,7 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> { // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GenericReduce<T>, T> context) { + public void setup(PactTaskContext<ReduceFunction<T>, T> context) { this.taskContext = context; this.running = true; } @@ -86,9 +86,9 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> { } @Override - public Class<GenericReduce<T>> getStubType() { + public Class<ReduceFunction<T>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class; + final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class; return clazz; } @@ -168,7 +168,7 @@ public class ReduceCombineDriver<T> implements PactDriver<GenericReduce<T>, T> { final TypeSerializer<T> serializer = this.serializer; final TypeComparator<T> comparator = this.comparator; - final GenericReduce<T> function = this.reducer; + final ReduceFunction<T> function = this.reducer; final Collector<T> output = this.output; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java index a7e9305..9495cdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericReduce; +import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.operators.util.TaskConfig; @@ -36,13 +36,13 @@ import org.apache.flink.util.MutableObjectIterator; * The ReduceTask creates a iterator over all records from its input. The iterator returns all records grouped by their * key. The iterator is handed to the <code>reduce()</code> method of the ReduceFunction. * - * @see GenericReduce + * @see org.apache.flink.api.common.functions.ReduceFunction */ -public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { +public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> { private static final Log LOG = LogFactory.getLog(ReduceDriver.class); - private PactTaskContext<GenericReduce<T>, T> taskContext; + private PactTaskContext<ReduceFunction<T>, T> taskContext; private MutableObjectIterator<T> input; @@ -55,7 +55,7 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { // ------------------------------------------------------------------------ @Override - public void setup(PactTaskContext<GenericReduce<T>, T> context) { + public void setup(PactTaskContext<ReduceFunction<T>, T> context) { this.taskContext = context; this.running = true; } @@ -66,9 +66,9 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { } @Override - public Class<GenericReduce<T>> getStubType() { + public Class<ReduceFunction<T>> getStubType() { @SuppressWarnings("unchecked") - final Class<GenericReduce<T>> clazz = (Class<GenericReduce<T>>) (Class<?>) GenericReduce.class; + final Class<ReduceFunction<T>> clazz = (Class<ReduceFunction<T>>) (Class<?>) ReduceFunction.class; return clazz; } @@ -101,7 +101,7 @@ public class ReduceDriver<T> implements PactDriver<GenericReduce<T>, T> { final TypeSerializer<T> serializer = this.serializer; final TypeComparator<T> comparator = this.comparator; - final GenericReduce<T> function = this.taskContext.getStub(); + final ReduceFunction<T> function = this.taskContext.getStub(); final Collector<T> output = this.taskContext.getOutputCollector(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 45fe05a..c8f217c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -24,8 +24,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GenericCombine; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -497,7 +498,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i if (this.stub != null) { try { Configuration stubConfig = this.config.getStubParameters(); - this.stub.open(stubConfig); + FunctionUtils.openFunction(this.stub, stubConfig); stubOpen = true; } catch (Throwable t) { @@ -510,7 +511,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // close. We close here such that a regular close throwing an exception marks a task as failed. if (this.running && this.stub != null) { - this.stub.close(); + FunctionUtils.closeFunction(this.stub); stubOpen = false; } @@ -525,15 +526,17 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // modify accumulators.ll; if (this.stub != null) { // collect the counters from the stub - Map<String, Accumulator<?,?>> accumulators = this.stub.getRuntimeContext().getAllAccumulators(); - RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); + if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) { + Map<String, Accumulator<?, ?>> accumulators = FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); + RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); + } } } catch (Exception ex) { // close the input, but do not report any exceptions, since we already have another root cause if (stubOpen) { try { - this.stub.close(); + FunctionUtils.closeFunction(this.stub); } catch (Throwable t) {} } @@ -582,9 +585,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // tasks. Type conflicts can occur here if counters with same name but // different type were used. + for (ChainedDriver<?, ?> chainedTask : chainedTasks) { - Map<String, Accumulator<?, ?>> chainedAccumulators = chainedTask.getStub().getRuntimeContext().getAllAccumulators(); - AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); + if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) { + Map<String, Accumulator<?, ?>> chainedAccumulators = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators(); + AccumulatorHelper.mergeInto(accumulators, chainedAccumulators); + } } // Don't report if the UDF didn't collect any accumulators @@ -607,7 +613,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // done before sending AccumulatorHelper.resetAndClearAccumulators(accumulators); for (ChainedDriver<?, ?> chainedTask : chainedTasks) { - AccumulatorHelper.resetAndClearAccumulators(chainedTask.getStub().getRuntimeContext().getAllAccumulators()); + if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) { + AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators()); + } } } @@ -693,7 +701,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i throw new RuntimeException("The class '" + stub.getClass().getName() + "' is not a subclass of '" + stubSuperClass.getName() + "' as is required."); } - stub.setRuntimeContext(this.runtimeUdfContext); + FunctionUtils.setFunctionRuntimeContext(stub, this.runtimeUdfContext); return stub; } catch (ClassCastException ccex) { @@ -988,13 +996,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i e.getMessage() == null ? "." : ": " + e.getMessage(), e); } - if (!(localStub instanceof GenericCombine)) { + if (!(localStub instanceof FlatCombineFunction)) { throw new IllegalStateException("Performing combining sort outside a reduce task!"); } @SuppressWarnings({ "rawtypes", "unchecked" }) CombiningUnilateralSortMerger<?> cSorter = new CombiningUnilateralSortMerger( - (GenericCombine) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], + (FlatCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum), this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum), this.config.getSpillingThresholdInput(inputNum)); @@ -1375,7 +1383,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i // -------------------------------------------------------------------------------------------- /** - * Opens the given stub using its {@link Function#open(Configuration)} method. If the open call produces + * Opens the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. If the open call produces * an exception, a new exception with a standard error message is created, using the encountered exception * as its cause. * @@ -1386,14 +1394,14 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i */ public static void openUserCode(Function stub, Configuration parameters) throws Exception { try { - stub.open(parameters); + FunctionUtils.openFunction(stub, parameters); } catch (Throwable t) { throw new Exception("The user defined 'open(Configuration)' method in " + stub.getClass().toString() + " caused an exception: " + t.getMessage(), t); } } /** - * Closes the given stub using its {@link Function#close()} method. If the close call produces + * Closes the given stub using its {@link org.apache.flink.api.common.functions.RichFunction#close()} method. If the close call produces * an exception, a new exception with a standard error message is created, using the encountered exception * as its cause. * @@ -1403,7 +1411,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i */ public static void closeUserCode(Function stub) throws Exception { try { - stub.close(); + FunctionUtils.closeFunction(stub); } catch (Throwable t) { throw new Exception("The user defined 'close()' method caused an exception: " + t.getMessage(), t); } @@ -1505,4 +1513,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i } return a; } + + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java index 3f7ad61..3dbab78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators.chaining; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.GenericCollectorMap; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -61,7 +61,7 @@ public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> { // -------------------------------------------------------------------------------------------- - public Function getStub() { + public RichFunction getStub() { return this.mapper; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java index cca6838..db134a8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java @@ -19,25 +19,26 @@ package org.apache.flink.runtime.operators.chaining; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GenericFlatMap; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.RegularPactTask; public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> { - private GenericFlatMap<IT, OT> mapper; + private FlatMapFunction<IT, OT> mapper; // -------------------------------------------------------------------------------------------- @Override public void setup(AbstractInvokable parent) { @SuppressWarnings("unchecked") - final GenericFlatMap<IT, OT> mapper = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericFlatMap.class); + final FlatMapFunction<IT, OT> mapper = + RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatMapFunction.class); this.mapper = mapper; - mapper.setRuntimeContext(getUdfRuntimeContext()); + FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext()); } @Override @@ -54,8 +55,9 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @Override public void cancelTask() { try { - this.mapper.close(); - } catch (Throwable t) { + FunctionUtils.closeFunction(this.mapper); + } + catch (Throwable t) { } } @@ -84,4 +86,5 @@ public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> { public void close() { this.outputCollector.close(); } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java index 3ae324c..a7b1048 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java @@ -20,24 +20,25 @@ package org.apache.flink.runtime.operators.chaining; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GenericMap; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.RegularPactTask; public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { - private GenericMap<IT, OT> mapper; + private MapFunction<IT, OT> mapper; // -------------------------------------------------------------------------------------------- @Override public void setup(AbstractInvokable parent) { @SuppressWarnings("unchecked") - final GenericMap<IT, OT> mapper = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericMap.class); + final MapFunction<IT, OT> mapper = + RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, MapFunction.class); this.mapper = mapper; - mapper.setRuntimeContext(getUdfRuntimeContext()); + FunctionUtils.setFunctionRuntimeContext(mapper, getUdfRuntimeContext()); } @Override @@ -54,7 +55,7 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { @Override public void cancelTask() { try { - this.mapper.close(); + FunctionUtils.closeFunction(this.mapper); } catch (Throwable t) { } } @@ -84,4 +85,5 @@ public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> { public void close() { this.outputCollector.close(); } + } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java index 76d860b..3a42469 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedTerminationCriterionDriver.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.operators.chaining; -import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.common.operators.base.BulkIterationBase; import org.apache.flink.api.common.operators.base.BulkIterationBase.TerminationCriterionAggregator; @@ -47,7 +47,7 @@ public class ChainedTerminationCriterionDriver<IT, OT> extends ChainedDriver<IT, // -------------------------------------------------------------------------------------------- - public Function getStub() { + public RichFunction getStub() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java index d5ce0a7..ffac151 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java @@ -22,8 +22,9 @@ package org.apache.flink.runtime.operators.chaining; import java.io.IOException; import java.util.List; +import org.apache.flink.api.common.functions.FlatCombineFunction; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GenericCombine; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -51,7 +52,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> { private InMemorySorter<T> sorter; - private GenericCombine<T> combiner; + private FlatCombineFunction<T> combiner; private TypeSerializer<T> serializer; @@ -72,10 +73,10 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> { this.parent = parent; @SuppressWarnings("unchecked") - final GenericCombine<T> combiner = - RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, GenericCombine.class); + final FlatCombineFunction<T> combiner = + RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, FlatCombineFunction.class); this.combiner = combiner; - combiner.setRuntimeContext(getUdfRuntimeContext()); + FunctionUtils.setFunctionRuntimeContext(combiner, getUdfRuntimeContext()); } @Override @@ -185,7 +186,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> { this.comparator); // cache references on the stack - final GenericCombine<T> stub = this.combiner; + final FlatCombineFunction<T> stub = this.combiner; final Collector<T> output = this.outputCollector; // run stub implementation http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java index 65af050..0daf69b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildFirstHashMatchIterator.java @@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash; import java.io.IOException; import java.util.List; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -101,7 +101,7 @@ public class BuildFirstHashMatchIterator<V1, V2, O> implements JoinTaskIterator< } @Override - public final boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector) + public final boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector) throws Exception { if (this.hashJoin.nextRecord()) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java index e124201..70e5afb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/BuildSecondHashMatchIterator.java @@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.hash; import java.io.IOException; import java.util.List; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -100,7 +100,7 @@ public class BuildSecondHashMatchIterator<V1, V2, O> implements JoinTaskIterator } @Override - public boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector) + public boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector) throws Exception { if (this.hashJoin.nextRecord()) http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java index 06d0ac7..da8a11b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java @@ -29,7 +29,8 @@ import java.util.Queue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericCombine; +import org.apache.flink.api.common.functions.FlatCombineFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; @@ -71,7 +72,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { */ private static final Log LOG = LogFactory.getLog(CombiningUnilateralSortMerger.class); - private final GenericCombine<E> combineStub; // the user code stub that does the combining + private final FlatCombineFunction<E> combineStub; // the user code stub that does the combining private Configuration udfConfig; @@ -101,7 +102,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to * perform the sort. */ - public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager, + public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction) @@ -133,7 +134,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to * perform the sort. */ - public CombiningUnilateralSortMerger(GenericCombine<E> combineStub, MemoryManager memoryManager, IOManager ioManager, + public CombiningUnilateralSortMerger(FlatCombineFunction<E> combineStub, MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, @@ -254,12 +255,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // ------------------- Spilling Phase ------------------------ - final GenericCombine<E> combineStub = CombiningUnilateralSortMerger.this.combineStub; + final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub; // now that we are actually spilling, take the combiner, and open it try { - Configuration conf = CombiningUnilateralSortMerger.this.udfConfig; - combineStub.open(conf == null ? new Configuration() : conf); + Configuration conf = CombiningUnilateralSortMerger.this.udfConfig; + FunctionUtils.openFunction (combineStub, (conf == null ? new Configuration() : conf)); } catch (Throwable t) { throw new IOException("The user-defined combiner failed in its 'open()' method.", t); @@ -380,7 +381,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { // close the user code try { - combineStub.close(); + FunctionUtils.closeFunction(combineStub); } catch (Throwable t) { throw new IOException("The user-defined combiner failed in its 'close()' method.", t); @@ -466,7 +467,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> { this.memManager.getPageSize()); final WriterCollector<E> collector = new WriterCollector<E>(output, this.serializer); - final GenericCombine<E> combineStub = CombiningUnilateralSortMerger.this.combineStub; + final FlatCombineFunction<E> combineStub = CombiningUnilateralSortMerger.this.combineStub; // combine and write to disk try { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java index 308e333..2ed75ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeMatchIterator.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -151,7 +151,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O * @see org.apache.flink.runtime.operators.util.JoinTaskIterator#callWithNextKey() */ @Override - public boolean callWithNextKey(final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector) + public boolean callWithNextKey(final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception { if (!this.iterator1.nextKey() || !this.iterator2.nextKey()) { @@ -234,7 +234,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O * @throws Exception Forwards all exceptions thrown by the stub. */ private void crossFirst1withNValues(final T1 val1, final T2 firstValN, - final Iterator<T2> valsN, final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector) + final Iterator<T2> valsN, final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception { this.copy1 = this.serializer1.copy(val1, this.copy1); @@ -267,7 +267,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O * @throws Exception Forwards all exceptions thrown by the stub. */ private void crossSecond1withNValues(T2 val1, T1 firstValN, - Iterator<T1> valsN, GenericJoiner<T1, T2, O> matchFunction, Collector<O> collector) + Iterator<T1> valsN, FlatJoinFunction<T1, T2, O> matchFunction, Collector<O> collector) throws Exception { this.copy2 = this.serializer2.copy(val1, this.copy2); @@ -280,7 +280,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O if (valsN.hasNext()) { this.copy2 = this.serializer2.copy(val1, this.copy2); - matchFunction.join(nRec, this.copy2, collector); + matchFunction.join(nRec,this.copy2,collector); } else { matchFunction.join(nRec, val1, collector); more = false; @@ -297,7 +297,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O */ private void crossMwithNValues(final T1 firstV1, Iterator<T1> spillVals, final T2 firstV2, final Iterator<T2> blockVals, - final GenericJoiner<T1, T2, O> matchFunction, final Collector<O> collector) + final FlatJoinFunction<T1, T2, O> matchFunction, final Collector<O> collector) throws Exception { // ================================================== @@ -411,7 +411,7 @@ public class MergeMatchIterator<T1, T2, O> implements JoinTaskIterator<T1, T2, O // get instances of key and block value final T2 nextBlockVal = this.blockIt.next(); this.copy1 = this.serializer1.copy(nextSpillVal, this.copy1); - matchFunction.join(this.copy1, nextBlockVal, collector); + matchFunction.join(this.copy1, nextBlockVal, collector); } // reset block iterator http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java index 6bcb6ee..3ed647d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/JoinTaskIterator.java @@ -22,7 +22,7 @@ package org.apache.flink.runtime.operators.util; import java.io.IOException; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.runtime.memorymanager.MemoryAllocationException; import org.apache.flink.util.Collector; @@ -60,7 +60,7 @@ public interface JoinTaskIterator<V1, V2, O> * @return True, if a next key exists, false if no more keys exist. * @throws Exception Exceptions from the user code are forwarded. */ - boolean callWithNextKey(GenericJoiner<V1, V2, O> matchFunction, Collector<O> collector) throws Exception; + boolean callWithNextKey(FlatJoinFunction<V1, V2, O> matchFunction, Collector<O> collector) throws Exception; /** * Aborts the matching process. This extra abort method is supplied, because a significant time may pass while http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java index e91e100..3894233 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java @@ -23,13 +23,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.flink.api.common.functions.GenericJoiner; +import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.java.record.functions.JoinFunction; import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator; import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory; -import org.apache.flink.runtime.operators.BuildFirstCachedMatchDriver; -import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver; -import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; @@ -43,7 +40,7 @@ import org.apache.flink.util.Collector; import org.junit.Assert; import org.junit.Test; -public class CachedMatchTaskTest extends DriverTestBase<GenericJoiner<Record, Record, Record>> +public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> { private static final long HASH_MEM = 6*1024*1024; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/22b24f20/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java index 46717be..5551485 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java @@ -23,12 +23,9 @@ import java.util.Iterator; import org.junit.Assert; -import org.apache.flink.api.common.functions.GenericCoGrouper; -import org.apache.flink.api.java.record.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator; import org.apache.flink.api.java.typeutils.runtime.record.RecordPairComparatorFactory; -import org.apache.flink.runtime.operators.CoGroupDriver; -import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.testutils.DriverTestBase; import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator; import org.apache.flink.types.IntValue; @@ -37,7 +34,7 @@ import org.apache.flink.types.Record; import org.apache.flink.util.Collector; import org.junit.Test; -public class CoGroupTaskExternalITCase extends DriverTestBase<GenericCoGrouper<Record, Record, Record>> +public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Record, Record, Record>> { private static final long SORT_MEM = 3*1024*1024; @@ -87,7 +84,7 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<GenericCoGrouper<R Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords()); } - public static final class MockCoGroupStub extends CoGroupFunction { + public static final class MockCoGroupStub extends org.apache.flink.api.java.record.functions.CoGroupFunction { private static final long serialVersionUID = 1L; private final Record res = new Record();
