[scala] [streaming] Temporal join operator added
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/555837cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/555837cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/555837cd Branch: refs/heads/master Commit: 555837cd6da98b0311a7f752a3b2523f6efbf6a1 Parents: 1c87d8b Author: Gyula Fora <[email protected]> Authored: Sat Dec 20 18:12:19 2014 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Jan 2 18:34:38 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/JobGraphBuilder.java | 9 + .../api/datastream/ConnectedDataStream.java | 4 +- .../streaming/api/datastream/DataStream.java | 15 +- .../api/datastream/GroupedDataStream.java | 2 +- .../datastream/SingleOutputStreamOperator.java | 10 +- .../api/datastream/StreamJoinOperator.java | 118 +++++++----- .../api/datastream/StreamProjection.java | 2 +- .../streaming/util/keys/FieldsKeySelector.java | 15 +- .../streaming/api/WindowCrossJoinTest.java | 2 +- .../streaming/examples/join/WindowJoin.java | 36 +++- .../flink/api/scala/streaming/DataStream.scala | 6 +- .../api/scala/streaming/FieldsKeySelector.scala | 29 +++ .../scala/streaming/StreamJoinOperator.scala | 188 +++++++++++++++++++ .../scala/streaming/WindowedDataStream.scala | 2 +- 14 files changed, 359 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index f358de9..c826274 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -505,6 +505,15 @@ public class JobGraphBuilder { } + public <IN, OUT> void setInvokable(String id, StreamInvokable<IN, OUT> invokableObject) { + invokableObjects.put(id, invokableObject); + } + + public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { + StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType); + typeSerializersOut1.put(id, serializer); + } + /** * Sets TypeSerializerWrapper from one vertex to another, used with some * sinks. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 65a6c37..39b6460 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -539,7 +539,7 @@ public class ConnectedDataStream<IN1, IN2> { return invokable; } - protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine( + public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine( CoWindowFunction<IN1, IN2, 
OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo, long windowSize, long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) { @@ -550,7 +550,7 @@ public class ConnectedDataStream<IN1, IN2> { if (slideInterval < 1) { throw new IllegalArgumentException("Slide interval must be positive"); } - + return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index a236312..2a0b673 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -92,7 +92,8 @@ public class DataStream<OUT> { protected List<String> userDefinedNames; protected boolean selectAll; protected StreamPartitioner<OUT> partitioner; - protected final TypeInformation<OUT> typeInfo; + @SuppressWarnings("rawtypes") + protected TypeInformation typeInfo; protected List<DataStream<OUT>> mergedStreams; protected final JobGraphBuilder jobGraphBuilder; @@ -175,10 +176,18 @@ public class DataStream<OUT> { * * @return The type of the datastream. 
*/ + @SuppressWarnings("unchecked") public TypeInformation<OUT> getType() { return this.typeInfo; } + @SuppressWarnings("unchecked") + public <R> DataStream<R> setType(TypeInformation<R> outType) { + jobGraphBuilder.setOutType(id, outType); + typeInfo = outType; + return (DataStream<R>) this; + } + public <F> F clean(F f) { if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) { ClosureCleaner.clean(f, true); @@ -979,7 +988,7 @@ public class DataStream<OUT> { StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate); - SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", typeInfo, invokable); + SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", getType(), invokable); return returnStream; } @@ -1077,7 +1086,7 @@ public class DataStream<OUT> { */ public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) { - DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo); + DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType()); jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>( clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 18b4b75..160ef8d 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -186,7 +186,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate), keySelector); - SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", typeInfo, + SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", getType(), invokable); return returnStream; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 016322b..c19517b 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -33,7 +33,7 @@ import org.apache.flink.streaming.state.OperatorState; /** * The SingleOutputStreamOperator represents a user defined transformation * applied on a {@link DataStream} with one predefined output type. - * + * * @param <OUT> * Output type of the operator. 
* @param <O> @@ -52,6 +52,13 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato super(dataStream); } + @SuppressWarnings("unchecked") + public <R> SingleOutputStreamOperator<R, ?> setType(TypeInformation<R> outType) { + jobGraphBuilder.setOutType(id, outType); + typeInfo = outType; + return (SingleOutputStreamOperator<R, ?>) this; + } + /** * Sets the degree of parallelism for this operator. The degree must be 1 or * more. @@ -71,7 +78,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato return this; } - /** * Sets the maximum time frequency (ms) for the flushing of the output * buffer. By default the output buffers flush only when they are full. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java index 3051587..cbcc1b4 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java @@ -23,8 +23,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.function.co.JoinWindowFunction; +import 
org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable; import org.apache.flink.streaming.util.keys.FieldsKeySelector; import org.apache.flink.streaming.util.keys.PojoKeySelector; @@ -122,114 +124,128 @@ public class StreamJoinOperator<I1, I2> extends } /** - * Continues a temporal Join transformation and defines the - * {@link Tuple} fields of the second join {@link DataStream} that - * should be used as join keys.<br/> - * <b>Note: Fields can only be selected as join keys on Tuple - * DataStreams.</b><br/> + * Creates a temporal Join transformation and defines the {@link Tuple} + * fields of the second join {@link DataStream} that should be used as + * join keys.<br/> + * </p> The resulting operator wraps each pair of joining elements in a + * Tuple2<I1,I2>(first, second). To use a different wrapping function + * use {@link JoinedStream#with(JoinFunction)} * * @param fields * The indexes of the Tuple fields of the second join * DataStream that should be used as keys. - * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or - * {@link FinalizeStreamJoin#withDefault} to complete + * @return A streaming join operator. Call {@link JoinedStream#with} to + * apply a custom wrapping */ - public FinalizeStreamJoin<I1, I2> equalTo(int... fields) { + public JoinedStream<I1, I2> equalTo(int... fields) { keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields); - return new FinalizeStreamJoin<I1, I2>(this); + return createJoinOperator(); } /** - * Continues a temporal Join transformation and defines the fields of - * the second join {@link DataStream} that should be used as join keys.<br/> + * Creates a temporal Join transformation and defines the fields of the + * second join {@link DataStream} that should be used as join keys. </p> + * The resulting operator wraps each pair of joining elements in a + * Tuple2<I1,I2>(first, second). 
To use a different wrapping function + * use {@link JoinedStream#with(JoinFunction)} * * @param fields * The fields of the second join DataStream that should be * used as keys. - * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or - * {@link FinalizeStreamJoin#withDefault} to complete + * @return A streaming join operator. Call {@link JoinedStream#with} to + * apply a custom wrapping */ - public FinalizeStreamJoin<I1, I2> equalTo(String... fields) { + public JoinedStream<I1, I2> equalTo(String... fields) { this.keys2 = new PojoKeySelector<I2>(op.input2.getType(), fields); - return new FinalizeStreamJoin<I1, I2>(this); + return createJoinOperator(); } /** - * Continues a temporal Join transformation and defines a + * Creates a temporal Join transformation and defines a * {@link KeySelector} function for the second join {@link DataStream} * .</br> The KeySelector function is called for each element of the * second DataStream and extracts a single key value on which the - * DataStream is joined. </br> + * DataStream is joined. </p> The resulting operator wraps each pair of + * joining elements in a Tuple2<I1,I2>(first, second). To use a + * different wrapping function use + * {@link JoinedStream#with(JoinFunction)} + * * * @param keySelector * The KeySelector function which extracts the key values * from the second DataStream on which it is joined. - * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or - * {@link FinalizeStreamJoin#withDefault} to complete + * @return A streaming join operator. 
Call {@link JoinedStream#with} to + * apply a custom wrapping */ - public <K> FinalizeStreamJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) { + public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) { this.keys2 = keySelector; - return new FinalizeStreamJoin<I1, I2>(this); + return createJoinOperator(); } + private JoinedStream<I1, I2> createJoinOperator() { + + JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>(); + + JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction( + joinFunction, this); + + TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>( + op.input1.getType(), op.input2.getType()); + + return new JoinedStream<I1, I2>(this, op.input1 + .groupBy(keys1) + .connect(op.input2.groupBy(keys2)) + .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize, + op.slideInterval, op.timeStamp1, op.timeStamp2)); + } } - public static class FinalizeStreamJoin<I1, I2> { + public static class JoinedStream<I1, I2> extends + SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> { private final JoinPredicate<I1, I2> predicate; - private FinalizeStreamJoin(JoinPredicate<I1, I2> predicate) { + private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) { + super(ds); this.predicate = predicate; } /** * Completes a stream join. </p> The resulting operator wraps each pair - * of joining elements into a {@link Tuple2}, with the element of the - * first input being the first field of the tuple and the element of the - * second input being the second field of the tuple. - * - * @return The joined data stream. - */ - public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() { - return createJoinOperator(new DefaultJoinFunction<I1, I2>()); - } - - /** - * Completes a stream join. 
</p> The resulting operator wraps each pair * of joining elements using the user defined {@link JoinFunction} * * @return The joined data stream. */ public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) { - return createJoinOperator(joinFunction); - } - private <OUT> SingleOutputStreamOperator<OUT, ?> createJoinOperator( - JoinFunction<I1, I2, OUT> joinFunction) { - - JoinWindowFunction<I1, I2, OUT> joinWindowFunction = new JoinWindowFunction<I1, I2, OUT>( - predicate.keys1, predicate.keys2, joinFunction); + TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction, + predicate.op.input1.getType(), predicate.op.input2.getType()); - StreamJoinOperator<I1, I2> op = predicate.op; + CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>( + getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize, + predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2); - TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction, - op.input1.getType(), op.input2.getType()); + jobGraphBuilder.setInvokable(id, invokable); - return op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction, - outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); + return setType(outType); } } - public static final class DefaultJoinFunction<T1, T2> implements - JoinFunction<T1, T2, Tuple2<T1, T2>> { + public static final class DefaultJoinFunction<I1, I2> implements + JoinFunction<I1, I2, Tuple2<I1, I2>> { private static final long serialVersionUID = 1L; - private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>(); + private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>(); @Override - public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception { + public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception { outTuple.f0 = first; outTuple.f1 = second; return outTuple; } } + + public static <I1, I2, OUT> 
JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction( + JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) { + return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction); + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java index e71b18c..c8ad533 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -56,7 +56,7 @@ public class StreamProjection<IN> { protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) { this.dataStream = dataStream; this.fieldIndexes = fieldIndexes; - this.inTypeInfo = dataStream.typeInfo; + this.inTypeInfo = dataStream.getType(); if (!inTypeInfo.isTupleType()) { throw new RuntimeException("Only Tuple DataStreams can be projected"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java index 
d785109..171ddc9 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java @@ -56,12 +56,13 @@ public abstract class FieldsKeySelector<IN> implements KeySelector<IN, Object> { protected Object key; protected boolean simpleKey; - public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class, - Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, - Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, - Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, - Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, - Tuple25.class }; + @SuppressWarnings("unchecked") + public static Class<? extends Tuple>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, + Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, + Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, + Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, + Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, + Tuple24.class, Tuple25.class }; public FieldsKeySelector(int... 
fields) { this.keyFields = fields; @@ -73,7 +74,7 @@ public abstract class FieldsKeySelector<IN> implements KeySelector<IN, Object> { } try { - key = (Tuple) tupleClasses[fields.length - 1].newInstance(); + key = tupleClasses[fields.length - 1].newInstance(); } catch (Exception e) { throw new RuntimeException(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java index 07d40ff..37f8c0a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java @@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable { DataStream<Integer> inStream2 = env.fromCollection(in2); inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) - .where(0).equalTo(0).withDefault().addSink(new JoinResultSink()); + .where(0).equalTo(0).addSink(new JoinResultSink()); inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2()) .with(new CrossFunction<Tuple2<Integer,String>, Integer, Tuple2<Tuple2<Integer,String>, Integer>>() { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java ---------------------------------------------------------------------- diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java index 2586e3c..897ad48 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java @@ -19,7 +19,9 @@ package org.apache.flink.streaming.examples.join; import java.util.Random; +import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -34,7 +36,7 @@ import org.apache.flink.util.Collector; * his example will join two streams with a sliding window. One which emits * grades and one which emits salaries of people. 
* </p> - * + * * <p> * This example shows how to: * <ul> @@ -63,13 +65,13 @@ public class WindowJoin { // apply a temporal join over the two stream based on the names over one // second windows - DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades - .join(salaries) - .onWindow(1000) - .where(0) - .equalTo(0) - .withDefault(); - + DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades + .join(salaries) + .onWindow(1000) + .where(0) + .equalTo(0) + .with(new MyJoinFunction()); + // emit result if (fileOutput) { joinedStream.writeAsText(outputPath, 1); @@ -141,6 +143,24 @@ public class WindowJoin { } } + public static class MyJoinFunction + implements + JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> { + + private static final long serialVersionUID = 1L; + + private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer, Integer>(); + + @Override + public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, + Tuple2<String, Integer> second) throws Exception { + joined.f0 = first.f0; + joined.f1 = first.f1; + joined.f2 = second.f1; + return joined; + } + } + // ************************************************************************* // UTIL METHODS // ************************************************************************* http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala index a117412..871fede 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala @@ -108,7 +108,7 @@ class 
DataStream[T](javaStream: JavaStream[T]) { * */ def groupBy(fields: Int*): DataStream[T] = - new DataStream[T](javaStream.groupBy(fields: _*)) + new DataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*))) /** * Groups the elements of a DataStream by the given field expressions to @@ -138,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) { * */ def partitionBy(fields: Int*): DataStream[T] = - new DataStream[T](javaStream.partitionBy(fields: _*)) + new DataStream[T](javaStream.partitionBy(new FieldsKeySelector[T](fields: _*))) /** * Sets the partitioning of the DataStream so that the output is @@ -458,6 +458,8 @@ class DataStream[T](javaStream: JavaStream[T]) { split(selector) } + def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T, R](javaStream, stream.getJavaStream) + /** * Writes a DataStream to the standard output stream (stdout). For each * element of the DataStream the result of .toString is http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala new file mode 100644 index 0000000..4223512 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala @@ -0,0 +1,29 @@ +package org.apache.flink.api.scala.streaming + +import org.apache.flink.streaming.util.keys.{ FieldsKeySelector => JavaSelector } +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple + +class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Tuple] { + + val t: Tuple = JavaSelector.tupleClasses(fields.length - 1).newInstance() + + override def getKey(value: IN): Tuple = + + 
value match { + case prod: Product => { + for (i <- 0 to fields.length - 1) { + t.setField(prod.productElement(fields(i)), i) + } + t + } + case tuple: Tuple => { + for (i <- 0 to fields.length - 1) { + t.setField(tuple.getField(fields(i)), i) + } + t + } + case _ => throw new RuntimeException("Only tuple types are supported") + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala new file mode 100644 index 0000000..93950a2 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction
+import org.apache.flink.streaming.util.keys.PojoKeySelector
+import scala.reflect.ClassTag
+import org.apache.commons.lang.Validate
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+
+/**
+ * Scala-side temporal join over two Java streams.
+ *
+ * The operator is a [[TemporalOperator]] whose window step yields a
+ * [[StreamJoinOperator.JoinWindow]]; the join keys are then declared with
+ * `where(...)` followed by `equalTo(...)`.
+ *
+ * @param i1 first input stream
+ * @param i2 second input stream
+ * @tparam I1 element type of the first stream
+ * @tparam I2 element type of the second stream
+ */
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2])
+  extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
+
+  /** Creates the [[StreamJoinOperator.JoinWindow]] on which the first join key is declared. */
+  override def createNextWindowOperator() = {
+    new StreamJoinOperator.JoinWindow[I1, I2](this)
+  }
+}
+
+object StreamJoinOperator {
+
+  /**
+   * Runs the [[ClosureCleaner]] on `f` and hands back the same instance.
+   *
+   * @param f function object to clean
+   * @param checkSerializable forwarded unchanged to `ClosureCleaner.clean`
+   * @return `f` itself
+   */
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
+
+  /**
+   * First step of the join definition: selects the key of the first stream.
+   *
+   * @param op the join operator holding both inputs
+   */
+  class JoinWindow[I1, I2](op: StreamJoinOperator[I1, I2]) {
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields in the first stream to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     *
+     * @param fields positions of the key fields in the elements of the first stream
+     */
+    def where(fields: Int*) = {
+      new JoinPredicate[I1, I2](op, new FieldsKeySelector[I1](fields: _*))
+    }
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the fields in the first stream to be used as keys for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     *
+     * @param firstField name of the first key field
+     * @param otherFields names of any further key fields
+     */
+    def where(firstField: String, otherFields: String*) = {
+      new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*))
+    }
+
+    /**
+     * Continues a temporal Join transformation by defining
+     * the keyselector function that will be used to extract keys from the first stream
+     * for the join.
+     * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+     * to define the second key.
+     *
+     * @param fun key extractor applied to every element of the first stream
+     */
+    def where[K: TypeInformation](fun: (I1) => K) = {
+      val keyExtractor = new KeySelector[I1, K] {
+        val cleanFun = op.input1.clean(fun)
+        def getKey(in: I1) = cleanFun(in)
+      }
+      new JoinPredicate[I1, I2](op, keyExtractor)
+    }
+
+  }
+
+  /**
+   * Second step of the join definition: holds the key selector of the first stream
+   * and completes the join once the key of the second stream is given.
+   *
+   * @param op the join operator holding both inputs
+   * @param keys1 key selector for the first stream
+   */
+  class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], private[flink] val keys1: KeySelector[I1, _]) {
+    // Stays null until one of the equalTo(...) variants has been called (assigned in finish).
+    private[flink] var keys2: KeySelector[I2, _] = null
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a Tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.apply(...)
+     *
+     * @param fields positions of the key fields in the elements of the second stream
+     */
+    def equalTo(fields: Int*): JoinedStream[I1, I2] = {
+      finish(new FieldsKeySelector[I2](fields: _*))
+    }
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a Tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.apply(...)
+     *
+     * @param firstField name of the first key field
+     * @param otherFields names of any further key fields
+     */
+    def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = {
+      finish(new PojoKeySelector[I2](op.input2.getType(), (firstField +: otherFields): _*))
+    }
+
+    /**
+     * Creates a temporal join transformation by defining the second join key.
+     * The returned transformation wraps each joined element pair in a Tuple2:
+     * (first, second)
+     * To define a custom wrapping, use JoinedStream.apply(...)
+     *
+     * @param fun key extractor applied to every element of the second stream
+     */
+    def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
+      val keyExtractor = new KeySelector[I2, K] {
+        // Cleaned through the second input, the stream this selector is applied to
+        // (mirrors where(fun), which cleans through the first input).
+        val cleanFun = op.input2.clean(fun)
+        def getKey(in: I2) = cleanFun(in)
+      }
+      finish(keyExtractor)
+    }
+
+    /** Stores the second key selector and builds the default (pairing) joined stream. */
+    private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
+      this.keys2 = keys2
+      new JoinedStream[I1, I2](this, createJoinOperator())
+    }
+
+    /**
+     * Builds the default join whose result is the pair (first, second): both inputs are
+     * grouped by their key selectors, connected, and combined through
+     * addGeneralWindowCombine with the operator's window size, slide interval and timestamps.
+     */
+    private def createJoinOperator(): JavaStream[(I1, I2)] = {
+
+      // Type information for the Tuple2 result; its serializer rebuilds the Scala
+      // tuple from the two field values.
+      val returnType = new CaseClassTypeInfo[(I1, I2)](
+
+        classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
+
+        override def createSerializer: TypeSerializer[(I1, I2)] = {
+          val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+          for (i <- 0 until getArity) {
+            fieldSerializers(i) = types(i).createSerializer
+          }
+
+          new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+            override def createInstance(fields: Array[AnyRef]) = {
+              (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+            }
+          }
+        }
+      }
+
+      op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)).addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+        returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+    }
+  }
+
+  /**
+   * Result of a completed temporal join; by default a stream of (first, second) pairs.
+   *
+   * @param jp the predicate holding the operator and both key selectors
+   * @param javaStream the underlying Java stream produced by the default join
+   */
+  class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
+
+    private val op = jp.op
+
+    /**
+     * Applies a custom join function instead of the default pairing.
+     * A new CoWindowInvokable wrapping `fun` is registered in the JobGraphBuilder under
+     * the id of the underlying stream, and the stream's type is set to the type of `R`.
+     *
+     * @param fun function combining one element of each stream into the join result
+     * @return the stream of custom join results
+     */
+    def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+      val invokable = new CoWindowInvokable[I1, I2, R](
+        clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+
+      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable)
+
+      new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+    }
+  }
+
+  /**
+   * Wraps a Scala join function into a JoinWindowFunction that uses the key selectors
+   * currently stored in `jp`.
+   *
+   * @param jp predicate providing `keys1` and `keys2`
+   * @param joinFunction function combining one element of each stream; must not be null
+   */
+  private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], joinFunction: (I1, I2) => R) = {
+    Validate.notNull(joinFunction, "Join function must not be null.")
+
+    val joinFun = new JoinFunction[I1, I2, R] {
+
+      val cleanFun = clean(joinFunction)
+
+      override def join(first: I1, second: I2): R = {
+        cleanFun(first, second)
+      }
+    }
+
+    new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/555837cd/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala index ff89a47..c686497 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala @@ -61,7 +61,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) { * */ def groupBy(fields: Int*): WindowedDataStream[T] = - new WindowedDataStream[T](javaStream.groupBy(fields: _*)) + new WindowedDataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*))) /** * Groups the elements of the WindowedDataStream using the given
