SAMZA-1073: top-level fluent API `Initial draft of top-level fluent API for operator DAGs
Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Reviewers: Xinyu Liu <xi...@linkedin.com>, Jacob Maes <jm...@linkedin.com>, Prateek Maheshwari <pmahe...@linkedin.com> Closes #51 from nickpan47/samza-fluent-api-v1 and squashes the following commits: 001be63 [Yi Pan (Data Infrastructure)] SAMZA-1073: Addressing review feedbacks. Change StreamGraphFactory to StreamGraphBuilder. 373048a [Yi Pan (Data Infrastructure)] SAMZA-1073: top-level fluent API ` Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c249443b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c249443b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c249443b Branch: refs/heads/samza-fluent-api-v1 Commit: c249443b1a5b427bc3fab0303b3069133ec0caad Parents: 09bf833 Author: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Authored: Thu Feb 16 10:18:09 2017 -0800 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Thu Feb 16 10:18:09 2017 -0800 ---------------------------------------------------------------------- .../apache/samza/operators/ContextManager.java | 47 ++++ .../apache/samza/operators/MessageStream.java | 80 +++--- .../apache/samza/operators/OutputStream.java | 41 +++ .../org/apache/samza/operators/StreamGraph.java | 94 +++++++ .../samza/operators/StreamGraphBuilder.java | 38 +++ .../samza/operators/StreamOperatorTask.java | 51 ---- .../org/apache/samza/operators/StreamSpec.java | 46 ++++ .../data/IncomingSystemMessageEnvelope.java | 63 ----- .../operators/data/InputMessageEnvelope.java | 63 +++++ .../samza/operators/data/MessageEnvelope.java | 2 +- .../operators/functions/FilterFunction.java | 13 +- .../operators/functions/FlatMapFunction.java | 15 +- .../operators/functions/InitableFunction.java | 40 +++ .../samza/operators/functions/JoinFunction.java | 39 ++- .../samza/operators/functions/MapFunction.java | 15 +- .../samza/operators/functions/SinkFunction.java | 13 +- 
.../samza/operators/triggers/AnyTrigger.java | 3 +- .../samza/operators/triggers/CountTrigger.java | 4 +- .../operators/triggers/RepeatingTrigger.java | 4 +- .../triggers/TimeSinceFirstMessageTrigger.java | 3 +- .../triggers/TimeSinceLastMessageTrigger.java | 4 +- .../samza/operators/triggers/TimeTrigger.java | 4 +- .../samza/operators/triggers/Trigger.java | 7 +- .../samza/operators/triggers/Triggers.java | 41 +-- .../apache/samza/operators/windows/Window.java | 20 +- .../samza/operators/windows/WindowKey.java | 19 +- .../samza/operators/windows/WindowPane.java | 9 +- .../apache/samza/operators/windows/Windows.java | 136 +++++----- .../windows/internal/WindowInternal.java | 14 +- .../samza/system/ExecutionEnvironment.java | 73 ++++++ .../java/org/apache/samza/task/TaskContext.java | 10 + .../data/TestIncomingSystemMessage.java | 2 +- .../operators/windows/TestWindowOutput.java | 35 --- .../samza/operators/windows/TestWindowPane.java | 33 +++ .../samza/operators/MessageStreamImpl.java | 146 +++++++---- .../apache/samza/operators/StreamGraphImpl.java | 260 +++++++++++++++++++ .../operators/StreamOperatorAdaptorTask.java | 105 -------- .../functions/PartialJoinFunction.java | 56 ++++ .../samza/operators/impl/OperatorGraph.java | 164 ++++++++++++ .../samza/operators/impl/OperatorImpl.java | 22 +- .../samza/operators/impl/OperatorImpls.java | 124 --------- .../operators/impl/PartialJoinOperatorImpl.java | 15 +- .../samza/operators/impl/RootOperatorImpl.java | 7 +- .../impl/SessionWindowOperatorImpl.java | 52 ++++ .../samza/operators/impl/SinkOperatorImpl.java | 7 +- .../operators/impl/StreamOperatorImpl.java | 14 +- .../operators/impl/WindowOperatorImpl.java | 11 +- .../samza/operators/spec/OperatorSpec.java | 39 ++- .../samza/operators/spec/OperatorSpecs.java | 161 +++++++++--- .../operators/spec/PartialJoinOperatorSpec.java | 58 +++-- .../samza/operators/spec/SinkOperatorSpec.java | 70 ++++- .../operators/spec/StreamOperatorSpec.java | 58 +++-- 
.../operators/spec/WindowOperatorSpec.java | 41 ++- .../samza/operators/spec/WindowState.java | 16 +- .../system/RemoteExecutionEnvironment.java | 37 +++ .../system/StandaloneExecutionEnvironment.java | 50 ++++ .../apache/samza/task/StreamOperatorTask.java | 111 ++++++++ .../samza/example/KeyValueStoreExample.java | 180 +++++++++++++ .../samza/example/NoContextStreamExample.java | 151 +++++++++++ .../samza/example/OrderShipmentJoinExample.java | 188 ++++++++++++++ .../samza/example/PageViewCounterExample.java | 129 +++++++++ .../samza/example/RepartitionExample.java | 140 ++++++++++ .../samza/example/TestBasicStreamGraphs.java | 99 +++++++ .../samza/example/TestBroadcastExample.java | 113 ++++++++ .../apache/samza/example/TestExampleBase.java | 46 ++++ .../apache/samza/example/TestJoinExample.java | 129 +++++++++ .../apache/samza/example/TestWindowExample.java | 81 ++++++ .../apache/samza/operators/BroadcastTask.java | 96 ------- .../org/apache/samza/operators/JoinTask.java | 77 ------ .../operators/TestFluentStreamAdaptorTask.java | 85 ------ .../samza/operators/TestFluentStreamTasks.java | 112 -------- .../samza/operators/TestMessageStreamImpl.java | 65 +++-- .../operators/TestMessageStreamImplUtil.java | 26 ++ .../org/apache/samza/operators/WindowTask.java | 63 ----- .../samza/operators/impl/TestOperatorImpls.java | 107 ++++++-- .../operators/impl/TestSinkOperatorImpl.java | 11 +- .../operators/impl/TestStreamOperatorImpl.java | 20 +- .../samza/operators/spec/TestOperatorSpecs.java | 65 +++-- 78 files changed, 3381 insertions(+), 1307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java 
b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java new file mode 100644 index 0000000..c3b1cf3 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/ContextManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.task.TaskContext; + + +/** + * Interface class defining methods to initialize and finalize the context used by the transformation functions. + */ +@InterfaceStability.Unstable +public interface ContextManager { + /** + * The initialization method to create shared context for the whole task in Samza. Default to NO-OP + * + * @param config the configuration object for the task + * @param context the {@link TaskContext} object + * @return User-defined task-wide context object + */ + default TaskContext initTaskContext(Config config, TaskContext context) { + return context; + } + + /** + * The finalize method to allow users to close resource initialized in {@link #initTaskContext} method. Default to NO-OP. 
+ * + */ + default void finalizeTaskContext() { } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java index 6a2f95b..adeb4c8 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -19,7 +19,6 @@ package org.apache.samza.operators; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; @@ -29,73 +28,83 @@ import org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import java.util.Collection; +import java.util.function.Function; /** - * Represents a stream of {@link MessageEnvelope}s. + * Represents a stream of messages. * <p> * A {@link MessageStream} can be transformed into another {@link MessageStream} by applying the transforms in this API. * - * @param <M> type of {@link MessageEnvelope}s in this stream + * @param <M> type of messages in this stream */ @InterfaceStability.Unstable -public interface MessageStream<M extends MessageEnvelope> { +public interface MessageStream<M> { /** - * Applies the provided 1:1 {@link MapFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the + * Applies the provided 1:1 function to messages in this {@link MessageStream} and returns the * transformed {@link MessageStream}. 
* - * @param mapFn the function to transform a {@link MessageEnvelope} to another {@link MessageEnvelope} - * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} + * @param mapFn the function to transform a message to another message + * @param <TM> the type of messages in the transformed {@link MessageStream} * @return the transformed {@link MessageStream} */ - <TM extends MessageEnvelope> MessageStream<TM> map(MapFunction<M, TM> mapFn); + <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn); /** - * Applies the provided 1:n {@link FlatMapFunction} to transform a {@link MessageEnvelope} in this {@link MessageStream} - * to n {@link MessageEnvelope}s in the transformed {@link MessageStream} + * Applies the provided 1:n function to transform a message in this {@link MessageStream} + * to n messages in the transformed {@link MessageStream} * - * @param flatMapFn the function to transform a {@link MessageEnvelope} to zero or more {@link MessageEnvelope}s - * @param <TM> the type of {@link MessageEnvelope}s in the transformed {@link MessageStream} + * @param flatMapFn the function to transform a message to zero or more messages + * @param <TM> the type of messages in the transformed {@link MessageStream} * @return the transformed {@link MessageStream} */ - <TM extends MessageEnvelope> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn); + <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn); /** - * Applies the provided {@link FilterFunction} to {@link MessageEnvelope}s in this {@link MessageStream} and returns the + * Applies the provided function to messages in this {@link MessageStream} and returns the * transformed {@link MessageStream}. 
* <p> - * The {@link FilterFunction} is a predicate which determines whether a {@link MessageEnvelope} in this {@link MessageStream} + * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream} * should be retained in the transformed {@link MessageStream}. * - * @param filterFn the predicate to filter {@link MessageEnvelope}s from this {@link MessageStream} + * @param filterFn the predicate to filter messages from this {@link MessageStream} * @return the transformed {@link MessageStream} */ MessageStream<M> filter(FilterFunction<M> filterFn); /** - * Allows sending {@link MessageEnvelope}s in this {@link MessageStream} to an output - * {@link org.apache.samza.system.SystemStream} using the provided {@link SinkFunction}. + * Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}. * - * @param sinkFn the function to send {@link MessageEnvelope}s in this stream to output systems + * NOTE: the output may not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc. + * + * @param sinkFn the function to send messages in this stream to output */ void sink(SinkFunction<M> sinkFn); /** - * Groups and processes the {@link MessageEnvelope}s in this {@link MessageStream} according to the provided {@link Window} + * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}. + * + * NOTE: the {@code stream} has to be a {@link MessageStream}. + * + * @param stream the output {@link MessageStream} + */ + void sendTo(OutputStream<M> stream); + + /** + * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of * {@link WindowPane}s. * <p> * Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows. 
* - * @param window the window to group and process {@link MessageEnvelope}s from this {@link MessageStream} - * @param <K> the type of key in the {@link MessageEnvelope} in this {@link MessageStream}. If a key is specified, + * @param window the window to group and process messages from this {@link MessageStream} + * @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified, * panes are emitted per-key. * @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream} - * @param <WM> the type of {@link WindowPane} in the transformed {@link MessageStream} * @return the transformed {@link MessageStream} */ - <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(Window<M, K, WV, WM> window); + <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window); /** * Joins this {@link MessageStream} with another {@link MessageStream} using the provided pairwise {@link JoinFunction}. @@ -103,23 +112,32 @@ public interface MessageStream<M extends MessageEnvelope> { * We currently only support 2-way joins. 
* * @param otherStream the other {@link MessageStream} to be joined with - * @param joinFn the function to join {@link MessageEnvelope}s from this and the other {@link MessageStream} + * @param joinFn the function to join messages from this and the other {@link MessageStream} * @param <K> the type of join key - * @param <OM> the type of {@link MessageEnvelope}s in the other stream - * @param <RM> the type of {@link MessageEnvelope}s resulting from the {@code joinFn} + * @param <OM> the type of messages in the other stream + * @param <RM> the type of messages resulting from the {@code joinFn} * @return the joined {@link MessageStream} */ - <K, OM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(MessageStream<OM> otherStream, - JoinFunction<M, OM, RM> joinFn); + <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn); /** * Merge all {@code otherStreams} with this {@link MessageStream}. * <p> - * The merging streams must have the same {@link MessageEnvelope} type {@code M}. + * The merging streams must have the same message type {@code M}. * * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream} * @return the merged {@link MessageStream} */ MessageStream<M> merge(Collection<MessageStream<M>> otherStreams); - + + /** + * Send the input message to an output {@link org.apache.samza.system.SystemStream} and consume it as an input {@link MessageStream} again. + * + * Note: this is a transform function used only in the logical DAG. In a physical DAG, this is either translated to a NOOP function, or a {@code MessageStream#sendThrough} function. 
+ * + * @param parKeyExtractor a {@link Function} that extracts the partition key from a message in this {@link MessageStream} + * @param <K> the type of partition key + * @return a {@link MessageStream} object after the re-partition + */ + <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor); } http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java new file mode 100644 index 0000000..179f0e7 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.SinkFunction; + + +/** + * The interface class defining the specific {@link SinkFunction} for a system {@link OutputStream}. 
+ * + * @param <M> The type of message to be sent to this output stream + */ +@InterfaceStability.Unstable +public interface OutputStream<M> { + + /** + * Returns the specific {@link SinkFunction} for this output stream. The {@link OutputStream} is created + * via {@link StreamGraph#createOutStream(StreamSpec, Serde, Serde)} or {@link StreamGraph#createIntStream(StreamSpec, Serde, Serde)}. + * Hence, the proper types of serdes for key and value are instantiated and are used in the {@link SinkFunction} returned. + * + * @return The pre-defined {@link SinkFunction} to apply proper serdes before sending the message to the output stream. + */ + SinkFunction<M> getSinkFunction(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java new file mode 100644 index 0000000..abc9861 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.serializers.Serde; + +import java.util.Map; + + +/** + * Job-level programming interface to create an operator DAG and run in various different runtime environments. + */ +@InterfaceStability.Unstable +public interface StreamGraph { + /** + * Method to add an input {@link MessageStream} from the system + * + * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream} + * @param keySerde the serde used to serialize/deserialize the message key from the input {@link MessageStream} + * @param msgSerde the serde used to serialize/deserialize the message body from the input {@link MessageStream} + * @param <K> the type of key in the input message + * @param <V> the type of message in the input message + * @param <M> the type of {@link MessageEnvelope} in the input {@link MessageStream} + * @return the input {@link MessageStream} object + */ + <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde); + + /** + * Method to add an output {@link MessageStream} from the system + * + * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the output {@link MessageStream} + * @param keySerde the serde used to serialize/deserialize the message key from the output {@link MessageStream} + * @param msgSerde the serde used to serialize/deserialize the message body from the output {@link MessageStream} + * @param <K> the type of key in the output message + * @param <V> the type of message in the output message + * @param <M> the type of {@link MessageEnvelope} in the output {@link MessageStream} + * @return the output {@link MessageStream} object + */ + <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec 
streamSpec, Serde<K> keySerde, Serde<V> msgSerde); + + /** + * Method to add an intermediate {@link MessageStream} from the system + * + * @param streamSpec the {@link StreamSpec} describing the physical characteristics of the intermediate {@link MessageStream} + * @param keySerde the serde used to serialize/deserialize the message key from the intermediate {@link MessageStream} + * @param msgSerde the serde used to serialize/deserialize the message body from the intermediate {@link MessageStream} + * @param <K> the type of key in the intermediate message + * @param <V> the type of message in the intermediate message + * @param <M> the type of {@link MessageEnvelope} in the intermediate {@link MessageStream} + * @return the intermediate {@link MessageStream} object + */ + <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde); + + /** + * Method to get the input {@link MessageStream}s + * + * @return the input {@link MessageStream} + */ + Map<StreamSpec, MessageStream> getInStreams(); + + /** + * Method to get the {@link OutputStream}s + * + * @return the map of all {@link OutputStream}s + */ + Map<StreamSpec, OutputStream> getOutStreams(); + + /** + * Method to set the {@link ContextManager} for this {@link StreamGraph} + * + * @param manager the {@link ContextManager} object + * @return this {@link StreamGraph} object + */ + StreamGraph withContextManager(ContextManager manager); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java new file mode 100644 index 0000000..b415cf8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java @@ 
-0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; + + +/** + * This interface defines a factory class that user will implement to create user-defined operator DAG in a {@link StreamGraph} object. 
+ */ +@InterfaceStability.Unstable +public interface StreamGraphBuilder { + /** + * Users are required to implement this abstract method to initialize the processing logic of the application, in terms + * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators + * + * @param graph an empty {@link StreamGraph} object to be initialized + * @param config the {@link Config} of the application + */ + void init(StreamGraph graph, Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java deleted file mode 100644 index 16cf27a..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/StreamOperatorTask.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.Map; - - -/** - * A {@link StreamOperatorTask} is the basic interface to implement for processing {@link MessageStream}s. - * Implementations can describe the transformation steps for each {@link MessageStream} in the - * {@link #transform} method using {@link MessageStream} APIs. - * <p> - * Implementations may be augmented by implementing {@link org.apache.samza.task.InitableTask}, - * {@link org.apache.samza.task.WindowableTask} and {@link org.apache.samza.task.ClosableTask} interfaces, - * but should not implement {@link org.apache.samza.task.StreamTask} or {@link org.apache.samza.task.AsyncStreamTask} - * interfaces. - */ -@InterfaceStability.Unstable -public interface StreamOperatorTask { - - /** - * Describe the transformation steps for each {@link MessageStream}s for this task using the - * {@link MessageStream} APIs. Each {@link MessageStream} corresponds to one {@link SystemStreamPartition} - * in the input system. 
- * - * @param messageStreams the {@link MessageStream}s that receive {@link IncomingSystemMessageEnvelope}s - * from their corresponding {@link org.apache.samza.system.SystemStreamPartition} - */ - void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java new file mode 100644 index 0000000..c8a5e8d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/StreamSpec.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.system.SystemStream; + +import java.util.Properties; + + +/** + * This interface defines the specification of a {@link SystemStream}. 
It will be used by the {@link org.apache.samza.system.SystemAdmin} + * to create a {@link SystemStream} + */ +@InterfaceStability.Unstable +public interface StreamSpec { + /** + * Get the {@link SystemStream} + * + * @return {@link SystemStream} object + */ + SystemStream getSystemStream(); + + /** + * Get the physical properties of the {@link SystemStream} + * + * @return the properties of this stream + */ + Properties getProperties(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java deleted file mode 100644 index a65809c..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessageEnvelope.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.data; - -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; - - -/** - * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition} - * and its {@link Offset} within the {@link SystemStreamPartition}. - * <p> - * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}. - */ -public class IncomingSystemMessageEnvelope implements MessageEnvelope<Object, Object> { - - private final IncomingMessageEnvelope ime; - - /** - * Creates an {@code IncomingSystemMessageEnvelope} from the {@link IncomingMessageEnvelope}. - * - * @param ime the {@link IncomingMessageEnvelope} from the input system. - */ - public IncomingSystemMessageEnvelope(IncomingMessageEnvelope ime) { - this.ime = ime; - } - - @Override - public Object getKey() { - return this.ime.getKey(); - } - - @Override - public Object getMessage() { - return this.ime.getMessage(); - } - - public Offset getOffset() { - // TODO: need to add offset factory to generate different types of offset. This is just a placeholder, - // assuming incoming message envelope carries long value as offset (i.e. 
Kafka case) - return new LongOffset(this.ime.getOffset()); - } - - public SystemStreamPartition getSystemStreamPartition() { - return this.ime.getSystemStreamPartition(); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java new file mode 100644 index 0000000..306145b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.data; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; + + +/** + * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition} + * and its {@link Offset} within the {@link SystemStreamPartition}. 
+ * <p> + * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}. + */ +public class InputMessageEnvelope implements MessageEnvelope<Object, Object> { + + private final IncomingMessageEnvelope ime; + + /** + * Creates an {@code InputMessageEnvelope} from the {@link IncomingMessageEnvelope}. + * + * @param ime the {@link IncomingMessageEnvelope} from the input system. + */ + public InputMessageEnvelope(IncomingMessageEnvelope ime) { + this.ime = ime; + } + + @Override + public Object getKey() { + return this.ime.getKey(); + } + + @Override + public Object getMessage() { + return this.ime.getMessage(); + } + + public Offset getOffset() { + // TODO: need to add offset factory to generate different types of offset. This is just a placeholder, + // assuming incoming message envelope carries long value as offset (i.e. Kafka case) + return new LongOffset(this.ime.getOffset()); + } + + public SystemStreamPartition getSystemStreamPartition() { + return this.ime.getSystemStreamPartition(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java index ad64231..703a44c 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java @@ -23,7 +23,7 @@ import org.apache.samza.annotation.InterfaceStability; /** - * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s. 
+ * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s */ @InterfaceStability.Unstable public interface MessageEnvelope<K, M> { http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java index e611cd0..58479d6 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java @@ -19,21 +19,20 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; /** - * A function that specifies whether a {@link MessageEnvelope} should be retained for further processing or filtered out. - * @param <M> type of the input {@link MessageEnvelope} + * A function that specifies whether a message should be retained for further processing or filtered out. + * @param <M> type of the input message */ @InterfaceStability.Unstable @FunctionalInterface -public interface FilterFunction<M extends MessageEnvelope> { +public interface FilterFunction<M> extends InitableFunction { /** - * Returns a boolean indicating whether this {@link MessageEnvelope} should be retained or filtered out. - * @param message the {@link MessageEnvelope} to be checked - * @return true if {@link MessageEnvelope} should be retained + * Returns a boolean indicating whether this message should be retained or filtered out. 
+ * @param message the input message to be checked + * @return true if {@code message} should be retained */ boolean apply(M message); http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java index dbc0bd9..bbbddeb 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java @@ -19,25 +19,24 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; import java.util.Collection; /** - * A function that transforms a {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s, + * A function that transforms an input message into a collection of 0 or more messages, * possibly of a different type. - * @param <M> type of the input {@link MessageEnvelope} - * @param <OM> type of the transformed {@link MessageEnvelope}s + * @param <M> type of the input message + * @param <OM> type of the transformed messages */ @InterfaceStability.Unstable @FunctionalInterface -public interface FlatMapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> { +public interface FlatMapFunction<M, OM> extends InitableFunction { /** - * Transforms the provided {@link MessageEnvelope} into a collection of 0 or more {@link MessageEnvelope}s. - * @param message the {@link MessageEnvelope} to be transformed - * @return a collection of 0 or more transformed {@link MessageEnvelope}s + * Transforms the provided message into a collection of 0 or more messages. 
+ * @param message the input message to be transformed + * @return a collection of 0 or more transformed messages */ Collection<OM> apply(M message); http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java new file mode 100644 index 0000000..2f738da --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.task.TaskContext; + + +/** + * Interface defined to initialize the context of message transformation functions. + */ +@InterfaceStability.Unstable +public interface InitableFunction { + + /** + * Interface method to initialize the context for a specific message transformation function.
+ * + * @param config the {@link Config} object for this task + * @param context the {@link TaskContext} object for this task + */ + default void init(Config config, TaskContext context) { } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java index 8cb1fce..fc38177 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java @@ -19,26 +19,41 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; /** - * A function that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces - * a joined {@link MessageEnvelope}. - * @param <M> type of the input {@link MessageEnvelope} - * @param <JM> type of the {@link MessageEnvelope} to join with - * @param <RM> type of the joined {@link MessageEnvelope} + * A function that joins messages from two {@link org.apache.samza.operators.MessageStream}s and produces + * a joined message. + * @param <K> type of the join key + * @param <M> type of the input message + * @param <JM> type of the message to join with + * @param <RM> type of the joined message */ @InterfaceStability.Unstable -@FunctionalInterface -public interface JoinFunction<M extends MessageEnvelope, JM extends MessageEnvelope, RM extends MessageEnvelope> { +public interface JoinFunction<K, M, JM, RM> extends InitableFunction { /** - * Join the provided {@link MessageEnvelope}s and produces the joined {@link MessageEnvelope}. 
- * @param message the input {@link MessageEnvelope} - * @param otherMessage the {@link MessageEnvelope} to join with - * @return the joined {@link MessageEnvelope} + * Joins the provided input messages and produces the joined message. + * @param message the input message + * @param otherMessage the message to join with + * @return the joined message */ RM apply(M message, JM otherMessage); + /** + * Method to get the join key in the messages from the first input stream + * + * @param message the input message from the first input stream + * @return the join key + */ + K getFirstKey(M message); + + /** + * Method to get the join key in the messages from the second input stream + * + * @param message the input message from the second input stream + * @return the join key + */ + K getSecondKey(JM message); + } http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java index 04919a7..05a554f 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java @@ -19,22 +19,21 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; /** - * A function that transforms a {@link MessageEnvelope} into another {@link MessageEnvelope}, possibly of a different type. - * @param <M> type of the input {@link MessageEnvelope} - * @param <OM> type of the transformed {@link MessageEnvelope} + * A function that transforms an input message into another message, possibly of a different type.
+ * @param <M> type of the input message + * @param <OM> type of the transformed message */ @InterfaceStability.Unstable @FunctionalInterface -public interface MapFunction<M extends MessageEnvelope, OM extends MessageEnvelope> { +public interface MapFunction<M, OM> extends InitableFunction { /** - * Transforms the provided {@link MessageEnvelope} into another {@link MessageEnvelope} - * @param message the {@link MessageEnvelope} to be transformed - * @return the transformed {@link MessageEnvelope} + * Transforms the provided message into another message + * @param message the input message to be transformed + * @return the transformed message */ OM apply(M message); http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java index 505da92..08e090a 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java @@ -19,26 +19,25 @@ package org.apache.samza.operators.functions; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; /** - * A function that allows sending a {@link MessageEnvelope} to an output system. - * @param <M> type of the input {@link MessageEnvelope} + * A function that allows sending a message to an output system. 
+ * @param <M> type of the input message */ @InterfaceStability.Unstable @FunctionalInterface -public interface SinkFunction<M extends MessageEnvelope> { +public interface SinkFunction<M> extends InitableFunction { /** - * Allows sending the provided {@link MessageEnvelope} to an output {@link org.apache.samza.system.SystemStream} using + * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using * the provided {@link MessageCollector}. Also provides access to the {@link TaskCoordinator} to request commits * or shut the container down. * - * @param message the {@link MessageEnvelope} to be sent to an output {@link org.apache.samza.system.SystemStream} - * @param messageCollector the {@link MessageCollector} to use to send the {@link MessageEnvelope} + * @param message the input message to be sent to an output {@link org.apache.samza.system.SystemStream} + * @param messageCollector the {@link MessageCollector} to send the {@link org.apache.samza.operators.data.MessageEnvelope} * @param taskCoordinator the {@link TaskCoordinator} to request commits or shutdown */ void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator); http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java index 3ca4e9a..6e134df 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java @@ -18,13 +18,12 @@ */ package org.apache.samza.operators.triggers; -import org.apache.samza.operators.data.MessageEnvelope; import java.util.List; /** * A {@link Trigger} fires as soon as any of its individual triggers 
has fired. */ -public class AnyTrigger<M extends MessageEnvelope> implements Trigger { +public class AnyTrigger<M> implements Trigger { private final List<Trigger> triggers; http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java index ba14928..1cf930c 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java @@ -18,13 +18,11 @@ */ package org.apache.samza.operators.triggers; -import org.apache.samza.operators.data.MessageEnvelope; - /** * A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane} * reaches the specified count. */ -public class CountTrigger<M extends MessageEnvelope> implements Trigger { +public class CountTrigger<M> implements Trigger { private final long count; http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java index ae9564d..7f78eb8 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java @@ -18,12 +18,10 @@ */ package org.apache.samza.operators.triggers; -import org.apache.samza.operators.data.MessageEnvelope; - /** * A {@link Trigger} that repeats its underlying trigger forever. 
*/ -class RepeatingTrigger<M extends MessageEnvelope> implements Trigger<M> { +class RepeatingTrigger<M> implements Trigger<M> { private final Trigger<M> trigger; http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java index 13fc3cd..4de60a2 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java @@ -19,7 +19,6 @@ package org.apache.samza.operators.triggers; -import org.apache.samza.operators.data.MessageEnvelope; import java.time.Duration; @@ -27,7 +26,7 @@ import java.time.Duration; * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in * the window pane. 
*/ -public class TimeSinceFirstMessageTrigger<M extends MessageEnvelope> implements Trigger { +public class TimeSinceFirstMessageTrigger<M> implements Trigger { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java index 0150d86..6b09625 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java @@ -18,14 +18,12 @@ */ package org.apache.samza.operators.triggers; -import org.apache.samza.operators.data.MessageEnvelope; - import java.time.Duration; /* * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration. 
*/ -public class TimeSinceLastMessageTrigger<M extends MessageEnvelope> implements Trigger { +public class TimeSinceLastMessageTrigger<M> implements Trigger { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java index ed7fef7..c5875aa 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java @@ -18,14 +18,12 @@ */ package org.apache.samza.operators.triggers; -import org.apache.samza.operators.data.MessageEnvelope; - import java.time.Duration; /* * A {@link Trigger} that fires after the specified duration in processing time. 
*/ -public class TimeTrigger<M extends MessageEnvelope> implements Trigger { +public class TimeTrigger<M> implements Trigger { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java index 6dc4f43..be0a877 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Trigger.java @@ -20,15 +20,16 @@ package org.apache.samza.operators.triggers; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.annotation.InterfaceStability; /** * Marker interface for all triggers. The firing of a trigger indicates the completion of a window pane. * * <p> Use the {@link Triggers} APIs to create a {@link Trigger}. 
* - * @param <M> the type of the incoming {@link MessageEnvelope} + * @param <M> the type of the incoming message */ -public interface Trigger<M extends MessageEnvelope> { +@InterfaceStability.Unstable +public interface Trigger<M> { } http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java index f27cfd8..97fb7b7 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/Triggers.java @@ -19,7 +19,6 @@ package org.apache.samza.operators.triggers; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; import java.time.Duration; import java.util.ArrayList; @@ -35,61 +34,63 @@ import java.util.List; * <pre> {@code * MessageStream<> windowedStream = stream.window(Windows.tumblingWindow(Duration.of(10, TimeUnit.SECONDS)) * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.of(4, TimeUnit.SECONDS)))))) - * .accumulateFiredPanes()); + * .setAccumulationMode(AccumulationMode.ACCUMULATING)); * }</pre> * - * @param <M> the type of input {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} */ @InterfaceStability.Unstable -public final class Triggers<M extends MessageEnvelope> { +public final class Triggers { private Triggers() { } /** - * Creates a {@link Trigger} that fires when the number of {@link MessageEnvelope}s in the pane + * Creates a {@link Trigger} that fires when the number of messages in the pane * reaches the specified count. 
* - * @param count the number of {@link MessageEnvelope}s to fire the trigger after + * @param count the number of messages to fire the trigger after + * @param <M> the type of input message in the window * @return the created trigger */ - public static Trigger count(long count) { - return new CountTrigger(count); + public static <M> Trigger<M> count(long count) { + return new CountTrigger<M>(count); } /** - * Creates a trigger that fires after the specified duration has passed since the first {@link MessageEnvelope} in + * Creates a trigger that fires after the specified duration has passed since the first message in * the pane. * * @param duration the duration since the first element + * @param <M> the type of input message in the window * @return the created trigger */ - public static Trigger timeSinceFirstMessage(Duration duration) { - return new TimeSinceFirstMessageTrigger(duration); + public static <M> Trigger<M> timeSinceFirstMessage(Duration duration) { + return new TimeSinceFirstMessageTrigger<M>(duration); } /** - * Creates a trigger that fires when there is no new {@link MessageEnvelope} for the specified duration in the pane. + * Creates a trigger that fires when there is no new message for the specified duration in the pane. * * @param duration the duration since the last element + * @param <M> the type of input message in the window * @return the created trigger */ - public static Trigger timeSinceLastMessage(Duration duration) { - return new TimeSinceLastMessageTrigger(duration); + public static <M> Trigger<M> timeSinceLastMessage(Duration duration) { + return new TimeSinceLastMessageTrigger<M>(duration); } /** * Creates a trigger that fires when any of the provided triggers fire. * - * @param <M> the type of input {@link MessageEnvelope} in the window * @param triggers the individual triggers + * @param <M> the type of input message in the window * @return the created trigger */ - public static <M extends MessageEnvelope> Trigger any(Trigger<M>... 
triggers) { - List<Trigger> triggerList = new ArrayList<>(); + public static <M> Trigger<M> any(Trigger<M>... triggers) { + List<Trigger<M>> triggerList = new ArrayList<>(); for (Trigger trigger : triggers) { triggerList.add(trigger); } - return new AnyTrigger(Collections.unmodifiableList(triggerList)); + return new AnyTrigger<M>(Collections.unmodifiableList(triggerList)); } /** @@ -98,11 +99,11 @@ public final class Triggers<M extends MessageEnvelope> { * <p>Creating a {@link RepeatingTrigger} from an {@link AnyTrigger} is equivalent to creating an {@link AnyTrigger} from * its individual {@link RepeatingTrigger}s. * - * @param <M> the type of input {@link MessageEnvelope} in the window * @param trigger the individual trigger to repeat + * @param <M> the type of input message in the window * @return the created trigger */ - public static <M extends MessageEnvelope> Trigger repeat(Trigger<M> trigger) { + public static <M> Trigger<M> repeat(Trigger<M> trigger) { return new RepeatingTrigger<>(trigger); } } http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java index 6aae940..9609292 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java @@ -19,18 +19,17 @@ package org.apache.samza.operators.windows; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.triggers.Trigger; /** - * Groups incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite + * Groups incoming messages in the {@link org.apache.samza.operators.MessageStream} into 
finite * windows for processing. * * <p> A window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s * that determine when results from the {@link Window} are emitted. * - * <p> Each emitted result contains one or more {@link MessageEnvelope}s in the window and is called a {@link WindowPane}. - * A pane can include all {@link MessageEnvelope}s collected for the window so far or only the new {@link MessageEnvelope}s + * <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}. + * A pane can include all messages collected for the window so far or only the new messages * since the last emitted pane. (as determined by the {@link AccumulationMode}) * * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window @@ -66,13 +65,12 @@ import org.apache.samza.operators.triggers.Trigger; * <p> Use the {@link Windows} APIs to create various windows and the {@link org.apache.samza.operators.triggers.Triggers} * APIs to create triggers. * - * @param <M> the type of the input {@link MessageEnvelope} - * @param <K> the type of the key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}. + * @param <M> the type of the input message + * @param <K> the type of the key in the message in this {@link org.apache.samza.operators.MessageStream}. * @param <WV> the type of the value in the {@link WindowPane}. - * @param <WM> the type of the output. */ @InterfaceStability.Unstable -public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane<K, WV>> { +public interface Window<M, K, WV> { /** * Set the early triggers for this {@link Window}. 
@@ -81,7 +79,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane< * @param trigger the early trigger * @return the {@link Window} function with the early trigger */ - Window<M, K, WV, WM> setEarlyTrigger(Trigger<M> trigger); + Window<M, K, WV> setEarlyTrigger(Trigger<M> trigger); /** * Set the late triggers for this {@link Window}. @@ -90,7 +88,7 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane< * @param trigger the late trigger * @return the {@link Window} function with the late trigger */ - Window<M, K, WV, WM> setLateTrigger(Trigger<M> trigger); + Window<M, K, WV> setLateTrigger(Trigger<M> trigger); /** * Specify how a {@link Window} should process its previously emitted {@link WindowPane}s. @@ -106,6 +104,6 @@ public interface Window<M extends MessageEnvelope, K, WV, WM extends WindowPane< * @param mode the accumulation mode * @return the {@link Window} function with the specified {@link AccumulationMode}. */ - Window<M, K, WV, WM> setAccumulationMode(AccumulationMode mode); + Window<M, K, WV> setAccumulationMode(AccumulationMode mode); } http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java index 7edf3e1..14bd5ab 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java @@ -21,7 +21,7 @@ package org.apache.samza.operators.windows; /** * Key for a {@link WindowPane} emitted from a {@link Window}. * - * @param <K> the type of the key in the incoming {@link org.apache.samza.operators.data.MessageEnvelope}. + * @param <K> the type of the key in the incoming message. 
* Windows that are not keyed have a {@link Void} key type. * */ @@ -29,18 +29,27 @@ public class WindowKey<K> { private final K key; - private final String windowId; + private final String paneId; public WindowKey(K key, String windowId) { this.key = key; - this.windowId = windowId; + this.paneId = windowId; } public K getKey() { return key; } - public String getWindowId() { - return windowId; + public String getPaneId() { + return paneId; + } + + @Override + public String toString() { + String wndKey = ""; + if (!(key instanceof Void)) { + wndKey = String.format("%s:", key.toString()); + } + return String.format("%s%s", wndKey, paneId); } } http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java index 0388048..3b66bd1 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java @@ -18,16 +18,13 @@ */ package org.apache.samza.operators.windows; -import org.apache.samza.operators.data.MessageEnvelope; - - /** * Specifies the result emitted from a {@link Window}. * * @param <K> the type of key in the window pane * @param <V> the type of value in the window pane. */ -public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V> { +public final class WindowPane<K, V> { private final WindowKey<K> key; @@ -41,11 +38,11 @@ public final class WindowPane<K, V> implements MessageEnvelope<WindowKey<K>, V> this.mode = mode; } - @Override public V getMessage() { + public V getMessage() { return this.value; } - @Override public WindowKey<K> getKey() { + public WindowKey<K> getKey() { return this.key; }