http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java index abed03f..41d1778 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java @@ -18,21 +18,22 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.config.Config; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; /** * Implementation for {@link SinkOperatorSpec} */ -class SinkOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, MessageEnvelope> { +class SinkOperatorImpl<M> extends OperatorImpl<M, M> { private final SinkFunction<M> sinkFn; - SinkOperatorImpl(SinkOperatorSpec<M> sinkOp) { + SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) { this.sinkFn = sinkOp.getSinkFn(); }
http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java index 3a5c56e..644de20 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java @@ -18,24 +18,26 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; /** - * A StreamOperator that accepts a 1:n transform function and applies it to each incoming {@link MessageEnvelope}. + * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message. 
* - * @param <M> type of {@link MessageEnvelope} in the input stream - * @param <RM> type of {@link MessageEnvelope} in the output stream + * @param <M> type of message in the input stream + * @param <RM> type of message in the output stream */ -class StreamOperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> extends OperatorImpl<M, RM> { +class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> { private final FlatMapFunction<M, RM> transformFn; - StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec) { + StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) { this.transformFn = streamOperatorSpec.getTransformFn(); } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index a5b71a7..af00553 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -18,18 +18,21 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; -public class WindowOperatorImpl<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> extends OperatorImpl<M, 
WM> { +public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> { - private final WindowInternal<M, K, WV> window; + private final WindowInternal<M, WK, WV> window; - public WindowOperatorImpl(WindowOperatorSpec spec) { + public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) { + // source, config, and context are used to initialize the window kv-store window = spec.getWindow(); } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 8b75cdc..1444662 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -18,20 +18,45 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.task.TaskContext; /** * A stateless serializable stream operator specification that holds all the information required - * to transform the input {@link MessageStream} and produce the output {@link MessageStream}. + * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}. 
+ * + * @param <OM> the type of output message from the operator */ -public interface OperatorSpec<OM extends MessageEnvelope> { +@InterfaceStability.Unstable +public interface OperatorSpec<OM> { + + enum OpCode { + MAP, + FLAT_MAP, + FILTER, + SINK, + SEND_TO, + JOIN, + WINDOW, + MERGE, + PARTITION_BY + } + /** - * Get the output stream containing transformed {@link MessageEnvelope} produced by this operator. - * @return the output stream containing transformed {@link MessageEnvelope} produced by this operator. + * Get the output stream containing transformed messages produced by this operator. + * @return the output stream containing transformed messages produced by this operator. */ - MessageStream<OM> getOutputStream(); + MessageStreamImpl<OM> getNextStream(); + /** + * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP. + * + * @param config the {@link Config} object for this task + * @param context the {@link TaskContext} object for this task + */ + default void init(Config config, TaskContext context) { } } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index fc25929..d626852 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -19,16 +19,21 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.data.MessageEnvelope; +import java.util.Collection; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.OutputStream; +import 
org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; import java.util.ArrayList; -import java.util.UUID; -import java.util.function.BiFunction; +import org.apache.samza.task.TaskContext; /** @@ -38,80 +43,168 @@ public class OperatorSpecs { private OperatorSpecs() {} - private static String getOperatorId() { - // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts - return UUID.randomUUID().toString(); + /** + * Creates a {@link StreamOperatorSpec} for {@link MapFunction} + * + * @param mapFn the map function + * @param graph the {@link StreamGraphImpl} object + * @param output the output {@link MessageStreamImpl} object + * @param <M> type of input message + * @param <OM> type of output message + * @return the {@link StreamOperatorSpec} + */ + public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) { + return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() { + @Override + public Collection<OM> apply(M message) { + return new ArrayList<OM>() { + { + OM r = mapFn.apply(message); + if (r != null) { + this.add(r); + } + } + }; + } + + @Override + public void init(Config config, TaskContext context) { + mapFn.init(config, context); + } + }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId()); + } + + /** + * Creates a {@link StreamOperatorSpec} for {@link FilterFunction} + * + * @param filterFn the filter function deciding which messages are kept + * @param graph the
{@link StreamGraphImpl} object + * @param output the output {@link MessageStreamImpl} object + * @param <M> type of input message + * @return the {@link StreamOperatorSpec} + */ + public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) { + return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() { + @Override + public Collection<M> apply(M message) { + return new ArrayList<M>() { + { + if (filterFn.apply(message)) { + this.add(message); + } + } + }; + } + + @Override + public void init(Config config, TaskContext context) { + filterFn.init(config, context); + } + }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId()); } /** * Creates a {@link StreamOperatorSpec}. * * @param transformFn the transformation function - * @param <M> type of input {@link MessageEnvelope} - * @param <OM> type of output {@link MessageEnvelope} + * @param graph the {@link StreamGraphImpl} object + * @param output the output {@link MessageStreamImpl} object + * @param <M> type of input message + * @param <OM> type of output message * @return the {@link StreamOperatorSpec} */ - public static <M extends MessageEnvelope, OM extends MessageEnvelope> StreamOperatorSpec<M, OM> createStreamOperatorSpec( - FlatMapFunction<M, OM> transformFn) { - return new StreamOperatorSpec<>(transformFn); + public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec( + FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) { + return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId()); + } + + /** + * Creates a {@link SinkOperatorSpec}. 
+ * + * @param sinkFn the sink function + * @param <M> type of input message + * @param graph the {@link StreamGraphImpl} object + * @return the {@link SinkOperatorSpec} + */ + public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) { + return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId()); + } + + /** + * Creates a {@link SinkOperatorSpec}. + * + * @param sinkFn the sink function + * @param graph the {@link StreamGraphImpl} object + * @param stream the {@link OutputStream} where the message is sent to + * @param <M> type of input message + * @return the {@link SinkOperatorSpec} + */ + public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) { + return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream); } /** * Creates a {@link SinkOperatorSpec}. * * @param sinkFn the sink function - * @param <M> type of input {@link MessageEnvelope} + * @param graph the {@link StreamGraphImpl} object + * @param stream the {@link OutputStream} where the message is sent to + * @param <M> type of input message * @return the {@link SinkOperatorSpec} */ - public static <M extends MessageEnvelope> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn) { - return new SinkOperatorSpec<>(sinkFn); + public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) { + return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream); } /** * Creates a {@link WindowOperatorSpec}. * * @param window the description of the window. - * @param <M> the type of input {@link MessageEnvelope} - * @param <K> the type of key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}. 
If a key is specified, - * results are emitted per-key + * @param graph the {@link StreamGraphImpl} object + * @param wndOutput the window output {@link MessageStreamImpl} object + * @param <M> the type of input message * @param <WK> the type of key in the {@link WindowPane} * @param <WV> the type of value in the window - * @param <WM> the type of output {@link WindowPane} * @return the {@link WindowOperatorSpec} */ - public static <M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> WindowOperatorSpec<M, K, WK, WV, WM> createWindowOperatorSpec(WindowInternal<M, K, WV> window) { - return new WindowOperatorSpec<>(window, OperatorSpecs.getOperatorId()); + public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec( + WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) { + return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId()); } /** * Creates a {@link PartialJoinOperatorSpec}. * * @param partialJoinFn the join function + * @param graph the {@link StreamGraphImpl} object * @param joinOutput the output {@link MessageStreamImpl} - * @param <M> type of input {@link MessageEnvelope} + * @param <M> type of input message * @param <K> type of join key - * @param <JM> the type of {@link MessageEnvelope} in the other join stream - * @param <OM> the type of {@link MessageEnvelope} in the join output + * @param <JM> the type of message in the other join stream + * @param <OM> the type of message in the join output * @return the {@link PartialJoinOperatorSpec} */ - public static <M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec( - BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) { - return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, OperatorSpecs.getOperatorId()); + public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, 
JM, OM> createPartialJoinOperatorSpec( + PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) { + return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId()); } /** * Creates a {@link StreamOperatorSpec} with a merger function. * + * @param graph the {@link StreamGraphImpl} object * @param mergeOutput the output {@link MessageStreamImpl} from the merger - * @param <M> the type of input {@link MessageEnvelope} + * @param <M> the type of input message * @return the {@link StreamOperatorSpec} for the merge */ - public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> mergeOutput) { - return new StreamOperatorSpec<M, M>(t -> - new ArrayList<M>() { { - this.add(t); - } }, - mergeOutput); + public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) { + return new StreamOperatorSpec<M, M>(message -> + new ArrayList<M>() { + { + this.add(message); + } + }, + mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java index e6d77f6..e057c2b 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java @@ -18,63 +18,69 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; - -import 
java.util.function.BiFunction; +import org.apache.samza.operators.functions.PartialJoinFunction; +import org.apache.samza.task.TaskContext; /** - * Spec for the partial join operator that takes {@link MessageEnvelope}s from one input stream, joins with buffered - * {@link MessageEnvelope}s from another stream, and produces join results to an output {@link MessageStreamImpl}. + * Spec for the partial join operator that takes messages from one input stream, joins with buffered + * messages from another stream, and produces join results to an output {@link MessageStreamImpl}. * - * @param <M> the type of input {@link MessageEnvelope} + * @param <M> the type of input message * @param <K> the type of join key - * @param <JM> the type of {@link MessageEnvelope} in the other join stream - * @param <RM> the type of {@link MessageEnvelope} in the join output stream + * @param <JM> the type of message in the other join stream + * @param <RM> the type of message in the join output stream */ -public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> - implements OperatorSpec<RM> { +public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> { private final MessageStreamImpl<RM> joinOutput; /** - * The transformation function of {@link PartialJoinOperatorSpec} that takes an input {@link MessageEnvelope} of - * type {@code M}, joins with a stream of buffered {@link MessageEnvelope}s of type {@code JM} from another stream, - * and generates a joined result {@link MessageEnvelope} of type {@code RM}. + * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of + * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream, + * and generates a joined result message of type {@code RM}. 
*/ - private final BiFunction<M, JM, RM> transformFn; + private final PartialJoinFunction<K, M, JM, RM> transformFn; /** * The unique ID for this operator. */ - private final String operatorId; + private final int opId; /** * Default constructor for a {@link PartialJoinOperatorSpec}. * - * @param partialJoinFn partial join function that take type {@code M} of input {@link MessageEnvelope} and join - * w/ type {@code JM} of buffered {@link MessageEnvelope} from another stream + * @param partialJoinFn partial join function that take type {@code M} of input message and join + * w/ type {@code JM} of buffered message from another stream * @param joinOutput the output {@link MessageStreamImpl} of the join results */ - PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, String operatorId) { + PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) { this.joinOutput = joinOutput; this.transformFn = partialJoinFn; - this.operatorId = operatorId; + this.opId = opId; } @Override - public String toString() { - return this.operatorId; - } - - @Override - public MessageStreamImpl<RM> getOutputStream() { + public MessageStreamImpl<RM> getNextStream() { return this.joinOutput; } - public BiFunction<M, JM, RM> getTransformFn() { + public PartialJoinFunction<K, M, JM, RM> getTransformFn() { return this.transformFn; } + + public OperatorSpec.OpCode getOpCode() { + return OpCode.JOIN; + } + + public int getOpId() { + return this.opId; + } + + @Override public void init(Config config, TaskContext context) { + this.transformFn.init(config, context); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index 4348bc0..ba30d67 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -18,18 +18,30 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.task.TaskContext; /** * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external * system. This is a terminal operator and does allows further operator chaining. * - * @param <M> the type of input {@link MessageEnvelope} + * @param <M> the type of input message */ -public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec { +public class SinkOperatorSpec<M> implements OperatorSpec { + + /** + * {@link OpCode} for this {@link SinkOperatorSpec} + */ + private final OperatorSpec.OpCode opCode; + + /** + * The unique ID for this operator. + */ + private final int opId; /** * The user-defined sink function @@ -37,14 +49,40 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec private final SinkFunction<M> sinkFn; /** - * Default constructor for a {@link SinkOperatorSpec}. + * Potential output stream defined by the {@link SinkFunction} + */ + private final OutputStream<M> outStream; + + /** + * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. 
output is sent to remote database) + * + * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message, + * the output {@link org.apache.samza.task.MessageCollector} and the + * {@link org.apache.samza.task.TaskCoordinator}. + * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, + * or {@link OpCode#PARTITION_BY} + * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph} + */ + SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) { + this(sinkFn, opCode, opId, null); + } + + /** + * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream} * - * @param sinkFn a user defined {@link SinkFunction} that will be called with the output {@link MessageEnvelope}, + * @param sinkFn a user defined {@link SinkFunction} that will be called with the output message, * the output {@link org.apache.samza.task.MessageCollector} and the * {@link org.apache.samza.task.TaskCoordinator}. + * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}. 
It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, + * or {@link OpCode#PARTITION_BY} + * @param opId the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph} + * @param outStream the {@link OutputStream} for this {@link SinkOperatorSpec} */ - SinkOperatorSpec(SinkFunction<M> sinkFn) { + SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) { this.sinkFn = sinkFn; + this.opCode = opCode; + this.opId = opId; + this.outStream = outStream; } /** @@ -52,11 +90,27 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec * @return null */ @Override - public MessageStreamImpl getOutputStream() { + public MessageStreamImpl<M> getNextStream() { return null; } public SinkFunction<M> getSinkFn() { return this.sinkFn; } + + public OperatorSpec.OpCode getOpCode() { + return this.opCode; + } + + public int getOpId() { + return this.opId; + } + + public OutputStream<M> getOutStream() { + return this.outStream; + } + + @Override public void init(Config config, TaskContext context) { + this.sinkFn.init(config, context); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index ed18da4..d7813f7 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -18,50 +18,74 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.config.Config; import
org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.task.TaskContext; /** - * The spec for a linear stream operator that outputs 0 or more {@link MessageEnvelope}s for each input {@link MessageEnvelope}. + * The spec for a linear stream operator that outputs 0 or more messages for each input message. * - * @param <M> the type of input {@link MessageEnvelope} - * @param <OM> the type of output {@link MessageEnvelope} + * @param <M> the type of input message + * @param <OM> the type of output message */ -public class StreamOperatorSpec<M extends MessageEnvelope, OM extends MessageEnvelope> implements OperatorSpec<OM> { +public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> { - private final MessageStreamImpl<OM> outputStream; + /** + * {@link OpCode} for this {@link StreamOperatorSpec} + */ + private final OperatorSpec.OpCode opCode; - private final FlatMapFunction<M, OM> transformFn; + /** + * The unique ID for this operator. + */ + private final int opId; /** - * Default constructor for a {@link StreamOperatorSpec}. - * - * @param transformFn the transformation function that transforms each input {@link MessageEnvelope} into a collection - * of output {@link MessageEnvelope}s + * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec} */ - StreamOperatorSpec(FlatMapFunction<M, OM> transformFn) { - this(transformFn, new MessageStreamImpl<>()); - } + private final MessageStreamImpl<OM> outputStream; + + /** + * Transformation function applied in this {@link StreamOperatorSpec} + */ + private final FlatMapFunction<M, OM> transformFn; /** * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}. 
* * @param transformFn the transformation function * @param outputStream the output {@link MessageStreamImpl} + * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec} + * @param opId the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph} */ - StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream) { + StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) { this.outputStream = outputStream; this.transformFn = transformFn; + this.opCode = opCode; + this.opId = opId; } @Override - public MessageStreamImpl<OM> getOutputStream() { + public MessageStreamImpl<OM> getNextStream() { return this.outputStream; } public FlatMapFunction<M, OM> getTransformFn() { return this.transformFn; } + + public OperatorSpec.OpCode getOpCode() { + return this.opCode; + } + + public int getOpId() { + return this.opId; + } + + @Override + public void init(Config config, TaskContext context) { + this.transformFn.init(config, context); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index cdc02a8..46417ed 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -19,29 +19,42 @@ package org.apache.samza.operators.spec; -import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.windows.WindowPane; import 
org.apache.samza.operators.windows.internal.WindowInternal; -public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> implements OperatorSpec<WM> { - private final WindowInternal window; +/** + * Default window operator spec object + * + * @param <M> the type of input message to the window + * @param <WK> the type of key of the window + * @param <WV> the type of aggregated value in the window output {@link WindowPane} + */ +public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> { + + private final WindowInternal<M, WK, WV> window; - private final MessageStreamImpl<WM> outputStream; + private final MessageStreamImpl<WindowPane<WK, WV>> outputStream; - private final String operatorId; + private final int opId; - public WindowOperatorSpec(WindowInternal window, String operatorId) { + /** + * Constructor for {@link WindowOperatorSpec}. + * + * @param window the window function + * @param outputStream the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec} + * @param opId auto-generated unique ID of this operator + */ + WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) { + this.outputStream = outputStream; this.window = window; - this.outputStream = new MessageStreamImpl<>(); - this.operatorId = operatorId; + this.opId = opId; } @Override - public MessageStream<WM> getOutputStream() { + public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() { return this.outputStream; } @@ -49,7 +62,11 @@ public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends return window; } - public String getOperatorId() { - return operatorId; + public OpCode getOpCode() { + return OpCode.WINDOW; + } + + public int getOpId() { + return this.opId; } } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java 
---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java index e9af043..53bca2e 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java @@ -30,20 +30,16 @@ import org.apache.samza.annotation.InterfaceStability; @InterfaceStability.Unstable public interface WindowState<WV> { /** - * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope} - * in the window is received + * Method to get the system time when the first message in the window is received * - * @return nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope} - * received in the window + * @return nano-second of system time for the first message received in the window */ long getFirstMessageTimeNs(); /** - * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope} - * in the window is received + * Method to get the system time when the last message in the window is received * - * @return nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope} - * received in the window + * @return nano-second of system time for the last message received in the window */ long getLastMessageTimeNs(); @@ -62,9 +58,9 @@ public interface WindowState<WV> { long getLatestEventTimeNs(); /** - * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window + * Method to get the total number of messages received in the window * - * @return number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window + * @return number of messages in the window */ long getNumberMessages(); 
http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java new file mode 100644 index 0000000..60a4c60 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system; + +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; + +/** + * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment + */ +public class SingleJobExecutionEnvironment implements ExecutionEnvironment { + + @Override public void run(StreamGraphFactory app, Config config) { + // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph} + // TODO: actually instantiate the tasks and run the job, i.e. + // 1. 
create all input/output/intermediate topics + // 2. create the single job configuration + // 3. execute JobRunner to submit the single job for the whole graph + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java new file mode 100644 index 0000000..f60ff82 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.system; + +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; + + +/** + * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment + */ +public class StandaloneExecutionEnvironment implements ExecutionEnvironment { + + @Override public void run(StreamGraphFactory app, Config config) { + // 1. 
get logic graph for optimization + // StreamGraph logicGraph = app.create(config); + // 2. potential optimization.... + // 3. create new instance of StreamGraphFactory that would generate the optimized graph + // 4. create all input/output/intermediate topics + // 5. create the configuration for StreamProcessor + // 6. start the StreamProcessor w/ optimized instance of StreamGraphFactory + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java new file mode 100644 index 0000000..fa7ec5e --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.task; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.ContextManager; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.impl.OperatorGraph; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStream; + +import java.util.HashMap; +import java.util.Map; + + +/** + * Execution of the logic sub-DAG + * + * + * An {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them + * through the user's stream transformations defined in {@link StreamGraphImpl} using the + * {@link org.apache.samza.operators.MessageStream} APIs. + * <p> + * This class brings all the operator API implementation components together and feeds the + * {@link InputMessageEnvelope}s into the transformation chains. + * <p> + * It accepts an instance of the user implemented factory {@link StreamGraphFactory} as input parameter of the constructor. + * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl} + * from the {@link StreamGraphFactory}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context + * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input + * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl} + * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}. + * <p> + * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input + * {@link MessageStreamImpl}. 
This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG + * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the + * root node of the DAG, which this class saves. + * <p> + * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it + * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} + * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates + * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s. + */ +public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask { + + /** + * A mapping from each {@link SystemStream} to the root node of its operator chain DAG. + */ + private final OperatorGraph operatorGraph = new OperatorGraph(); + + private final StreamGraphFactory graphFactory; + + private ContextManager taskManager; + + public StreamOperatorTask(StreamGraphFactory graphFactory) { + this.graphFactory = graphFactory; + } + + @Override + public final void init(Config config, TaskContext context) throws Exception { + // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task + StreamGraphImpl streams = (StreamGraphImpl) this.graphFactory.create(config); + this.taskManager = streams.getContextManager(); + + Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>(); + context.getSystemStreamPartitions().forEach(ssp -> { + if (!inputBySystemStream.containsKey(ssp.getSystemStream())) { + inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream())); + } + }); + operatorGraph.init(inputBySystemStream, config, this.taskManager.initTaskContext(config, context)); + } + + @Override + public final void 
process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { + this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream()) + .onNext(new InputMessageEnvelope(ime), collector, coordinator); + } + + @Override + public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { + // TODO: invoke timer based triggers + } + + @Override + public void close() throws Exception { + this.taskManager.finalizeTaskContext(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java new file mode 100644 index 0000000..a91ce09 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.time.Duration; +import java.util.function.BiFunction; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of split stream tasks + * + */ +public class BroadcastGraph implements StreamGraphFactory { + + private final Set<SystemStreamPartition> inputs; + + BroadcastGraph(Set<SystemStreamPartition> inputs) { + this.inputs = inputs; + } + + class MessageType { + String field1; + String field2; + String field3; + String field4; + String parKey; + private long timestamp; + + public long getTimestamp() { + return this.timestamp; + } + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + @Override + public StreamGraph create(Config config) { + StreamGraphImpl graph = new StreamGraphImpl(); + + BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; + inputs.forEach(entry -> { + MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return entry.getSystemStream(); + } + + 
@Override public Properties getProperties() { + return null; + } + }, null, null). + map(this::getInputMessage); + + inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + }); + return graph; + } + + JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) { + return (JsonMessageEnvelope) m1.getMessage(); + } + + boolean myFilter1(JsonMessageEnvelope m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key1"); + } + + boolean myFilter2(JsonMessageEnvelope m1) { + // Do user defined processing here + return m1.getMessage().parKey.equals("key2"); + } + + boolean myFilter3(JsonMessageEnvelope m1) { + return m1.getMessage().parKey.equals("key3"); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java new file mode 100644 index 0000000..2313f63 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.example; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.KeyValueJoinFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + + +/** + * Example implementation of unique key-based stream-stream join tasks + * + */ +public class JoinGraph implements StreamGraphFactory { + private final Set<SystemStreamPartition> inputs; + + JoinGraph(Set<SystemStreamPartition> inputs) { + this.inputs = inputs; + } + + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessageEnvelope 
extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + MessageStream<JsonMessageEnvelope> joinOutput = null; + + @Override + public StreamGraph create(Config config) { + StreamGraphImpl graph = new StreamGraphImpl(); + + for (SystemStreamPartition input : inputs) { + MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream( + new StreamSpec() { + @Override public SystemStream getSystemStream() { + return input.getSystemStream(); + } + + @Override public Properties getProperties() { + return null; + } + }, null, null).map(this::getInputMessage); + if (joinOutput == null) { + joinOutput = newSource; + } else { + joinOutput = joinOutput.join(newSource, + (KeyValueJoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope>) this::myJoinResult); + } + } + + joinOutput.sendTo(graph.createOutStream(new StreamSpec() { + @Override public SystemStream getSystemStream() { + return null; + } + + @Override public Properties getProperties() { + return null; + } + }, new StringSerde("UTF-8"), new JsonSerde<>())); + + return graph; + } + + private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { + return new JsonMessageEnvelope( + ((MessageType) ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + ism.getSystemStreamPartition()); + } + + JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); + } +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java new file mode 100644 index 0000000..ad6336a --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.example; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.TaskContext; +import org.apache.samza.util.CommandLine; + +import java.util.Properties; + + +/** + * Example code using {@link KeyValueStore} to implement event-time window + */ +public class KeyValueStoreExample implements StreamGraphFactory { + + /** + * used by remote execution environment to launch the job in remote program. 
The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); + * UserMainExample runnableApp = new UserMainExample(); + * runnableApp.run(remoteEnv, config); + * } + * + */ + @Override public StreamGraph create(Config config) { + StreamGraph graph = StreamGraph.fromConfig(config); + + MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>()); + + pageViewEvents. + partitionBy(m -> m.getMessage().memberId). + flatMap(new MyStatsCounter()). + sendTo(pageViewPerMemberCounters); + + return graph; + } + + // standalone local program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new KeyValueStoreExample(), config); + } + + class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> { + private final int timeoutMs = 10 * 60 * 1000; + + KeyValueStore<String, StatsWindowState> statsStore; + + class StatsWindowState { + int lastCount = 0; + long timeAtLastOutput = 0; + int newCount = 0; + } + + @Override + public Collection<StatsOutput> apply(PageViewEvent message) { + List<StatsOutput> outputStats = new ArrayList<>(); + long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5; + String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp); + StatsWindowState 
curState = this.statsStore.get(wndKey); + curState.newCount++; + long curTimeMs = System.currentTimeMillis(); + if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) { + curState.timeAtLastOutput = curTimeMs; + curState.lastCount += curState.newCount; + curState.newCount = 0; + outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount)); + } + // update counter w/o generating output + this.statsStore.put(wndKey, curState); + return outputStats; + } + + @Override + public void init(Config config, TaskContext context) { + this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store"); + } + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewEvent"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewPerMember5min"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { + String pageId; + String memberId; + long timestamp; + + PageViewEvent(String pageId, String memberId, long timestamp) { + this.pageId = pageId; + this.memberId = memberId; + this.timestamp = timestamp; + } + + @Override + public String getKey() { + return this.pageId; + } + + @Override + public PageViewEvent getMessage() { + return this; + } + } + + class StatsOutput implements MessageEnvelope<String, StatsOutput> { + private String memberId; + private long timestamp; + private Integer count; + + StatsOutput(String key, long timestamp, Integer count) { + this.memberId = key; + this.timestamp = timestamp; + this.count = count; + } + + @Override + public String getKey() { + return this.memberId; + } + + @Override + public StatsOutput getMessage() { + 
return this; + } + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java new file mode 100644 index 0000000..577d06f --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.example; + +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.InputMessageEnvelope; +import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; +import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.CommandLine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + + +/** + * Example {@link StreamGraphFactory} code to test the API methods + */ +public class NoContextStreamExample implements StreamGraphFactory { + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "input1"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec input2 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "input2"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "output"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class MessageType { + String joinKey; + List<String> joinFields = new ArrayList<>(); + } + + class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { + JsonMessageEnvelope(String key, MessageType data, 
Offset offset, SystemStreamPartition partition) { + super(key, data, offset, partition); + } + } + + private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { + return new JsonMessageEnvelope( + ((MessageType) ism.getMessage()).joinKey, + (MessageType) ism.getMessage(), + ism.getOffset(), + ism.getSystemStreamPartition()); + } + + class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> { + + @Override + public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1, + JsonMessageEnvelope m2) { + MessageType newJoinMsg = new MessageType(); + newJoinMsg.joinKey = m1.getKey(); + newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); + newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); + return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); + } + + @Override + public String getFirstKey(JsonMessageEnvelope message) { + return message.getKey(); + } + + @Override + public String getSecondKey(JsonMessageEnvelope message) { + return message.getKey(); + } + } + + /** + * used by remote execution environment to launch the job in remote program. 
The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config); + * remoteEnv.run(new NoContextStreamExample(), config); + * } + * + */ + @Override public StreamGraph create(Config config) { + StreamGraph graph = StreamGraph.fromConfig(config); + MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream( + input1, null, null); + MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream( + input2, null, null); + OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output, + new StringSerde("UTF-8"), new JsonSerde<>()); + + inputSource1.map(this::getInputMessage). + join(inputSource2.map(this::getInputMessage), new MyJoinFunction()). 
+ sendTo(outStream); + + return graph; + } + + // standalone local program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new NoContextStreamExample(), config); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java new file mode 100644 index 0000000..ad433b6 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.example; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.CommandLine; + +import java.util.Properties; + + +/** + * Simple 2-way stream-to-stream join example + */ +public class OrderShipmentJoinExample implements StreamGraphFactory { + + /** + * used by remote execution environment to launch the job in remote program. The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); + * UserMainExample runnableApp = new UserMainExample(); + * runnableApp.run(remoteEnv, config); + * } + * + */ + @Override public StreamGraph create(Config config) { + StreamGraph graph = StreamGraph.fromConfig(config); + + MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + + orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders); + + return graph; + } + + // standalone local 
program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new OrderShipmentJoinExample(), config); + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "Orders"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec input2 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "Shipment"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "FulfilledOrders"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class OrderRecord implements MessageEnvelope<String, OrderRecord> { + String orderId; + long orderTimeMs; + + OrderRecord(String orderId, long timeMs) { + this.orderId = orderId; + this.orderTimeMs = timeMs; + } + + @Override + public String getKey() { + return this.orderId; + } + + @Override + public OrderRecord getMessage() { + return this; + } + } + + class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> { + String orderId; + long shipTimeMs; + + ShipmentRecord(String orderId, long timeMs) { + this.orderId = orderId; + this.shipTimeMs = timeMs; + } + + @Override + public String getKey() { + return this.orderId; + } + + @Override + public ShipmentRecord getMessage() { + return this; + } + } + + class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> { + String orderId; + long orderTimeMs; + long shipTimeMs; + + FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) { + this.orderId = orderId; + 
this.orderTimeMs = orderTimeMs; + this.shipTimeMs = shipTimeMs; + } + + + @Override + public String getKey() { + return this.orderId; + } + + @Override + public FulFilledOrderRecord getMessage() { + return this; + } + } + + FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) { + return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs); + } + + class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> { + + @Override + public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) { + return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage); + } + + @Override + public String getFirstKey(OrderRecord message) { + return message.getKey(); + } + + @Override + public String getSecondKey(ShipmentRecord message) { + return message.getKey(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java new file mode 100644 index 0000000..1502aa2 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.example; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.CommandLine; + +import java.time.Duration; +import java.util.Properties; + + +/** + * Example code to implement window-based counter + */ +public class PageViewCounterExample implements StreamGraphFactory { + + @Override public StreamGraph create(Config config) { + StreamGraph graph = StreamGraph.fromConfig(config); + + MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + + pageViewEvents. 
+ window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1). + setEarlyTrigger(Triggers.repeat(Triggers.count(5))). + setAccumulationMode(AccumulationMode.DISCARDING)). + map(MyStreamOutput::new). + sendTo(pageViewPerMemberCounters); + return graph; + } + + public static void main(String[] args) { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new PageViewCounterExample(), config); + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewEvent"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewPerMember5min"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { + String pageId; + String memberId; + long timestamp; + + PageViewEvent(String pageId, String memberId, long timestamp) { + this.pageId = pageId; + this.memberId = memberId; + this.timestamp = timestamp; + } + + @Override + public String getKey() { + return this.pageId; + } + + @Override + public PageViewEvent getMessage() { + return this; + } + } + + class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> { + String memberId; + long timestamp; + int count; + + MyStreamOutput(WindowPane<String, Integer> m) { + this.memberId = m.getKey().getKey(); + this.timestamp = Long.valueOf(m.getKey().getPaneId()); + this.count = m.getMessage(); + } + + @Override + public String getKey() { + return this.memberId; + } + + @Override + public MyStreamOutput getMessage() { + return this; + } + } + +} 
http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java new file mode 100644 index 0000000..f15e514 --- /dev/null +++ b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.example; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamSpec; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.serializers.JsonSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.CommandLine; + +import java.time.Duration; +import java.util.*; + + +/** + * Example {@link StreamGraphFactory} code to test the API methods with re-partition operator + */ +public class RepartitionExample implements StreamGraphFactory { + + /** + * used by remote execution environment to launch the job in remote program. The remote program should follow the similar + * invoking context as in standalone: + * + * public static void main(String args[]) throws Exception { + * CommandLine cmdLine = new CommandLine(); + * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); + * remoteEnv.run(new UserMainExample(), config); + * } + * + */ + @Override public StreamGraph create(Config config) { + StreamGraph graph = StreamGraph.fromConfig(config); + + MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); + OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + + pageViewEvents. + partitionBy(m -> m.getMessage().memberId). 
+ window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow( + msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)). + map(MyStreamOutput::new). + sendTo(pageViewPerMemberCounters); + + return graph; + } + + // standalone local program model + public static void main(String[] args) throws Exception { + CommandLine cmdLine = new CommandLine(); + Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); + ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); + standaloneEnv.run(new RepartitionExample(), config); + } + + StreamSpec input1 = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewEvent"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + StreamSpec output = new StreamSpec() { + @Override public SystemStream getSystemStream() { + return new SystemStream("kafka", "PageViewPerMember5min"); + } + + @Override public Properties getProperties() { + return null; + } + }; + + class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { + String pageId; + String memberId; + long timestamp; + + PageViewEvent(String pageId, String memberId, long timestamp) { + this.pageId = pageId; + this.memberId = memberId; + this.timestamp = timestamp; + } + + @Override + public String getKey() { + return this.pageId; + } + + @Override + public PageViewEvent getMessage() { + return this; + } + } + + class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> { + String memberId; + long timestamp; + int count; + + MyStreamOutput(WindowPane<String, Integer> m) { + this.memberId = m.getKey().getKey(); + this.timestamp = Long.valueOf(m.getKey().getPaneId()); + this.count = m.getMessage(); + } + + @Override + public String getKey() { + return this.memberId; + } + + @Override + public MyStreamOutput getMessage() { + return this; + } + } + +}