http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java deleted file mode 100644 index e057c2b..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java +++ /dev/null @@ -1,86 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.operators.spec;

import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.task.TaskContext;


/**
 * Spec for the partial join operator that takes messages from one input stream, joins with buffered
 * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
 *
 * @param <M>  the type of input message
 * @param <K>  the type of join key
 * @param <JM>  the type of message in the other join stream
 * @param <RM>  the type of message in the join output stream
 */
public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> {

  /**
   * The output {@link MessageStreamImpl} that receives the join results.
   */
  private final MessageStreamImpl<RM> joinOutput;

  /**
   * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of
   * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream,
   * and generates a joined result message of type {@code RM}.
   */
  private final PartialJoinFunction<K, M, JM, RM> transformFn;

  /**
   * The unique ID for this operator.
   */
  private final int opId;

  /**
   * Default constructor for a {@link PartialJoinOperatorSpec}.
   *
   * @param partialJoinFn  partial join function that takes an input message of type {@code M} and joins it
   *                       with buffered messages of type {@code JM} from another stream
   * @param joinOutput  the output {@link MessageStreamImpl} of the join results
   * @param opId  the unique ID of this operator
   */
  PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) {
    this.joinOutput = joinOutput;
    this.transformFn = partialJoinFn;
    this.opId = opId;
  }

  /**
   * @return the output {@link MessageStreamImpl} that receives the join results
   */
  @Override
  public MessageStreamImpl<RM> getNextStream() {
    return this.joinOutput;
  }

  /**
   * @return the partial join function applied by this operator
   */
  public PartialJoinFunction<K, M, JM, RM> getTransformFn() {
    return this.transformFn;
  }

  /**
   * @return always {@link OpCode#JOIN}
   */
  public OperatorSpec.OpCode getOpCode() {
    return OpCode.JOIN;
  }

  /**
   * @return the unique ID of this operator
   */
  public int getOpId() {
    return this.opId;
  }

  /**
   * Initializes the wrapped partial join function with the task's config and context.
   */
  @Override
  public void init(Config config, TaskContext context) {
    this.transformFn.init(config, context);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java deleted file mode 100644 index ba30d67..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ /dev/null @@ -1,116 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.operators.spec;

import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.task.TaskContext;


/**
 * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
 * system. This is a terminal operator and does not allow further operator chaining.
 *
 * @param <M>  the type of input message
 */
public class SinkOperatorSpec<M> implements OperatorSpec<M> {

  /**
   * {@link OpCode} for this {@link SinkOperatorSpec}
   */
  private final OperatorSpec.OpCode opCode;

  /**
   * The unique ID for this operator.
   */
  private final int opId;

  /**
   * The user-defined sink function
   */
  private final SinkFunction<M> sinkFn;

  /**
   * Potential output stream defined by the {@link SinkFunction}; {@code null} when the sink writes elsewhere
   * (see the three-argument constructor).
   */
  private final OutputStream<M> outStream;

  /**
   * Constructor for a {@link SinkOperatorSpec} w/o an output stream (e.g. output is sent to a remote database).
   *
   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
   *                the output {@link org.apache.samza.task.MessageCollector} and the
   *                {@link org.apache.samza.task.TaskCoordinator}.
   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK},
   *                {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
   */
  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
    this(sinkFn, opCode, opId, null);
  }

  /**
   * Constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}.
   *
   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
   *                the output {@link org.apache.samza.task.MessageCollector} and the
   *                {@link org.apache.samza.task.TaskCoordinator}.
   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK},
   *                {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
   * @param outStream  the {@link OutputStream} for this {@link SinkOperatorSpec}; may be {@code null}
   */
  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
    this.sinkFn = sinkFn;
    this.opCode = opCode;
    this.opId = opId;
    this.outStream = outStream;
  }

  /**
   * This is a terminal operator and doesn't allow further operator chaining.
   *
   * @return null
   */
  @Override
  public MessageStreamImpl<M> getNextStream() {
    return null;
  }

  /**
   * @return the user-defined sink function
   */
  public SinkFunction<M> getSinkFn() {
    return this.sinkFn;
  }

  /**
   * @return the {@link OpCode} this spec was created with
   */
  public OperatorSpec.OpCode getOpCode() {
    return this.opCode;
  }

  /**
   * @return the unique ID of this operator
   */
  public int getOpId() {
    return this.opId;
  }

  /**
   * @return the output stream, or {@code null} if this sink was built without one
   */
  public OutputStream<M> getOutStream() {
    return this.outStream;
  }

  /**
   * Initializes the wrapped sink function with the task's config and context.
   */
  @Override
  public void init(Config config, TaskContext context) {
    this.sinkFn.init(config, context);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java deleted file mode 100644 index d7813f7..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package org.apache.samza.operators.spec;

import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.task.TaskContext;


/**
 * The spec for a linear stream operator that outputs 0 or more messages for each input message.
 *
 * @param <M>  the type of input message
 * @param <OM>  the type of output message
 */
public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {

  /**
   * {@link OpCode} for this {@link StreamOperatorSpec}
   */
  private final OperatorSpec.OpCode opCode;

  /**
   * The unique ID for this operator.
   */
  private final int opId;

  /**
   * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
   */
  private final MessageStreamImpl<OM> outputStream;

  /**
   * Transformation function applied in this {@link StreamOperatorSpec}
   */
  private final FlatMapFunction<M, OM> transformFn;

  /**
   * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
   *
   * @param transformFn  the transformation function
   * @param outputStream  the output {@link MessageStreamImpl}; typed with {@code OM} so it matches the
   *                      element type produced by {@code transformFn} (previously a raw type)
   * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
   * @param opId  the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
   */
  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream,
      OperatorSpec.OpCode opCode, int opId) {
    this.outputStream = outputStream;
    this.transformFn = transformFn;
    this.opCode = opCode;
    this.opId = opId;
  }

  /**
   * @return the output {@link MessageStreamImpl} of this operator
   */
  @Override
  public MessageStreamImpl<OM> getNextStream() {
    return this.outputStream;
  }

  /**
   * @return the transformation function applied to each input message
   */
  public FlatMapFunction<M, OM> getTransformFn() {
    return this.transformFn;
  }

  /**
   * @return the {@link OpCode} this spec was created with
   */
  public OperatorSpec.OpCode getOpCode() {
    return this.opCode;
  }

  /**
   * @return the unique ID of this operator
   */
  public int getOpId() {
    return this.opId;
  }

  /**
   * Initializes the wrapped transformation function with the task's config and context.
   */
  @Override
  public void init(Config config, TaskContext context) {
    this.transformFn.init(config, context);
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java deleted file mode 100644 index 46417ed..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

package org.apache.samza.operators.spec;

import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;


/**
 * Default window operator spec object
 *
 * @param <M>  the type of input message to the window
 * @param <WK>  the type of key of the window
 * @param <WV>  the type of aggregated value in the window output {@link WindowPane}
 */
public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {

  /**
   * The window definition applied by this operator.
   */
  private final WindowInternal<M, WK, WV> window;

  /**
   * The output {@link MessageStreamImpl} that receives the emitted {@link WindowPane}s.
   */
  private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;

  /**
   * The unique ID for this operator.
   */
  private final int opId;

  /**
   * Constructor for {@link WindowOperatorSpec}.
   *
   * @param window  the window function
   * @param outputStream  the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
   * @param opId  auto-generated unique ID of this operator
   */
  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
    this.outputStream = outputStream;
    this.window = window;
    this.opId = opId;
  }

  /**
   * @return the output {@link MessageStreamImpl} of this operator
   */
  @Override
  public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
    return this.outputStream;
  }

  /**
   * Returns the window definition, keeping its type parameters (previously returned as a raw type,
   * which forced unchecked conversions on callers).
   *
   * @return the window applied by this operator
   */
  public WindowInternal<M, WK, WV> getWindow() {
    return window;
  }

  /**
   * @return always {@link OpCode#WINDOW}
   */
  public OpCode getOpCode() {
    return OpCode.WINDOW;
  }

  /**
   * @return the unique ID of this operator
   */
  public int getOpId() {
    return this.opId;
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java deleted file mode 100644 index 53bca2e..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - */
package org.apache.samza.operators.spec;

import org.apache.samza.annotation.InterfaceStability;


/**
 * Contract for the state object tracked per window. Applications may store their own window state in
 * window state stores by providing an implementation of this interface.
 *
 * @param <WV>  type of the value the window outputs
 */
@InterfaceStability.Unstable
public interface WindowState<WV> {

  /**
   * Returns when (by system clock) the window received its first message.
   *
   * @return system time, in nanoseconds, at which the first message arrived in the window
   */
  long getFirstMessageTimeNs();

  /**
   * Returns when (by system clock) the window received its most recent message.
   *
   * @return system time, in nanoseconds, at which the last message arrived in the window
   */
  long getLastMessageTimeNs();

  /**
   * Returns the smallest event time observed among the window's messages.
   *
   * @return earliest event time in the window, in nanoseconds
   */
  long getEarliestEventTimeNs();

  /**
   * Returns the largest event time observed among the window's messages.
   *
   * @return latest event time in the window, in nanoseconds
   */
  long getLatestEventTimeNs();

  /**
   * Returns how many messages the window has received in total.
   *
   * @return message count for the window
   */
  long getNumberMessages();

  /**
   * Returns the value this window currently outputs.
   *
   * @return the window's output value
   */
  WV getOutputValue();

  /**
   * Replaces the value this window outputs.
   *
   * @param value  the new output value for the window
   */
  void setOutputValue(WV value);

}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java deleted file mode 100644 index fafa2cb..0000000 --- a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java +++ /dev/null @@ -1,37 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.system;

import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.config.Config;

/**
 * {@link ExecutionEnvironment} intended to run applications in a YARN environment.
 * <p>
 * NOTE: {@link #run(StreamGraphBuilder, Config)} is currently a placeholder with an empty body; it returns
 * without building or submitting anything.
 */
public class RemoteExecutionEnvironment implements ExecutionEnvironment {

  /**
   * Not implemented yet: returns immediately.
   *
   * @param app  the application's graph builder
   * @param config  the job configuration
   */
  @Override
  public void run(StreamGraphBuilder app, Config config) {
    // TODO: document the ProcessContext that will create a sub-DAG of the {@code graph}.
    // TODO: instantiate the tasks and run the job, i.e.:
    //   1. create all input/output/intermediate topics;
    //   2. create the single job configuration;
    //   3. execute JobRunner to submit the single job for the whole graph.
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java deleted file mode 100644 index f0f6ef2..0000000 --- a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */

package org.apache.samza.system;

import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.operators.StreamGraphImpl;


/**
 * {@link ExecutionEnvironment} for running applications in a standalone environment.
 * <p>
 * NOTE: {@link #run(StreamGraphBuilder, Config)} currently has an empty body; the intended steps are
 * listed inside it.
 */
public class StandaloneExecutionEnvironment implements ExecutionEnvironment {

  /**
   * Builds the logical graph by letting {@code app} populate a fresh {@link StreamGraphImpl}.
   * TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}s.
   *
   * @param app  the application that defines the graph
   * @param config  configuration handed to {@code app}
   * @return the populated graph
   */
  StreamGraph createGraph(StreamGraphBuilder app, Config config) {
    final StreamGraphImpl logicalGraph = new StreamGraphImpl();
    app.init(logicalGraph, config);
    return logicalGraph;
  }

  /**
   * Not implemented yet: returns immediately.
   *
   * @param app  the application's graph builder
   * @param config  the job configuration
   */
  @Override
  public void run(StreamGraphBuilder app, Config config) {
    // Planned steps:
    //   1. obtain the logical graph for optimization (e.g. via createGraph(app, config));
    //   2. apply any optimizations;
    //   3. create a new StreamGraphBuilder instance that generates the optimized graph;
    //   4. create all input/output/intermediate topics;
    //   5. create the configuration for the StreamProcessor;
    //   6. start the StreamProcessor with the optimized StreamGraphBuilder.
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java deleted file mode 100644 index b007e3c..0000000 --- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package org.apache.samza.task;

import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.impl.OperatorGraph;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;

import java.util.HashMap;
import java.util.Map;


/**
 * A {@link StreamTask} implementation that executes the logical sub-DAG of a task. It receives
 * {@link InputMessageEnvelope}s and propagates them through the user's stream transformations defined in
 * {@link StreamGraphImpl} using the {@link org.apache.samza.operators.MessageStream} APIs.
 * <p>
 * This class brings all the operator API implementation components together and feeds the
 * {@link InputMessageEnvelope}s into the transformation chains.
 * <p>
 * It accepts an instance of the user-implemented factory {@link StreamGraphBuilder} as a constructor parameter.
 * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiates a user-defined
 * {@link StreamGraphImpl} from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to
 * initialize the task-wide context for the graph, and creates a {@link MessageStreamImpl} corresponding to each of
 * its input {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl} corresponds
 * to either an input stream or an intermediate stream in {@link StreamGraphImpl}.
 * <p>
 * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for the
 * input {@link MessageStreamImpl}s. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
 * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG.
 * <p>
 * With the root of the DAG for each {@link org.apache.samza.system.SystemStreamPartition} in place, it passes the
 * message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
 * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl}
 * propagates its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
 */
public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {

  /**
   * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
   */
  private final OperatorGraph operatorGraph = new OperatorGraph();

  private final StreamGraphBuilder graphBuilder;

  /**
   * Context manager of the graph; assigned in {@link #init(Config, TaskContext)} and {@code null} before that.
   */
  private ContextManager contextManager;

  public StreamOperatorTask(StreamGraphBuilder graphBuilder) {
    this.graphBuilder = graphBuilder;
  }

  /**
   * Builds the app-specific logical DAG, initializes the task-specific context and the operator graph
   * for every input {@link SystemStream} of this task.
   */
  @Override
  public final void init(Config config, TaskContext context) throws Exception {
    // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
    StreamGraphImpl streams = new StreamGraphImpl();
    this.graphBuilder.init(streams, config);
    // get the context manager of the {@link StreamGraph} and initialize the task-specific context
    this.contextManager = streams.getContextManager();

    Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
    context.getSystemStreamPartitions().forEach(ssp -> {
      SystemStream systemStream = ssp.getSystemStream();
      // several partitions can share one SystemStream: map the physical input stream to its logical
      // MessageStream only once (the guard already ensures the key is absent, so a plain put suffices)
      if (!inputBySystemStream.containsKey(systemStream)) {
        inputBySystemStream.put(systemStream, streams.getInputStream(systemStream));
      }
    });
    operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
  }

  /**
   * Wraps the incoming envelope and hands it to the root operator registered for its {@link SystemStream}.
   */
  @Override
  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
    this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
        .onNext(new InputMessageEnvelope(ime), collector, coordinator);
  }

  @Override
  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
    // TODO: invoke timer based triggers
  }

  /**
   * Finalizes the task-specific context. If {@link #init(Config, TaskContext)} never assigned a context
   * manager (e.g. it was not called or failed early), there is nothing to finalize; skipping avoids an NPE
   * that would hide the original failure.
   */
  @Override
  public void close() throws Exception {
    if (this.contextManager != null) {
      this.contextManager.finalizeTaskContext();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git
a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java deleted file mode 100644 index 85ebc6c..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java +++ /dev/null @@ -1,180 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.example;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.samza.operators.*;
import org.apache.samza.operators.StreamGraphBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.ExecutionEnvironment;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.CommandLine;

import java.util.Properties;


/**
 * Example code using {@link KeyValueStore} to implement event-time window
 */
public class KeyValueStoreExample implements StreamGraphBuilder {

  /**
   * Builds the graph: page-view events are partitioned by member id, counted per member in 5-minute
   * event-time windows by {@link MyStatsCounter}, and the counts are sent to the output stream.
   * <p>
   * A remote execution environment launches the job in a remote program that should follow an invoking
   * context similar to the standalone {@link #main(String[])}:
   * <pre>
   * public static void main(String args[]) throws Exception {
   *   CommandLine cmdLine = new CommandLine();
   *   Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
   *   ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
   *   remoteEnv.run(new KeyValueStoreExample(), config);
   * }
   * </pre>
   */
  @Override public void init(StreamGraph graph, Config config) {

    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
    OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());

    pageViewEvents
        .partitionBy(m -> m.getMessage().memberId)
        .flatMap(new MyStatsCounter())
        .sendTo(pageViewPerMemberCounters);

  }

  // standalone local program model
  public static void main(String[] args) throws Exception {
    CommandLine cmdLine = new CommandLine();
    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
    standaloneEnv.run(new KeyValueStoreExample(), config);
  }

  /**
   * Counts page views per member in 5-minute event-time windows, persisting per-window counters in the
   * "my-stats-wnd-store" store and emitting an updated count at most once per {@code timeoutMs} per window.
   */
  class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
    // minimum wall-clock interval between two outputs for the same window: 10 minutes
    private final int timeoutMs = 10 * 60 * 1000;

    KeyValueStore<String, StatsWindowState> statsStore;

    class StatsWindowState {
      int lastCount = 0;
      long timeAtLastOutput = 0;
      int newCount = 0;
    }

    @Override
    public Collection<StatsOutput> apply(PageViewEvent message) {
      List<StatsOutput> outputStats = new ArrayList<>();
      // start of the 5-minute window (in minutes since epoch) containing the event;
      // floorDiv keeps the bucket a true floor even for negative timestamps
      long eventMinutes = TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp);
      long wndTimestamp = Math.floorDiv(eventMinutes, 5L) * 5;
      String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
      StatsWindowState curState = this.statsStore.get(wndKey);
      if (curState == null) {
        // first event for this member/window: the store has no entry yet
        curState = new StatsWindowState();
      }
      curState.newCount++;
      long curTimeMs = System.currentTimeMillis();
      if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
        curState.timeAtLastOutput = curTimeMs;
        curState.lastCount += curState.newCount;
        curState.newCount = 0;
        outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
      }
      // persist the updated counters whether or not an output was generated
      this.statsStore.put(wndKey, curState);
      return outputStats;
    }

    @Override
    @SuppressWarnings("unchecked") // the store's value type is fixed by this example's configuration
    public void init(Config config, TaskContext context) {
      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
    }
  }

  StreamSpec input1 = new StreamSpec() {
    @Override public SystemStream getSystemStream() {
      return new SystemStream("kafka", "PageViewEvent");
    }

    @Override public Properties getProperties() {
      return null;
    }
  };

  StreamSpec output = new StreamSpec() {
    @Override public SystemStream getSystemStream() {
      return new SystemStream("kafka", "PageViewPerMember5min");
    }

    @Override public Properties getProperties() {
      return null;
    }
  };

  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
    String pageId;
    String memberId;
    long timestamp;

    PageViewEvent(String pageId, String memberId, long timestamp) {
      this.pageId = pageId;
      this.memberId = memberId;
      this.timestamp = timestamp;
    }

    @Override
    public String getKey() {
      return this.pageId;
    }

    @Override
    public PageViewEvent getMessage() {
      return this;
    }
  }

  class StatsOutput implements MessageEnvelope<String, StatsOutput> {
    private String memberId;
    private long timestamp;
    private Integer count;

    StatsOutput(String key, long timestamp, Integer count) {
      this.memberId = key;
      this.timestamp = timestamp;
      this.count = count;
    }

    @Override
    public String getKey() {
      return this.memberId;
    }

    @Override
    public StatsOutput getMessage() {
      return this;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java deleted file mode 100644 index c6d2e6e..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import org.apache.samza.operators.*; -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.config.Config; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.util.CommandLine; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; - - -/** - * Example {@link StreamGraphBuilder} code to test the API methods - */ -public class NoContextStreamExample implements StreamGraphBuilder { - - StreamSpec input1 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "input1"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec input2 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "input2"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec 
output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "output"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - class MessageType { - String joinKey; - List<String> joinFields = new ArrayList<>(); - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { - return new JsonMessageEnvelope( - ((MessageType) ism.getMessage()).joinKey, - (MessageType) ism.getMessage(), - ism.getOffset(), - ism.getSystemStreamPartition()); - } - - class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> { - - @Override - public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1, - JsonMessageEnvelope m2) { - MessageType newJoinMsg = new MessageType(); - newJoinMsg.joinKey = m1.getKey(); - newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); - newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); - return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); - } - - @Override - public String getFirstKey(JsonMessageEnvelope message) { - return message.getKey(); - } - - @Override - public String getSecondKey(JsonMessageEnvelope message) { - return message.getKey(); - } - } - - /** - * used by remote execution environment to launch the job in remote program. 
The remote program should follow the similar - * invoking context as in standalone: - * - * public static void main(String args[]) throws Exception { - * CommandLine cmdLine = new CommandLine(); - * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config); - * remoteEnv.run(new NoContextStreamExample(), config); - * } - * - */ - @Override public void init(StreamGraph graph, Config config) { - MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream( - input1, null, null); - MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream( - input2, null, null); - OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output, - new StringSerde("UTF-8"), new JsonSerde<>()); - - inputSource1.map(this::getInputMessage). - join(inputSource2.map(this::getInputMessage), new MyJoinFunction()). 
- sendTo(outStream); - - } - - // standalone local program model - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new NoContextStreamExample(), config); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java deleted file mode 100644 index 0477066..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.example; - -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; -import org.apache.samza.util.CommandLine; - -import java.util.Properties; - - -/** - * Simple 2-way stream-to-stream join example - */ -public class OrderShipmentJoinExample implements StreamGraphBuilder { - - /** - * used by remote execution environment to launch the job in remote program. The remote program should follow the similar - * invoking context as in standalone: - * - * public static void main(String args[]) throws Exception { - * CommandLine cmdLine = new CommandLine(); - * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); - * UserMainExample runnableApp = new UserMainExample(); - * runnableApp.run(remoteEnv, config); - * } - * - */ - @Override public void init(StreamGraph graph, Config config) { - - MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); - MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>()); - OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); - - orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders); - - } - - // standalone local program model - public static void main(String[] args) throws 
Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new OrderShipmentJoinExample(), config); - } - - StreamSpec input1 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "Orders"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec input2 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "Shipment"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "FulfilledOrders"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - class OrderRecord implements MessageEnvelope<String, OrderRecord> { - String orderId; - long orderTimeMs; - - OrderRecord(String orderId, long timeMs) { - this.orderId = orderId; - this.orderTimeMs = timeMs; - } - - @Override - public String getKey() { - return this.orderId; - } - - @Override - public OrderRecord getMessage() { - return this; - } - } - - class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> { - String orderId; - long shipTimeMs; - - ShipmentRecord(String orderId, long timeMs) { - this.orderId = orderId; - this.shipTimeMs = timeMs; - } - - @Override - public String getKey() { - return this.orderId; - } - - @Override - public ShipmentRecord getMessage() { - return this; - } - } - - class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> { - String orderId; - long orderTimeMs; - long shipTimeMs; - - FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) { - this.orderId = orderId; - this.orderTimeMs = orderTimeMs; - this.shipTimeMs = shipTimeMs; - } - - - 
@Override - public String getKey() { - return this.orderId; - } - - @Override - public FulFilledOrderRecord getMessage() { - return this; - } - } - - FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) { - return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs); - } - - class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> { - - @Override - public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) { - return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage); - } - - @Override - public String getFirstKey(OrderRecord message) { - return message.getKey(); - } - - @Override - public String getSecondKey(ShipmentRecord message) { - return message.getKey(); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java deleted file mode 100644 index f7d8bda..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import org.apache.samza.operators.*; -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.config.Config; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.AccumulationMode; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; -import org.apache.samza.util.CommandLine; - -import java.time.Duration; -import java.util.Properties; - - -/** - * Example code to implement window-based counter - */ -public class PageViewCounterExample implements StreamGraphBuilder { - - @Override public void init(StreamGraph graph, Config config) { - - MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); - OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); - - pageViewEvents. - window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1). - setEarlyTrigger(Triggers.repeat(Triggers.count(5))). - setAccumulationMode(AccumulationMode.DISCARDING)). - map(MyStreamOutput::new). 
- sendTo(pageViewPerMemberCounters); - - } - - public static void main(String[] args) { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new PageViewCounterExample(), config); - } - - StreamSpec input1 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewEvent"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewPerMember5min"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { - String pageId; - String memberId; - long timestamp; - - PageViewEvent(String pageId, String memberId, long timestamp) { - this.pageId = pageId; - this.memberId = memberId; - this.timestamp = timestamp; - } - - @Override - public String getKey() { - return this.pageId; - } - - @Override - public PageViewEvent getMessage() { - return this; - } - } - - class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> { - String memberId; - long timestamp; - int count; - - MyStreamOutput(WindowPane<String, Integer> m) { - this.memberId = m.getKey().getKey(); - this.timestamp = Long.valueOf(m.getKey().getPaneId()); - this.count = m.getMessage(); - } - - @Override - public String getKey() { - return this.memberId; - } - - @Override - public MyStreamOutput getMessage() { - return this; - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java 
b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java deleted file mode 100644 index 6994ac4..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import org.apache.samza.operators.*; -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.config.Config; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.ExecutionEnvironment; -import org.apache.samza.system.SystemStream; -import org.apache.samza.util.CommandLine; - -import java.time.Duration; -import java.util.*; - - -/** - * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator - */ -public class RepartitionExample implements StreamGraphBuilder { - - /** - * used by remote execution environment to launch the job in remote program. 
The remote program should follow the similar - * invoking context as in standalone: - * - * public static void main(String args[]) throws Exception { - * CommandLine cmdLine = new CommandLine(); - * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config); - * remoteEnv.run(new UserMainExample(), config); - * } - * - */ - @Override public void init(StreamGraph graph, Config config) { - - MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); - OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); - - pageViewEvents. - partitionBy(m -> m.getMessage().memberId). - window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow( - msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)). - map(MyStreamOutput::new). - sendTo(pageViewPerMemberCounters); - - } - - // standalone local program model - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config); - standaloneEnv.run(new RepartitionExample(), config); - } - - StreamSpec input1 = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewEvent"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - StreamSpec output = new StreamSpec() { - @Override public SystemStream getSystemStream() { - return new SystemStream("kafka", "PageViewPerMember5min"); - } - - @Override public Properties getProperties() { - return null; - } - }; - - class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { - String pageId; - String memberId; - long timestamp; - - PageViewEvent(String 
pageId, String memberId, long timestamp) { - this.pageId = pageId; - this.memberId = memberId; - this.timestamp = timestamp; - } - - @Override - public String getKey() { - return this.pageId; - } - - @Override - public PageViewEvent getMessage() { - return this; - } - } - - class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> { - String memberId; - long timestamp; - int count; - - MyStreamOutput(WindowPane<String, Integer> m) { - this.memberId = m.getKey().getKey(); - this.timestamp = Long.valueOf(m.getKey().getPaneId()); - this.count = m.getMessage(); - } - - @Override - public String getKey() { - return this.memberId; - } - - @Override - public MyStreamOutput getMessage() { - return this; - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java deleted file mode 100644 index 8ecd44f..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import java.lang.reflect.Field; -import org.apache.samza.Partition; -import org.apache.samza.config.Config; -import org.apache.samza.operators.impl.OperatorGraph; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.StreamOperatorTask; -import org.apache.samza.task.TaskContext; -import org.junit.Test; - -import java.util.HashSet; -import java.util.Set; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - - -/** - * Unit test for {@link StreamOperatorTask} - */ -public class TestBasicStreamGraphs { - - private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { - for (int i = 0; i < 4; i++) { - this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i))); - } - } }; - - @Test - public void testUserTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - TestWindowExample userTask = new TestWindowExample(this.inputPartitions); - StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask); - Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); - pipelineMapFld.setAccessible(true); - OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - this.inputPartitions.forEach(partition -> { - assertNotNull(opGraph.get(partition.getSystemStream())); - }); - } - - @Test - public void testSplitTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - TestBroadcastExample splitTask = new 
TestBroadcastExample(this.inputPartitions); - StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask); - Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); - pipelineMapFld.setAccessible(true); - OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - this.inputPartitions.forEach(partition -> { - assertNotNull(opGraph.get(partition.getSystemStream())); - }); - } - - @Test - public void testJoinTask() throws Exception { - Config mockConfig = mock(Config.class); - TaskContext mockContext = mock(TaskContext.class); - when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions); - TestJoinExample joinTask = new TestJoinExample(this.inputPartitions); - StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask); - Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); - pipelineMapFld.setAccessible(true); - OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); - - adaptorTask.init(mockConfig, mockContext); - this.inputPartitions.forEach(partition -> { - assertNotNull(opGraph.get(partition.getSystemStream())); - }); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java deleted file mode 100644 index d22324b..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.function.BiFunction; -import java.util.Properties; -import java.util.Set; - - -/** - * Example implementation of split stream tasks - * - */ -public class TestBroadcastExample extends TestExampleBase { - - TestBroadcastExample(Set<SystemStreamPartition> inputs) { - super(inputs); - } - - class MessageType { - String field1; - String field2; - String field3; - String field4; - String parKey; - private long timestamp; - - public long getTimestamp() { - return this.timestamp; - } - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override - 
public void init(StreamGraph graph, Config config) { - BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; - inputs.keySet().forEach(entry -> { - MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return entry; - } - - @Override public Properties getProperties() { - return null; - } - }, null, null).map(this::getInputMessage); - - inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) - .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - - }); - } - - JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) { - return (JsonMessageEnvelope) m1.getMessage(); - } - - boolean myFilter1(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key1"); - } - - boolean myFilter2(JsonMessageEnvelope m1) { - // Do user defined processing here - return m1.getMessage().parKey.equals("key2"); - } - - boolean myFilter3(JsonMessageEnvelope m1) { - return m1.getMessage().parKey.equals("key3"); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java 
b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java deleted file mode 100644 index c4df9d4..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.example; - -import org.apache.samza.operators.StreamGraphBuilder; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Base class for test examples - * - */ -public abstract class TestExampleBase implements StreamGraphBuilder { - - protected final Map<SystemStream, Set<SystemStreamPartition>> inputs; - - TestExampleBase(Set<SystemStreamPartition> inputs) { - this.inputs = new HashMap<>(); - for (SystemStreamPartition input : inputs) { - this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>()); - this.inputs.get(input.getSystemStream()).add(input); - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java deleted file mode 100644 index fe6e7e7..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.serializers.JsonSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.Set; - - -/** - * Example implementation of unique key-based stream-stream join tasks - * - */ -public class TestJoinExample extends TestExampleBase { - - TestJoinExample(Set<SystemStreamPartition> inputs) { - super(inputs); - } - - class MessageType { - String joinKey; - List<String> joinFields = new ArrayList<>(); - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - MessageStream<JsonMessageEnvelope> joinOutput = null; - - @Override - public void init(StreamGraph graph, Config config) { - - for (SystemStream input : inputs.keySet()) { - MessageStream<JsonMessageEnvelope> newSource = graph.<Object, 
Object, InputMessageEnvelope>createInStream( - new StreamSpec() { - @Override public SystemStream getSystemStream() { - return input; - } - - @Override public Properties getProperties() { - return null; - } - }, null, null).map(this::getInputMessage); - if (joinOutput == null) { - joinOutput = newSource; - } else { - joinOutput = joinOutput.join(newSource, new MyJoinFunction()); - } - } - - joinOutput.sendTo(graph.createOutStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return null; - } - - @Override public Properties getProperties() { - return null; - } - }, new StringSerde("UTF-8"), new JsonSerde<>())); - - } - - private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { - return new JsonMessageEnvelope( - ((MessageType) ism.getMessage()).joinKey, - (MessageType) ism.getMessage(), - ism.getOffset(), - ism.getSystemStreamPartition()); - } - - class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> { - JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) { - MessageType newJoinMsg = new MessageType(); - newJoinMsg.joinKey = m1.getKey(); - newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); - newJoinMsg.joinFields.addAll(m2.getMessage().joinFields); - return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); - } - - @Override - public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) { - return this.myJoinResult(message, otherMessage); - } - - @Override - public String getFirstKey(JsonMessageEnvelope message) { - return message.getKey(); - } - - @Override - public String getSecondKey(JsonMessageEnvelope message) { - return message.getKey(); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java ---------------------------------------------------------------------- diff --git 
a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java deleted file mode 100644 index e08ca20..0000000 --- a/samza-operator/src/test/java/org/apache/samza/example/TestWindowExample.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.example; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.StreamSpec; -import org.apache.samza.operators.data.InputMessageEnvelope; -import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.system.SystemStream; -import org.apache.samza.system.SystemStreamPartition; - -import java.time.Duration; -import java.util.function.BiFunction; -import java.util.Properties; -import java.util.Set; - - -/** - * Example implementation of a simple user-defined tasks w/ window operators - * - */ -public class TestWindowExample extends TestExampleBase { - class MessageType { - String field1; - String field2; - } - - TestWindowExample(Set<SystemStreamPartition> inputs) { - super(inputs); - } - - class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> { - - JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) { - super(key, data, offset, partition); - } - } - - @Override - public void init(StreamGraph graph, Config config) { - BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; - inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() { - @Override public SystemStream getSystemStream() { - return source; - } - - @Override public Properties getProperties() { - return null; - } - }, null, null). 
- map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), - m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); - - } - - String myMessageKeyFunction(MessageEnvelope<Object, Object> m) { - return m.getKey().toString(); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java deleted file mode 100644 index 160a47a..0000000 --- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import org.apache.samza.operators.functions.FilterFunction; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.functions.SinkFunction; -import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; -import org.apache.samza.operators.spec.SinkOperatorSpec; -import org.apache.samza.operators.spec.StreamOperatorSpec; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestMessageStreamImpl { - - private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); - - @Test - public void testMap() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); - MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) -> - new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1); - MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap); - Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); - assertEquals(subs.size(), 1); - OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next(); - assertTrue(mapOp instanceof StreamOperatorSpec); - assertEquals(mapOp.getNextStream(), outputStream); - // assert that the transformation function is what we defined above - 
TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class); - TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class); - when(xTestMsg.getKey()).thenReturn("test-msg-key"); - when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage); - when(mockInnerTestMessage.getValue()).thenReturn("123456789"); - - Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg); - assertEquals(cOutputMsg.size(), 1); - TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next(); - assertEquals(outputMessage.getKey(), xTestMsg.getKey()); - assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1)); - } - - @Test - public void testFlatMap() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); - Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { { - this.add(mock(TestOutputMessageEnvelope.class)); - this.add(mock(TestOutputMessageEnvelope.class)); - this.add(mock(TestOutputMessageEnvelope.class)); - } }; - FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts; - MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap); - Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); - assertEquals(subs.size(), 1); - OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next(); - assertTrue(flatMapOp instanceof StreamOperatorSpec); - assertEquals(flatMapOp.getNextStream(), outputStream); - // assert that the transformation function is what we defined above - assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap); - } - - @Test - public void testFilter() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph); - FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L; - MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter); - Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); - assertEquals(subs.size(), 1); - OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next(); - assertTrue(filterOp instanceof StreamOperatorSpec); - assertEquals(filterOp.getNextStream(), outputStream); - // assert that the transformation function is what we defined above - FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn(); - TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); - TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class); - when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage); - when(mockInnerTestMessage.getEventTime()).thenReturn(11111L); - Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg); - assertTrue(output.isEmpty()); - when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage); - when(mockInnerTestMessage.getEventTime()).thenReturn(999999L); - output = txfmFn.apply(mockMsg); - assertEquals(output.size(), 1); - assertEquals(output.iterator().next(), mockMsg); - } - - @Test - public void testSink() { - MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph); - SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> { - mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); - tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); - }; - inputStream.sink(xSink); - Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs(); - assertEquals(subs.size(), 1); - OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next(); 
- assertTrue(sinkOp instanceof SinkOperatorSpec); - assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink); - assertNull(((SinkOperatorSpec) sinkOp).getNextStream()); - } - - @Test - public void testJoin() { - MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph); - MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph); - JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner = - new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() { - @Override - public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) { - return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()); - } - - @Override - public String getFirstKey(TestMessageEnvelope message) { - return message.getKey(); - } - - @Override - public String getSecondKey(TestMessageEnvelope message) { - return message.getKey(); - } - }; - - MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner); - Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs(); - assertEquals(subs.size(), 1); - OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next(); - assertTrue(joinOp1 instanceof PartialJoinOperatorSpec); - assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput); - subs = source2.getRegisteredOperatorSpecs(); - assertEquals(subs.size(), 1); - OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next(); - assertTrue(joinOp2 instanceof PartialJoinOperatorSpec); - assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput); - TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L); - TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L); - TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) 
((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2); - assertEquals(xOut.getKey(), "test-join-1"); - assertEquals(xOut.getMessage(), Integer.valueOf(24)); - xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1); - assertEquals(xOut.getKey(), "test-join-1"); - assertEquals(xOut.getMessage(), Integer.valueOf(24)); - } - - @Test - public void testMerge() { - MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph); - Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { { - this.add(new MessageStreamImpl<>(mockGraph)); - this.add(new MessageStreamImpl<>(mockGraph)); - } }; - MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others); - validateMergeOperator(merge1, mergeOutput); - - others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); - } - - private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) { - Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs(); - assertEquals(subs.size(), 1); - OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next(); - assertTrue(mergeOp instanceof StreamOperatorSpec); - assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput); - TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); - Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg); - assertEquals(outputs.size(), 1); - assertEquals(outputs.iterator().next(), mockMsg); - } -}