http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index f7e1f36..73fb5c8 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -20,7 +20,6 @@ package org.apache.samza.operators.windows; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.triggers.TimeTrigger; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; @@ -34,11 +33,11 @@ import java.util.function.Function; /** * APIs for creating different types of {@link Window}s. * - * Groups the incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing. + * Groups the incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing. * * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more - * {@link MessageEnvelope}s in the window and is called a {@link WindowPane}. + * messages in the window and is called a {@link WindowPane}. * * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window * has arrived or late triggers that allow handling of late data arrivals.
@@ -74,14 +73,14 @@ import java.util.function.Function; * <li> * Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions. * A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. - * The boundary for a session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that that arrive within + * The boundary for a session is defined by a {@code sessionGap}. All messages that arrive within * the gap are grouped into the same session. * <li> * Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}. * An early trigger must be specified when defining a global window. * </ul> * - * <p> A {@link Window} is defined as "keyed" when the incoming {@link MessageEnvelope}s are first grouped based on their key + * <p> A {@link Window} is defined as "keyed" when the incoming messages are first grouped based on their key * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window * types. * @@ -92,7 +91,7 @@ public final class Windows { private Windows() { } /** - * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping processing + * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping processing * time based windows based on the provided keyFn and applies the provided fold function to them. * * <p>The below example computes the maximum value per-key over fixed size 10 second windows.
@@ -101,29 +100,29 @@ public final class Windows { * MessageStream<UserClick> stream = ...; * Function<UserClick, String> keyFn = ...; * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c); - * MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window( - * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator)); + * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window( + * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator)); * } * </pre> * - * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param keyFn the function to extract the window key from a message * @param interval the duration in processing time - * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} - * @param <M> the type of the input {@link MessageEnvelope} + * @param foldFn the function to aggregate messages in the {@link WindowPane} + * @param <M> the type of the input message * @param <WV> the type of the {@link WindowPane} output value * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function. */ - public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> + public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, WV, WV> foldFn) { - Trigger defaultTrigger = new TimeTrigger(interval); + Trigger<M> defaultTrigger = new TimeTrigger<>(interval); return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null); } /** - * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping + * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping * processing time based windows using the provided keyFn. * * <p>The below example groups the stream into fixed-size 10 second windows for each key.
@@ -131,19 +130,18 @@ public final class Windows { * <pre> {@code * MessageStream<UserClick> stream = ...; * Function<UserClick, String> keyFn = ...; - * MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window( - * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10))); + * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window( + * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10))); * } * </pre> * - * @param keyFn function to extract key from the {@link MessageEnvelope} + * @param keyFn function to extract key from the message * @param interval the duration in processing time - * @param <M> the type of the input {@link MessageEnvelope} + * @param <M> the type of the input message * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> - keyedTumblingWindow(Function<M, K> keyFn, Duration interval) { + public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) { BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { c.add(m); return c; @@ -160,25 +158,25 @@ public final class Windows { * <pre> {@code * MessageStream<String> stream = ...; * BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c); - * MessageStream<WindowOutput<WindowKey, Integer>> windowedStream = stream.window( - * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator)); + * MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window( + * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator)); * } * </pre> * * @param duration the duration in processing time - * @param foldFn to aggregate {@link MessageEnvelope}s in the {@link WindowPane} - * @param <M> the type of the input {@link MessageEnvelope} + * @param foldFn the function to aggregate messages
in the {@link WindowPane} + * @param <M> the type of the input message * @param <WV> the type of the {@link WindowPane} output value * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>> + public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) { - Trigger defaultTrigger = Triggers.repeat(new TimeTrigger(duration)); - return new WindowInternal<M, Void, WV>(defaultTrigger, foldFn, null, null); + Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration)); + return new WindowInternal<>(defaultTrigger, foldFn, null, null); } /** - * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping + * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping * processing time based windows. * * <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile. @@ -187,16 +185,16 @@ public final class Windows { * MessageStream<Long> stream = ...; * Function<Collection<Long, Long>> percentile99 = ..
* - * MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10))); + * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = stream.window(Windows.tumblingWindow(Duration.ofSeconds(10))); * MessageStream<Long> windowedPercentiles = windowed.map(windowedOutput -> percentile99(windowedOutput.getMessage()); * } * </pre> * * @param duration the duration in processing time - * @param <M> the type of the input {@link MessageEnvelope} + * @param <M> the type of the input message * @return the created {@link Window} function */ - public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> tumblingWindow(Duration duration) { + public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) { BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { c.add(m); return c; @@ -205,11 +203,11 @@ public final class Windows { } /** - * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based on the provided {@code sessionGap} + * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap} * and applies the provided fold function to them. * * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. - * A session is considered complete when no new messages arrive within the {@code sessionGap}. All {@link MessageEnvelope}s that arrive within + * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages that arrive within * the gap are grouped into the same session. * * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
@@ -218,29 +216,29 @@ public final class Windows { * MessageStream<UserClick> stream = ...; * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c); * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..; - * MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window( - * Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator)); + * MessageStream<WindowPane<String, Integer>> windowedStream = stream.window( + * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10), maxAggregator)); * } * </pre> * - * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param keyFn the function to extract the window key from a message * @param sessionGap the timeout gap for defining the session - * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} - * @param <M> the type of the input {@link MessageEnvelope} + * @param foldFn the function to aggregate messages in the {@link WindowPane} + * @param <M> the type of the input message * @param <K> the type of the key in the {@link Window} * @param <WV> the type of the output value in the {@link WindowPane} * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) { - Trigger defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); - return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null); + public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) { + Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); + return new WindowInternal<>(defaultTrigger, foldFn, keyFn, null); } /** - * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based
on the provided {@code sessionGap}. + * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}. * * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The - * boundary for the session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that that arrive within + * boundary for the session is defined by a {@code sessionGap}. All messages that arrive within * the gap are grouped into the same session. * * <p>The below example groups the stream into per-key session windows of gap 10 seconds. @@ -249,18 +247,18 @@ public final class Windows { * MessageStream<UserClick> stream = ...; * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c); * Function<UserClick, String> userIdExtractor = m -> m.getUserId()..; - * MessageStream<WindowOutput<WindowKey<String>, Collection<M>>> windowedStream = stream.window( - * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10))); + * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window( + * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10))); * } * </pre> * - * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param keyFn the function to extract the window key from a message * @param sessionGap the timeout gap for defining the session - * @param <M> the type of the input {@link MessageEnvelope} + * @param <M> the type of the input message * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) { + public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) { BiFunction<M, Collection<M>, Collection<M>>
aggregator = (m, c) -> { c.add(m); @@ -271,7 +269,7 @@ public final class Windows { /** - * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window. This window does not have a + * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a * default trigger. The triggering behavior must be specified by setting an early trigger. * * <p>The below example computes the maximum value over a count based window. The window emits {@link WindowPane}s when @@ -280,36 +278,36 @@ public final class Windows { * <pre> {@code * MessageStream<Long> stream = ...; * BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c); - * MessageStream<WindowOutput<WindowKey, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator) + * MessageStream<WindowPane<Void, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator) * .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10)))))) * } * </pre> * - * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} - * @param <M> the type of {@link MessageEnvelope} + * @param foldFn the function to aggregate messages in the {@link WindowPane} + * @param <M> the type of message * @param <WV> type of the output value in the {@link WindowPane} * @return the created {@link Window} function. */ - public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>> globalWindow(BiFunction<M, WV, WV> foldFn) { - return new WindowInternal<M, Void, WV>(null, foldFn, null, null); + public static <M, WV> Window<M, Void, WV> globalWindow(BiFunction<M, WV, WV> foldFn) { + return new WindowInternal<>(null, foldFn, null, null); } /** - * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window.
This window does not have a + * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a * default trigger. The triggering behavior must be specified by setting an early trigger. * * The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes. * <pre> {@code * MessageStream<Long> stream = ...; - * MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = stream.window(Windows.globalWindow() + * MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = stream.window(Windows.globalWindow() * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10)))))) * } * </pre> * - * @param <M> the type of {@link MessageEnvelope} + * @param <M> the type of message * @return the created {@link Window} function. */ - public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> globalWindow() { + public static <M> Window<M, Void, Collection<M>> globalWindow() { BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { c.add(m); return c; @@ -318,7 +316,7 @@ public final class Windows { } /** - * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn. + * Returns a global {@link Window} that groups incoming messages using the provided keyFn. * The window does not have a default trigger. The triggering behavior must be specified by setting an early * trigger.
* @@ -329,24 +327,24 @@ public final class Windows { * MessageStream<UserClick> stream = ...; * BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> Math.max(parseLongField(m), c); * Function<UserClick, String> keyFn = ...; - * MessageStream<WindowOutput<WindowKey<String>, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator) + * MessageStream<WindowPane<String, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator) * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10)))))) * } * </pre> * - * @param keyFn the function to extract the window key from a {@link MessageEnvelope} - * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} - * @param <M> the type of {@link MessageEnvelope} + * @param keyFn the function to extract the window key from a message + * @param foldFn the function to aggregate messages in the {@link WindowPane} + * @param <M> the type of message * @param <K> type of the key in the {@link Window} * @param <WV> the type of the output value in the {@link WindowPane} * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) { + public static <M, K, WV> Window<M, K, WV> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) { return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null); } /** - * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn. + * Returns a global {@link Window} that groups incoming messages using the provided keyFn. * The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger. * * <p> The below example groups the stream per-key into count based windows.
The window triggers every 50 messages or @@ -355,17 +353,17 @@ public final class Windows { * <pre> {@code * MessageStream<UserClick> stream = ...; * Function<UserClick, String> keyFn = ...; - * MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn) + * MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn) * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10)))))) * } * </pre> * - * @param keyFn the function to extract the window key from a {@link MessageEnvelope} - * @param <M> the type of {@link MessageEnvelope} + * @param keyFn the function to extract the window key from a message + * @param <M> the type of message * @param <K> the type of the key in the {@link Window} * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedGlobalWindow(Function<M, K> keyFn) { + public static <M, K> Window<M, K, Collection<M>> keyedGlobalWindow(Function<M, K> keyFn) { BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { c.add(m); return c;
http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java index 8825867..9479eea 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java @@ -18,11 +18,9 @@ */ package org.apache.samza.operators.windows.internal; import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.Window; -import org.apache.samza.operators.windows.WindowPane; import java.util.function.BiFunction; import java.util.function.Function; @@ -32,9 +30,13 @@ import java.util.function.Function; * and whether to accumulate or discard previously emitted panes. * * Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers.
+ * + * @param <M> the type of input message + * @param <K> the type of key for the window + * @param <WV> the type of aggregated value in the window output */ @InterfaceStability.Unstable -public final class WindowInternal<M extends MessageEnvelope, K, WV> implements Window<M, K, WV, WindowPane<K, WV>> { +public final class WindowInternal<M, K, WV> implements Window<M, K, WV> { private final Trigger defaultTrigger; @@ -67,19 +69,19 @@ public final class WindowInternal<M extends MessageEnvelope, K, WV> implements W } @Override - public Window<M, K, WV, WindowPane<K, WV>> setEarlyTrigger(Trigger trigger) { + public Window<M, K, WV> setEarlyTrigger(Trigger trigger) { this.earlyTrigger = trigger; return this; } @Override - public Window<M, K, WV, WindowPane<K, WV>> setLateTrigger(Trigger trigger) { + public Window<M, K, WV> setLateTrigger(Trigger trigger) { this.lateTrigger = trigger; return this; } @Override - public Window<M, K, WV, WindowPane<K, WV>> setAccumulationMode(AccumulationMode mode) { + public Window<M, K, WV> setAccumulationMode(AccumulationMode mode) { this.mode = mode; return this; } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java new file mode 100644 index 0000000..d0c5985 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.ConfigException; +import org.apache.samza.operators.StreamGraphFactory; +import org.apache.samza.config.Config; + + +/** + * Interface to be implemented by a physical execution engine to deploy the config and jobs that run a {@link org.apache.samza.operators.StreamGraph}. + */ +@InterfaceStability.Unstable +public interface ExecutionEnvironment { + + String ENVIRONMENT_CONFIG = "job.execution.environment.class"; // config key naming the ExecutionEnvironment implementation class + String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment"; // fallback passed to config.get(ENVIRONMENT_CONFIG, ...) in fromConfig + + /** + * Static method to load the local standalone environment. NOTE(review): not implemented yet; this currently always returns {@code null}. + * + * @param config configuration passed in to initialize the Samza standalone process + * @return the standalone {@link ExecutionEnvironment} to run the user-defined stream applications (currently always {@code null}) + */ + static ExecutionEnvironment getLocalEnvironment(Config config) { + return null; + } + + /** + * Static method to load the non-standalone environment class named by {@link #ENVIRONMENT_CONFIG} (default {@link #DEFAULT_ENVIRONMENT_CLASS}); throws {@link ConfigException} if that class cannot be loaded or instantiated, or does not implement this interface.
+ * + * @param config configuration passed in to initialize the Samza processes + * @return the config-driven {@link ExecutionEnvironment} to run the user-defined stream applications + */ + static ExecutionEnvironment fromConfig(Config config) { + String className = config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS); + try { + Class<?> environmentClass = Class.forName(className); + if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) { + return (ExecutionEnvironment) environmentClass.newInstance(); + } + } catch (Exception e) { + throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", className), e); + } + throw new ConfigException(String.format("Class %s does not implement interface ExecutionEnvironment properly", className)); + } + + /** + * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}. + * + * @param graphFactory the user-defined {@link StreamGraphFactory} object + * @param config the {@link Config} object for this job + */ + void run(StreamGraphFactory graphFactory, Config config); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-api/src/main/java/org/apache/samza/task/TaskContext.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java index a85e0b4..5779071 100644 --- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java +++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java @@ -54,4 +54,14 @@ public interface TaskContext { * */ void setStartingOffset(SystemStreamPartition ssp, String offset); + + /** + * Method to allow users to return a customized context. The default implementation returns {@code null}. + * + * @param <T> the type of user-defined task context + * @return user-defined task context object, or {@code null} if not overridden + */ + default <T extends TaskContext> T
getUserDefinedContext() { + return null; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java index 7bd62a7..e3a1290 100644 --- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java +++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java @@ -33,7 +33,7 @@ public class TestIncomingSystemMessage { @Test public void testConstructor() { IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class); - IncomingSystemMessageEnvelope ism = new IncomingSystemMessageEnvelope(ime); + InputMessageEnvelope ism = new InputMessageEnvelope(ime); Object mockKey = mock(Object.class); Object mockValue = mock(Object.class); http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java deleted file mode 100644 index 9679e1d..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; - - -public class TestWindowOutput { - @Test - public void testConstructor() { - WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10); - assertEquals(wndOutput.getKey().getKey(), "testMsg"); - assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); - assertFalse(wndOutput.isDelete()); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java new file mode 100644 index 0000000..809c5b4 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + + +public class TestWindowPane { + @Test + public void testConstructor() { + WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10); + assertEquals("testMsg", wndOutput.getKey().getKey()); + assertEquals(Integer.valueOf(10), wndOutput.getMessage()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 286893c..d85d488 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -19,107 +19,161 @@ package org.apache.samza.operators; -import org.apache.samza.operators.data.MessageEnvelope; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; +import java.util.HashSet; +import java.util.Set; +import org.apache.samza.config.Config; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; +import
org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpecs; import org.apache.samza.operators.windows.Window; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.function.BiFunction; +import org.apache.samza.task.TaskContext; /** * The implementation for input/output {@link MessageStream}s to/from the operators. * Users use the {@link MessageStream} API methods to describe and chain the operators specs. * - * @param <M> type of {@link MessageEnvelope}s in this {@link MessageStream} + * @param <M> type of messages in this {@link MessageStream} */ -public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStream<M> { +public class MessageStreamImpl<M> implements MessageStream<M> { + /** + * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl} + */ + private final StreamGraphImpl graph; /** - * The set of operators that consume the {@link MessageEnvelope}s in this {@link MessageStream} + * The set of operators that consume the messages in this {@link MessageStream} */ private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>(); - @Override - public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) { - OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperatorSpec(m -> new ArrayList<OM>() { { - OM r = mapFn.apply(m); - if (r != null) { - this.add(r); - } - } }); + /** + * Default constructor + * + * @param graph the {@link StreamGraphImpl} object that this stream belongs to + */ + MessageStreamImpl(StreamGraphImpl graph) { + this.graph = graph; + } + + @Override public <TM> MessageStream<TM> 
map(MapFunction<M, TM> mapFn) { + OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph)); this.registeredOperatorSpecs.add(op); - return op.getOutputStream(); + return op.getNextStream(); } - @Override - public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) { - OperatorSpec<OM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn); + @Override public MessageStream<M> filter(FilterFunction<M> filterFn) { + OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph)); this.registeredOperatorSpecs.add(op); - return op.getOutputStream(); + return op.getNextStream(); } @Override - public MessageStream<M> filter(FilterFunction<M> filterFn) { - OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperatorSpec(t -> new ArrayList<M>() { { - if (filterFn.apply(t)) { - this.add(t); - } - } }); + public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) { + OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph)); this.registeredOperatorSpecs.add(op); - return op.getOutputStream(); + return op.getNextStream(); } @Override public void sink(SinkFunction<M> sinkFn) { - this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn)); + this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph)); } - @Override - public <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window( - Window<M, K, WV, WM> window) { - OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<MessageEnvelope, K, WV>) window); - this.registeredOperatorSpecs.add(wndOp); - return wndOp.getOutputStream(); + @Override public void sendTo(OutputStream<M> stream) { + this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream)); + } + + 
@Override public MessageStream<M> sendThrough(OutputStream<M> stream) { + this.sendTo(stream); + return this.graph.getIntStream(stream); } @Override - public <K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join( - MessageStream<JM> otherStream, JoinFunction<M, JM, RM> joinFn) { - MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(); + public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) { + OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, + this.graph, new MessageStreamImpl<>(this.graph)); + this.registeredOperatorSpecs.add(wndOp); + return wndOp.getNextStream(); + } - BiFunction<M, JM, RM> parJoin1 = joinFn::apply; - BiFunction<JM, M, RM> parJoin2 = (m, t1) -> joinFn.apply(t1, m); + @Override public <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn) { + MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph); + + PartialJoinFunction<K, M, OM, RM> parJoin1 = new PartialJoinFunction<K, M, OM, RM>() { + @Override + public RM apply(M m1, OM om) { + return joinFn.apply(m1, om); + } + + @Override + public K getKey(M message) { + return joinFn.getFirstKey(message); + } + + @Override + public K getOtherKey(OM message) { + return joinFn.getSecondKey(message); + } + + @Override + public void init(Config config, TaskContext context) { + joinFn.init(config, context); + } + }; + + PartialJoinFunction<K, OM, M, RM> parJoin2 = new PartialJoinFunction<K, OM, M, RM>() { + @Override + public RM apply(OM m1, M m) { + return joinFn.apply(m, m1); + } + + @Override + public K getKey(OM message) { + return joinFn.getSecondKey(message); + } + + @Override + public K getOtherKey(M message) { + return joinFn.getFirstKey(message); + } + }; // TODO: need to add default store functions for the two partial join functions - ((MessageStreamImpl<JM>) 
otherStream).registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin2, outputStream)); - this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin1, outputStream)); + ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add( + OperatorSpecs.<OM, K, M, RM>createPartialJoinOperatorSpec(parJoin2, this.graph, outputStream)); + this.registeredOperatorSpecs.add(OperatorSpecs.<M, K, OM, RM>createPartialJoinOperatorSpec(parJoin1, this.graph, outputStream)); return outputStream; } @Override public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) { - MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(); + MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph); otherStreams.add(this); - otherStreams.forEach(other -> - ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperatorSpec(outputStream))); + otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs. + add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream))); return outputStream; } + @Override + public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) { + MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor); + OutputStream<M> outputStream = this.graph.getOutputStream(intStream); + this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(), + this.graph, outputStream)); + return intStream; + } /** * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and * should not be exposed to users. 
@@ -129,4 +183,5 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre public Collection<OperatorSpec> getRegisteredOperatorSpecs() { return Collections.unmodifiableSet(this.registeredOperatorSpecs); } + } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java new file mode 100644 index 0000000..dca3469 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import java.util.Properties; +import java.util.function.Function; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to + * create system input/output/intermediate streams. + */ +public class StreamGraphImpl implements StreamGraph { + + /** + * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope} + * in the input {@link MessageStream}s. + */ + private int opId = 0; + + private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> { + final StreamSpec spec; + final Serde<K> keySerde; + final Serde<V> msgSerde; + + InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { + super(graph); + this.spec = streamSpec; + this.keySerde = keySerde; + this.msgSerde = msgSerde; + } + + StreamSpec getSpec() { + return this.spec; + } + + } + + private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> { + final StreamSpec spec; + final Serde<K> keySerde; + final Serde<V> msgSerde; + + OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { + this.spec = streamSpec; + this.keySerde = keySerde; + this.msgSerde = msgSerde; + } + + StreamSpec getSpec() { + return this.spec; + } + + @Override + public SinkFunction<M> getSinkFunction() { + return (M message, MessageCollector mc, TaskCoordinator tc) -> 
{ + // TODO: need to find a way to directly pass in the serde class names + // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(), + // message.getKey(), message.getKey(), message.getMessage())); + mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage())); + }; + } + } + + private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> { + final Function<M, PK> parKeyFn; + + /** + * Default constructor + * + * @param graph the {@link StreamGraphImpl} object that this stream belongs to + */ + IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { + this(graph, streamSpec, keySerde, msgSerde, null); + } + + IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) { + super(graph, streamSpec, keySerde, msgSerde); + this.parKeyFn = parKeyFn; + } + + @Override + public SinkFunction<M> getSinkFunction() { + return (M message, MessageCollector mc, TaskCoordinator tc) -> { + // TODO: need to find a way to directly pass in the serde class names + // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(), + // message.getKey(), message.getKey(), message.getMessage())); + if (this.parKeyFn == null) { + mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage())); + } else { + // apply partition key function + mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage())); + } + }; + } + } + + /** + * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl} + */ + private final Map<SystemStream, 
MessageStream> inStreams = new HashMap<>(); + private final Map<SystemStream, OutputStream> outStreams = new HashMap<>(); + + private ContextManager contextManager = new ContextManager() { }; + + @Override + public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { + if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { + this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde)); + } + return this.inStreams.get(streamSpec.getSystemStream()); + } + + /** + * Helper method to be used by {@link MessageStreamImpl} class + * + * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as the output + * @param <M> the type of {@link MessageEnvelope}s in the output {@link SystemStream} + * @return the {@link MessageStreamImpl} object + */ + @Override + public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { + if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { + this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde)); + } + return this.outStreams.get(streamSpec.getSystemStream()); + } + + /** + * Helper method to be used by {@link MessageStreamImpl} class + * + * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream} + * @param <M> the type of {@link MessageEnvelope}s in the output {@link SystemStream} + * @return the {@link MessageStreamImpl} object + */ + @Override + public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { + if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { + this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new 
IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde)); + } + IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream()); + if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { + this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream); + } + return intStream; + } + + @Override public Map<StreamSpec, MessageStream> getInStreams() { + Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>(); + this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry)); + return Collections.unmodifiableMap(inStreamMap); + } + + @Override public Map<StreamSpec, OutputStream> getOutStreams() { + Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>(); + this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry)); + return Collections.unmodifiableMap(outStreamMap); + } + + @Override + public StreamGraph withContextManager(ContextManager manager) { + this.contextManager = manager; + return this; + } + + public int getNextOpId() { + return this.opId++; + } + + public ContextManager getContextManager() { + return this.contextManager; + } + + /** + * Helper method to be get the input stream via {@link SystemStream} + * + * @param systemStream the {@link SystemStream} + * @return a {@link MessageStreamImpl} object corresponding to the {@code systemStream} + */ + public MessageStreamImpl getInputStream(SystemStream systemStream) { + if (this.inStreams.containsKey(systemStream)) { + return (MessageStreamImpl) this.inStreams.get(systemStream); + } + return null; + } + + <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) { + if (this.outStreams.containsValue(intStream)) { + return (OutputStream<M>) intStream; + } + return null; + } + + <M> MessageStream<M> getIntStream(OutputStream<M> outStream) { + if (this.inStreams.containsValue(outStream)) { + return (MessageStream<M>) 
outStream; + } + return null; + } + + /** + * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method. + * + * @param parKeyFn the function to extract the partition key from the input message + * @param <PK> the type of partition key + * @param <M> the type of input message + * @return the {@link OutputStream} object for the re-partitioned stream + */ + <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) { + // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec} + StreamSpec streamSpec = new StreamSpec() { + @Override + public SystemStream getSystemStream() { + // TODO: should auto-generate intermedaite stream name here + return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId)); + } + + @Override + public Properties getProperties() { + return null; + } + }; + + if (!this.inStreams.containsKey(streamSpec.getSystemStream())) { + this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn)); + } + IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream()); + if (!this.outStreams.containsKey(streamSpec.getSystemStream())) { + this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream); + } + return intStream; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java deleted file mode 100644 index 152cd92..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.config.Config; -import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.impl.OperatorImpl; -import org.apache.samza.operators.impl.OperatorImpls; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.task.InitableTask; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.StreamTask; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; -import org.apache.samza.task.WindowableTask; - -import java.util.HashMap; -import java.util.Map; - - -/** - * An {@link StreamTask} implementation that receives {@link IncomingSystemMessageEnvelope}s and propagates them - * through the user's stream transformations defined in {@link StreamOperatorTask#transform(Map)} using the - * {@link MessageStream} APIs. - * <p> - * This class brings all the operator API implementation components together and feeds the - * {@link IncomingSystemMessageEnvelope}s into the transformation chains. 
- * <p> - * It accepts an instance of the user implemented {@link StreamOperatorTask}. When its own {@link #init(Config, TaskContext)} - * method is called during startup, it creates a {@link MessageStreamImpl} corresponding to each of its input - * {@link SystemStreamPartition}s and then calls the user's {@link StreamOperatorTask#transform(Map)} method. - * <p> - * When users invoke the methods on the {@link MessageStream} API to describe their stream transformations in the - * {@link StreamOperatorTask#transform(Map)} method, the underlying {@link MessageStreamImpl} creates the - * corresponding {@link org.apache.samza.operators.spec.OperatorSpec} to record information about the desired - * transformation, and returns the output {@link MessageStream} to allow further transform chaining. - * <p> - * Once the user's transformation DAGs have been described for all {@link MessageStream}s (i.e., when the - * {@link StreamOperatorTask#transform(Map)} call returns), it calls - * {@link OperatorImpls#createOperatorImpls(MessageStreamImpl, TaskContext)} for each of the input - * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG - * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the - * root node of the DAG, which this class saves. - * <p> - * Now that it has the root for the DAG corresponding to each {@link SystemStreamPartition}, it can pass the message - * envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} along - * to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates - * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s. 
- */ -public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask { - - /** - * A mapping from each {@link SystemStreamPartition} to the root node of its operator chain DAG. - */ - private final Map<SystemStreamPartition, OperatorImpl<IncomingSystemMessageEnvelope, ? extends MessageEnvelope>> operatorChains = new HashMap<>(); - - private final StreamOperatorTask userTask; - - public StreamOperatorAdaptorTask(StreamOperatorTask userTask) { - this.userTask = userTask; - } - - @Override - public final void init(Config config, TaskContext context) throws Exception { - if (this.userTask instanceof InitableTask) { - ((InitableTask) this.userTask).init(config, context); - } - Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams = new HashMap<>(); - context.getSystemStreamPartitions().forEach(ssp -> messageStreams.put(ssp, new MessageStreamImpl<>())); - this.userTask.transform(messageStreams); - messageStreams.forEach((ssp, ms) -> - operatorChains.put(ssp, OperatorImpls.createOperatorImpls((MessageStreamImpl<IncomingSystemMessageEnvelope>) ms, context))); - } - - @Override - public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) { - this.operatorChains.get(ime.getSystemStreamPartition()) - .onNext(new IncomingSystemMessageEnvelope(ime), collector, coordinator); - } - - @Override - public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { - if (this.userTask instanceof WindowableTask) { - ((WindowableTask) this.userTask).window(collector, coordinator); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java 
b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java new file mode 100644 index 0000000..3583b92 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.functions; + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.config.Config; +import org.apache.samza.task.TaskContext; + + +/** + * This defines the interface function a two-way join functions that takes input messages from two input + * {@link org.apache.samza.operators.MessageStream}s and merge them into a single output joined message in the join output + */ +@InterfaceStability.Unstable +public interface PartialJoinFunction<K, M, OM, RM> extends InitFunction { + + /** + * Method to perform join method on the two input messages + * + * @param m1 message from the first input stream + * @param om message from the second input stream + * @return the joined message in the output stream + */ + RM apply(M m1, OM om); + + /** + * Method to get the key from the input message + * + * @param message the input message from the first strean + * @return the join key in the {@code message} + */ + K getKey(M message); + + /** + * Method to get the key from the input message in the other stream + * + * @param message the input message from the other stream + * @return the join key in the {@code message} + */ + K getOtherKey(OM message); + + /** + * Init method to initialize the context for this {@link PartialJoinFunction}. The default implementation is NO-OP. 
+ * + * @param config the {@link Config} object for this task + * @param context the {@link TaskContext} object for this task + */ + default void init(Config config, TaskContext context) { } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java new file mode 100644 index 0000000..66336f8 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.apache.samza.operators.spec.StreamOperatorSpec; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.system.SystemStream; +import org.apache.samza.task.TaskContext; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + + +/** + * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a + * {@link MessageStreamImpl}. + * + * <p>Both lookup maps are per-instance (non-static) plain {@link HashMap}s and this class performs no + * synchronization of its own, so each {@code OperatorGraph} builds and owns its own set of {@link OperatorImpl}s. + */ +public class OperatorGraph { + + /** + * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map registers all {@link OperatorImpl} in the DAG + * of {@link OperatorImpl} in a {@link org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created + * according to a single instance of {@link OperatorSpec}. + * It memoizes instantiation: a spec that is already present here is never instantiated again. + */ + private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>(); + + /** + * This {@link Map} describes the DAG of {@link OperatorImpl} that are chained together to process the input messages. + * It is keyed by input {@link SystemStream}; each value is the {@link RootOperatorImpl} entry point for that stream, + * populated by {@link #init}. + */ + private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>(); + + /** + * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}. + * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and + * instantiate the corresponding {@link OperatorImpl} chains that take the {@link org.apache.samza.operators.MessageStream} as input.
+ * + * <p>The spec-to-impl map is shared across all input streams, so an {@link OperatorSpec} reachable from more than one + * input stream is backed by a single {@link OperatorImpl}. + * + * @param inputStreams the map of input {@link org.apache.samza.operators.MessageStream}s + * @param config the {@link Config} required to instantiate operators + * @param context the {@link TaskContext} required to instantiate operators + */ + public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) { + inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, this.createOperatorImpls(mstream, config, context))); + } + + /** + * Method to get the corresponding {@link RootOperatorImpl} + * + * @param ss input {@link SystemStream} + * @param <M> the type of input message + * @return the {@link OperatorImpl} that starts processing the input message, or {@code null} if {@code ss} was not + * among the input streams passed to {@link #init}. The stored value is a raw {@link RootOperatorImpl}, so {@code M} + * is not checked against the stream's actual message type. + */ + public <M> OperatorImpl<M, M> get(SystemStream ss) { + return this.operatorGraph.get(ss); + } + + /** + * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl}, + * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node. + * + * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for + * @param <M> the type of messages in the {@code source} {@link MessageStreamImpl} + * @param config the {@link Config} required to instantiate operators + * @param context the {@link TaskContext} required to instantiate operators + * @return root node for the {@link OperatorImpl} DAG + */ + private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, + TaskContext context) { + // since the source message stream might have multiple operator specs registered on it, + // create a new root node as a single point of entry for the DAG. + RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>(); + // create the pipeline/topology starting from the source + source.getRegisteredOperatorSpecs().forEach(registeredOperator -> { + // pass in the source and context s.t.
stateful stream operators can initialize their stores + OperatorImpl<M, ?> operatorImpl = + this.createAndRegisterOperatorImpl(registeredOperator, source, config, context); + rootOperator.registerNextOperator(operatorImpl); + }); + return rootOperator; + } + + /** + * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding + * {@link OperatorImpl}s. + * + * <p>Each {@link OperatorSpec} is instantiated at most once per {@code OperatorGraph}. If it is already in + * {@code operators}, the existing {@link OperatorImpl} is returned as-is: the spec is not re-initialized and its + * downstream operators are not re-linked. Otherwise a new {@link OperatorImpl} is constructed first, then + * {@link OperatorSpec#init} is called on the spec, and every spec registered on the spec's next stream (if non-null) is + * instantiated recursively and registered as a downstream operator of the new one. + * + * <p>NOTE(review): {@code operators} is a plain {@link HashMap} and the static {@link #createOperatorImpl} cannot touch + * it, so in single-threaded use the {@code putIfAbsent} call always returns {@code null} once the {@code containsKey} + * check has failed. + * + * @param operatorSpec the operatorSpec registered with the {@code source} + * @param source the source {@link MessageStreamImpl} + * @param <M> type of input message + * @param config the {@link Config} required to instantiate operators + * @param context the {@link TaskContext} required to instantiate operators + * @return the operator implementation for the operatorSpec + */ + private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec, + MessageStreamImpl<M> source, Config config, TaskContext context) { + if (!operators.containsKey(operatorSpec)) { + OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context); + if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) { + // this is the first time we've added the operatorImpl corresponding to the operatorSpec, + // so traverse and initialize and register the rest of the DAG. + // initialize the corresponding operator function + operatorSpec.init(config, context); + MessageStreamImpl nextStream = operatorSpec.getNextStream(); + if (nextStream != null) { + Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs(); + registeredSpecs.forEach(registeredSpec -> { + OperatorImpl subImpl = this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context); + operatorImpl.registerNextOperator(subImpl); + }); + } + return operatorImpl; + } + } + + // the implementation corresponding to operatorSpec has already been instantiated + // and registered, so we do not need to traverse the DAG further.
+ return operators.get(operatorSpec); + } + + /** + * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}. + * + * <p>Dispatches on the runtime type of {@code operatorSpec}. {@link SinkOperatorImpl} is the only implementation + * constructed without the {@code source} stream. + * + * @param source the source {@link MessageStreamImpl} + * @param <M> type of input message + * @param operatorSpec the immutable {@link OperatorSpec} definition. + * @param config the {@link Config} required to instantiate operators + * @param context the {@link TaskContext} required to instantiate operators + * @return the {@link OperatorImpl} implementation instance + * @throws IllegalArgumentException if {@code operatorSpec} is not a {@link StreamOperatorSpec}, {@link SinkOperatorSpec}, + * {@link WindowOperatorSpec} or {@link PartialJoinOperatorSpec} + */ + private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) { + if (operatorSpec instanceof StreamOperatorSpec) { + StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec; + return new StreamOperatorImpl<>(streamOpSpec, source, config, context); + } else if (operatorSpec instanceof SinkOperatorSpec) { + return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context); + } else if (operatorSpec instanceof WindowOperatorSpec) { + return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context); + } else if (operatorSpec instanceof PartialJoinOperatorSpec) { + return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context); + } + throw new IllegalArgumentException( + String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index c77914e..abb1fa9 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -18,10 +18,7 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; import java.util.HashSet; @@ -31,32 +28,24 @@ import java.util.Set; /** * Abstract base class for all stream operator implementations. */ -public abstract class OperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> { +public abstract class OperatorImpl<M, RM> { - private final Set<OperatorImpl<RM, ? extends MessageEnvelope>> nextOperators = new HashSet<>(); + private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>(); /** * Register the next operator in the chain that this operator should propagate its output to. * @param nextOperator the next operator in the chain. */ - void registerNextOperator(OperatorImpl<RM, ? extends MessageEnvelope> nextOperator) { + void registerNextOperator(OperatorImpl<RM, ?> nextOperator) { nextOperators.add(nextOperator); } /** - * Initialize the initial state for stateful operators. - * - * @param source the source that this {@link OperatorImpl} operator is registered with - * @param context the task context to initialize the operator implementation - */ - public void init(MessageStream<M> source, TaskContext context) {} - - /** * Perform the transformation required for this operator and call the downstream operators. * * Must call {@link #propagateResult} to propage the output to registered downstream operators correctly. 
* - * @param message the input {@link MessageEnvelope} + * @param message the input message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ @@ -67,11 +56,12 @@ public abstract class OperatorImpl<M extends MessageEnvelope, RM extends Message * * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly. * - * @param outputMessage output {@link MessageEnvelope} + * @param outputMessage output message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); } + } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java deleted file mode 100644 index 02095cb..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - - -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; -import org.apache.samza.operators.spec.SinkOperatorSpec; -import org.apache.samza.operators.spec.StreamOperatorSpec; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.task.TaskContext; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a - * {@link MessageStreamImpl} - */ -public class OperatorImpls { - - /** - * Holds the mapping between the {@link OperatorSpec} and {@link OperatorImpl}s instances. - */ - private static final Map<OperatorSpec, OperatorImpl> OPERATOR_IMPLS = new ConcurrentHashMap<>(); - - /** - * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl}, - * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node. 
- * - * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for - * @param <M> the type of {@link MessageEnvelope}s in the {@code source} {@link MessageStream} - * @param context the {@link TaskContext} required to instantiate operators - * @return root node for the {@link OperatorImpl} DAG - */ - public static <M extends MessageEnvelope> RootOperatorImpl createOperatorImpls(MessageStreamImpl<M> source, TaskContext context) { - // since the source message stream might have multiple operator specs registered on it, - // create a new root node as a single point of entry for the DAG. - RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>(); - // create the pipeline/topology starting from the source - source.getRegisteredOperatorSpecs().forEach(registeredOperator -> { - // pass in the source and context s.t. stateful stream operators can initialize their stores - OperatorImpl<M, ? extends MessageEnvelope> operatorImpl = - createAndRegisterOperatorImpl(registeredOperator, source, context); - rootOperator.registerNextOperator(operatorImpl); - }); - return rootOperator; - } - - /** - * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding - * {@link OperatorImpl}s. - * - * @param operatorSpec the operatorSpec registered with the {@code source} - * @param source the source {@link MessageStreamImpl} - * @param context the context of the task - * @return the operator implementation for the operatorSpec - */ - private static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createAndRegisterOperatorImpl(OperatorSpec operatorSpec, - MessageStream source, TaskContext context) { - if (!OPERATOR_IMPLS.containsKey(operatorSpec)) { - OperatorImpl<M, ? 
extends MessageEnvelope> operatorImpl = createOperatorImpl(operatorSpec); - if (OPERATOR_IMPLS.putIfAbsent(operatorSpec, operatorImpl) == null) { - // this is the first time we've added the operatorImpl corresponding to the operatorSpec, - // so traverse and initialize and register the rest of the DAG. - MessageStream<? extends MessageEnvelope> outStream = operatorSpec.getOutputStream(); - Collection<OperatorSpec> registeredSpecs = ((MessageStreamImpl) outStream).getRegisteredOperatorSpecs(); - registeredSpecs.forEach(registeredSpec -> { - OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, outStream, context); - operatorImpl.registerNextOperator(subImpl); - }); - operatorImpl.init(source, context); - return operatorImpl; - } - } - - // the implementation corresponding to operatorSpec has already been instantiated - // and registered, so we do not need to traverse the DAG further. - return OPERATOR_IMPLS.get(operatorSpec); - } - - /** - * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}. - * - * @param operatorSpec the immutable {@link OperatorSpec} definition. - * @param <M> type of input {@link MessageEnvelope} - * @return the {@link OperatorImpl} implementation instance - */ - protected static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createOperatorImpl(OperatorSpec operatorSpec) { - if (operatorSpec instanceof StreamOperatorSpec) { - return new StreamOperatorImpl<>((StreamOperatorSpec<M, ? extends MessageEnvelope>) operatorSpec); - } else if (operatorSpec instanceof SinkOperatorSpec) { - return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec); - } else if (operatorSpec instanceof WindowOperatorSpec) { - return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?, ?, ? 
extends WindowPane>) operatorSpec); - } else if (operatorSpec instanceof PartialJoinOperatorSpec) { - return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec); - } - throw new IllegalArgumentException( - String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index 90569b4..c8515e1 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -18,9 +18,11 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; @@ -28,14 +30,13 @@ import org.apache.samza.task.TaskCoordinator; * Implementation of a {@link PartialJoinOperatorSpec}. This class implements function * that only takes in one input stream among all inputs to the join and generate the join output. 
* - * @param <M> type of {@link MessageEnvelope}s in the input stream - * @param <JM> type of {@link MessageEnvelope}s in the stream to join with - * @param <RM> type of {@link MessageEnvelope}s in the joined stream + * @param <M> type of messages in the input stream + * @param <JM> type of messages in the stream to join with + * @param <RM> type of messages in the joined stream */ -class PartialJoinOperatorImpl<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> - extends OperatorImpl<M, RM> { +class PartialJoinOperatorImpl<M, K, JM, RM> extends OperatorImpl<M, RM> { - PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp) { + PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp, MessageStreamImpl<M> source, Config config, TaskContext context) { // TODO: implement PartialJoinOperatorImpl constructor } http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java index 7132b86..4b30a5d 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java @@ -18,16 +18,15 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; /** - * A no-op operator implementation that forwards incoming {@link MessageEnvelope}s to all of its subscribers. - * @param <M> type of incoming {@link MessageEnvelope}s + * A no-op operator implementation that forwards incoming messages to all of its subscribers. 
+ * @param <M> type of incoming messages */ -final class RootOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, M> { +final class RootOperatorImpl<M> extends OperatorImpl<M, M> { @Override public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java new file mode 100644 index 0000000..2bb362c --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.impl; + +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; + + +/** + * Default implementation class of a {@link WindowOperatorSpec} for a session window. + * + * <p>NOTE(review): this is currently a stub. The constructor keeps only {@code windowSpec} ({@code source}, + * {@code config} and {@code context} are ignored), {@code windowSpec} is stored but never read, {@link #onNext} and + * {@link #onTimer} have empty bodies, and nothing here calls {@code propagateResult}, so no {@link WindowPane}s are + * emitted downstream yet. + * + * @param <M> the type of input message + * @param <RK> the type of window key + * @param <WV> the type of window state + */ +class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> { + + private final WindowOperatorSpec<M, RK, WV> windowSpec; + + SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) { + this.windowSpec = windowSpec; + } + + @Override + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + } + + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + // This is to periodically check the timeout triggers to get the list of window states to be updated + } +}