http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
index 705cab7..b186cdb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
@@ -46,14 +46,14 @@ public interface TriggerImpl<M, WK> {
    * @param message the incoming message
    * @param context the {@link TriggerScheduler} to schedule and cancel callbacks
    */
-  public void onMessage(M message, TriggerScheduler<WK> context);
+  void onMessage(M message, TriggerScheduler<WK> context);
 
   /**
    * Returns {@code true} if the current state of the trigger indicates that its condition
    * is satisfied and it is ready to fire.
    * @return if this trigger should fire.
    */
-  public boolean shouldFire();
+  boolean shouldFire();
 
   /**
    * Invoked when the execution of this {@link TriggerImpl} is canceled by an up-stream {@link TriggerImpl}.
@@ -61,6 +61,6 @@ public interface TriggerImpl<M, WK> {
    * No calls to {@link #onMessage(Object, TriggerScheduler)} or {@link #shouldFire()} will be invoked
    * after this invocation.
    */
-  public void cancel();
+  void cancel();
 
 }
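The hunk above only drops the redundant public modifiers, but it is a convenient place to recall the TriggerImpl contract. The following is a minimal, hypothetical sketch of a count-based implementation written against the interface as it reads after this change; the class name, fields, and threshold parameter are illustrative and not part of this commit.

package org.apache.samza.operators.triggers;

/**
 * Illustrative count-based TriggerImpl sketch (not part of this patch).
 */
public class CountBasedTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
  private final long threshold;
  private long count = 0;
  private boolean cancelled = false;

  public CountBasedTriggerImpl(long threshold) {
    this.threshold = threshold;
  }

  @Override
  public void onMessage(M message, TriggerScheduler<WK> context) {
    // A pure count trigger only tallies messages; it schedules no callbacks.
    count++;
  }

  @Override
  public boolean shouldFire() {
    // Ready to fire once the message count reaches the configured threshold.
    return !cancelled && count >= threshold;
  }

  @Override
  public void cancel() {
    // No scheduled callbacks to clean up in this sketch; just stop firing.
    cancelled = true;
  }
}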
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java index 68962ce..5043977 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java @@ -29,7 +29,8 @@ import org.apache.samza.config.StreamConfig; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.execution.ExecutionPlanner; import org.apache.samza.execution.StreamManager; -import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.OperatorSpecGraph; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmins; import org.slf4j.Logger; @@ -44,7 +45,7 @@ import java.util.Set; /** - * Defines common, core behavior for implementations of the {@link ApplicationRunner} API + * Defines common, core behavior for implementations of the {@link ApplicationRunner} API. */ public abstract class AbstractApplicationRunner extends ApplicationRunner { private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class); @@ -52,8 +53,14 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { private final StreamManager streamManager; private final SystemAdmins systemAdmins; + /** + * The {@link ApplicationRunner} is supposed to run a single {@link StreamApplication} instance in the full life-cycle + */ + protected final StreamGraphSpec graphSpec; + public AbstractApplicationRunner(Config config) { super(config); + this.graphSpec = new StreamGraphSpec(this, config); this.systemAdmins = new SystemAdmins(config); this.streamManager = new StreamManager(systemAdmins); } @@ -126,23 +133,23 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { /* package private */ ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception { // build stream graph - StreamGraphImpl streamGraph = new StreamGraphImpl(this, config); - app.init(streamGraph, config); + app.init(graphSpec, config); + OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); // create the physical execution plan Map<String, String> cfg = new HashMap<>(config); if (StringUtils.isNoneEmpty(runId)) { cfg.put(ApplicationConfig.APP_RUN_ID, runId); } - Set<StreamSpec> inputStreams = new HashSet<>(streamGraph.getInputOperators().keySet()); - inputStreams.removeAll(streamGraph.getOutputStreams().keySet()); + Set<StreamSpec> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet()); + inputStreams.removeAll(specGraph.getOutputStreams().keySet()); ApplicationMode mode = inputStreams.stream().allMatch(StreamSpec::isBounded) ? 
ApplicationMode.BATCH : ApplicationMode.STREAM; cfg.put(ApplicationConfig.APP_MODE, mode.name()); ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager); - return planner.plan(streamGraph); + return planner.plan(specGraph); } /* package private for testing */ http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 1284060..d64e57a 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -42,6 +42,7 @@ import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.DistributedLockWithState; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.processor.StreamProcessorLifecycleListener; import org.apache.samza.system.StreamSpec; @@ -139,7 +140,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { LOG.info("LocalApplicationRunner will run " + taskName); LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); - StreamProcessor processor = createStreamProcessor(jobConfig, null, listener); + StreamProcessor processor = createStreamProcessor(jobConfig, listener); numProcessorsToStart.set(1); listener.setProcessor(processor); @@ -169,7 +170,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { plan.getJobConfigs().forEach(jobConfig -> { LOG.debug("Starting job {} StreamProcessor with config {}", jobConfig.getName(), jobConfig); LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener(); - StreamProcessor processor = createStreamProcessor(jobConfig, app, listener); + StreamProcessor processor = createStreamProcessor(jobConfig, graphSpec, listener); listener.setProcessor(processor); processors.add(processor); }); @@ -284,15 +285,32 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { /** * Create {@link StreamProcessor} based on {@link StreamApplication} and the config * @param config config - * @param app {@link StreamApplication} * @return {@link StreamProcessor]} */ /* package private */ StreamProcessor createStreamProcessor( Config config, - StreamApplication app, StreamProcessorLifecycleListener listener) { - Object taskFactory = TaskFactoryUtil.createTaskFactory(config, app, new LocalApplicationRunner(config)); + Object taskFactory = TaskFactoryUtil.createTaskFactory(config); + return getStreamProcessorInstance(config, taskFactory, listener); + } + + /** + * Create {@link StreamProcessor} based on {@link StreamApplication} and the config + * @param config config + * @param graphBuilder {@link StreamGraphSpec} + * @return {@link StreamProcessor]} + */ + /* package private */ + StreamProcessor createStreamProcessor( + Config config, + StreamGraphSpec graphBuilder, + StreamProcessorLifecycleListener listener) { + Object taskFactory = TaskFactoryUtil.createTaskFactory(graphBuilder.getOperatorSpecGraph(), graphBuilder.getContextManager()); + return getStreamProcessorInstance(config, taskFactory, 
listener); + } + + private StreamProcessor getStreamProcessorInstance(Config config, Object taskFactory, StreamProcessorLifecycleListener listener) { if (taskFactory instanceof StreamTaskFactory) { return new StreamProcessor( config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 5831910..7751241 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -70,8 +70,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication streamApp) { - super.run(streamApp); - Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); + Object taskFactory = getTaskFactory(streamApp); container = SamzaContainer$.MODULE$.apply( containerId, @@ -106,6 +105,14 @@ public class LocalContainerRunner extends AbstractApplicationRunner { } } + private Object getTaskFactory(StreamApplication streamApp) { + if (streamApp != null) { + streamApp.init(graphSpec, config); + return TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager()); + } + return TaskFactoryUtil.createTaskFactory(config); + } + @Override public void kill(StreamApplication streamApp) { // Ultimately this class probably won't end up extending ApplicationRunner, so this will be deleted http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index e4b3c62..fdd134f 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -18,16 +18,14 @@ */ package org.apache.samza.task; -import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.system.EndOfStreamMessage; import org.apache.samza.system.MessageType; import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.impl.InputOperatorImpl; import org.apache.samza.operators.impl.OperatorImplGraph; -import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.system.WatermarkMessage; @@ -39,41 +37,45 @@ import org.slf4j.LoggerFactory; /** * A {@link StreamTask} implementation that brings all the operator API implementation components together and - * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}. + * feeds the input messages into the user-defined transformation chains in {@link OperatorSpecGraph}. 
*/ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask { private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class); - private final StreamApplication streamApplication; - private final ApplicationRunner runner; + private final OperatorSpecGraph specGraph; + // TODO: to be replaced by proper scope of shared context factory in SAMZA-1714 + private final ContextManager contextManager; private final Clock clock; private OperatorImplGraph operatorImplGraph; - private ContextManager contextManager; /** - * Constructs an adaptor task to run the user-implemented {@link StreamApplication}. - * @param streamApplication the user-implemented {@link StreamApplication} that creates the logical DAG - * @param runner the {@link ApplicationRunner} to get the mapping between logical and physical streams + * Constructs an adaptor task to run the user-implemented {@link OperatorSpecGraph}. + * @param specGraph the serialized version of user-implemented {@link OperatorSpecGraph} + * that includes the logical DAG + * @param contextManager the {@link ContextManager} used to set up the shared context used by operators in the DAG * @param clock the {@link Clock} to use for time-keeping */ - public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner runner, Clock clock) { - this.streamApplication = streamApplication; - this.runner = runner; + public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager, Clock clock) { + this.specGraph = specGraph.clone(); + this.contextManager = contextManager; this.clock = clock; } - public StreamOperatorTask(StreamApplication application, ApplicationRunner runner) { - this(application, runner, SystemClock.instance()); + public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager) { + this(specGraph, contextManager, SystemClock.instance()); } /** * Initializes this task during startup. * <p> - * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets - * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs, - * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. It then uses - * the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical DAG. + * Implementation: Initializes the runtime {@link OperatorImplGraph} according to user-defined {@link OperatorSpecGraph}. + * The {@link org.apache.samza.operators.StreamGraphSpec} sets the input and output streams and the task-wide + * context manager using the {@link org.apache.samza.operators.StreamGraph} APIs, + * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. After the + * {@link org.apache.samza.operators.StreamGraphSpec} is initialized once by the application, it then creates + * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph} + * corresponding to the logical DAG. 
* * @param config allows accessing of fields in the configuration files that this StreamTask is specified in * @param context allows initializing and accessing contextual data of this StreamTask @@ -81,18 +83,14 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT */ @Override public final void init(Config config, TaskContext context) throws Exception { - StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - // initialize the user-implemented stream application. - this.streamApplication.init(streamGraph, config); - // get the user-implemented context manager and initialize it - this.contextManager = streamGraph.getContextManager(); + // get the user-implemented per task context manager and initialize it if (this.contextManager != null) { this.contextManager.init(config, context); } // create the operator impl DAG corresponding to the logical operator spec DAG - this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock); + this.operatorImplGraph = new OperatorImplGraph(specGraph, config, context, clock); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java index 2a894ae..38ae854 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java @@ -24,7 +24,8 @@ import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.TaskConfig; -import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.operators.ContextManager; +import org.apache.samza.operators.OperatorSpecGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,19 +42,28 @@ public class TaskFactoryUtil { private static final Logger log = LoggerFactory.getLogger(TaskFactoryUtil.class); /** - * This method creates a task factory class based on the configuration and {@link StreamApplication} + * This method creates a task factory class based on the {@link StreamApplication} + * + * @param specGraph the {@link OperatorSpecGraph} + * @param contextManager the {@link ContextManager} to set up initial context for {@code specGraph} + * @return a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} + */ + public static Object createTaskFactory(OperatorSpecGraph specGraph, ContextManager contextManager) { + return createStreamOperatorTaskFactory(specGraph, contextManager); + } + + /** + * This method creates a task factory class based on the configuration * * @param config the {@link Config} for this job - * @param streamApp the {@link StreamApplication} - * @param runner the {@link ApplicationRunner} to run this job * @return a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} */ - public static Object createTaskFactory(Config config, StreamApplication streamApp, ApplicationRunner runner) { - return (streamApp != null) ? 
createStreamOperatorTaskFactory(streamApp, runner) : fromTaskClassConfig(config); + public static Object createTaskFactory(Config config) { + return fromTaskClassConfig(config); } - private static StreamTaskFactory createStreamOperatorTaskFactory(StreamApplication streamApp, ApplicationRunner runner) { - return () -> new StreamOperatorTask(streamApp, runner); + private static StreamTaskFactory createStreamOperatorTaskFactory(OperatorSpecGraph specGraph, ContextManager contextManager) { + return () -> new StreamOperatorTask(specGraph, contextManager); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 61e8c77..64ee7f3 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -28,7 +28,6 @@ import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.job.model.JobModel import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.operators.functions.TimerFunction import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system._ import org.apache.samza.table.TableManager http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index e5ce3c8..029b375 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -27,10 +27,12 @@ import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.stream.CoordinatorStreamManager import org.apache.samza.job.{StreamJob, StreamJobFactory} import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter} +import org.apache.samza.operators.StreamGraphSpec import org.apache.samza.runtime.LocalContainerRunner import org.apache.samza.storage.ChangelogStreamManager import org.apache.samza.task.TaskFactoryUtil import org.apache.samza.util.Logging + import scala.collection.JavaConversions._ import scala.collection.mutable @@ -71,7 +73,14 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val jmxServer = new JmxServer val streamApp = TaskFactoryUtil.createStreamApplication(config) val appRunner = new LocalContainerRunner(jobModel, "0") - val taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, appRunner) + + val taskFactory = if (streamApp != null) { + val graphSpec = new StreamGraphSpec(appRunner, config) + streamApp.init(graphSpec, config) + TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager) + } else { + TaskFactoryUtil.createTaskFactory(config) + } // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job. 
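The ThreadJobFactory hunk above, the LocalContainerRunner.getTaskFactory method, and LocalApplicationRunner.createStreamProcessor all follow the same pattern, which the hypothetical Java sketch below restates in one place before the Scala diff continues: a StreamApplication populates a mutable StreamGraphSpec, the runner freezes it into an immutable OperatorSpecGraph, and TaskFactoryUtil turns that graph plus its ContextManager into a task factory, falling back to the task class named in configuration when no high-level application is present. The class and method names in the sketch are mine, not the commit's.

package org.apache.samza.example;

import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.task.TaskFactoryUtil;

/**
 * Illustrative-only sketch of the task-factory wiring introduced by this patch.
 */
public class TaskFactoryWiringSketch {

  /** Returns a StreamTaskFactory or AsyncStreamTaskFactory, mirroring the runner changes above. */
  public static Object buildTaskFactory(StreamApplication app, ApplicationRunner runner, Config config) {
    if (app != null) {
      // High-level API: the application describes its DAG on the mutable graph spec ...
      StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
      app.init(graphSpec, config);
      // ... then the immutable OperatorSpecGraph plus the ContextManager are enough to
      // build a factory that produces StreamOperatorTask instances.
      return TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager());
    }
    // Low-level API: fall back to the task class named in configuration.
    return TaskFactoryUtil.createTaskFactory(config);
  }
}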
config.getTaskOpts match { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java deleted file mode 100644 index 7061732..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.util.CommandLine; - - -/** - * Example implementation of a task that splits its input into multiple output streams. 
- */ -public class BroadcastExample implements StreamApplication { - - @Override - public void init(StreamGraph graph, Config config) { - graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class))); - - MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("inputStream"); - OutputStream<KV<String, PageViewEvent>> outputStream1 = graph.getOutputStream("outputStream1"); - OutputStream<KV<String, PageViewEvent>> outputStream2 = graph.getOutputStream("outputStream2"); - OutputStream<KV<String, PageViewEvent>> outputStream3 = graph.getOutputStream("outputStream3"); - - inputStream.filter(m -> m.key.equals("key1")).sendTo(outputStream1); - inputStream.filter(m -> m.key.equals("key2")).sendTo(outputStream2); - inputStream.filter(m -> m.key.equals("key3")).sendTo(outputStream3); - } - - // local execution mode - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner localRunner = new LocalApplicationRunner(config); - localRunner.run(new BroadcastExample()); - } - - class PageViewEvent { - String key; - long timestamp; - - public PageViewEvent(String key, long timestamp) { - this.key = key; - this.timestamp = timestamp; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java deleted file mode 100644 index f9e0a3a..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.example; - - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.task.TaskContext; -import org.apache.samza.util.CommandLine; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; - - -/** - * Example code using {@link KeyValueStore} to implement event-time window - */ -public class KeyValueStoreExample implements StreamApplication { - - @Override public void init(StreamGraph graph, Config config) { - MessageStream<PageViewEvent> pageViewEvents = - graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class)); - OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = - graph.getOutputStream("pageViewEventPerMember", - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class))); - - pageViewEvents - .partitionBy(pve -> pve.memberId, pve -> pve, - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy") - .map(KV::getValue) - .flatMap(new MyStatsCounter()) - .map(stats -> KV.of(stats.memberId, stats)) - .sendTo(pageViewEventPerMember); - } - - // local execution mode - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner localRunner = new LocalApplicationRunner(config); - localRunner.run(new KeyValueStoreExample()); - } - - class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> { - private final int timeoutMs = 10 * 60 * 1000; - - KeyValueStore<String, StatsWindowState> statsStore; - - class StatsWindowState { - int lastCount = 0; - long timeAtLastOutput = 0; - int newCount = 0; - } - - @Override - public Collection<StatsOutput> apply(PageViewEvent message) { - List<StatsOutput> outputStats = new ArrayList<>(); - long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.timestamp) / 5) * 5; - String wndKey = String.format("%s-%d", message.memberId, wndTimestamp); - StatsWindowState curState = this.statsStore.get(wndKey); - curState.newCount++; - long curTimeMs = System.currentTimeMillis(); - if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) { - curState.timeAtLastOutput = curTimeMs; - curState.lastCount += curState.newCount; - curState.newCount = 0; - outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount)); - } - // update counter w/o generating output - this.statsStore.put(wndKey, curState); - return outputStats; - } - - @Override - public void init(Config config, TaskContext context) { - this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store"); - } - } - - class PageViewEvent { - String pageId; - String memberId; - long timestamp; - - PageViewEvent(String pageId, String memberId, long timestamp) { - this.pageId = pageId; - this.memberId = memberId; - this.timestamp = timestamp; - } - } - 
- class StatsOutput { - private String memberId; - private long timestamp; - private Integer count; - - StatsOutput(String key, long timestamp, Integer count) { - this.memberId = key; - this.timestamp = timestamp; - this.count = count; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/MergeExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java deleted file mode 100644 index 4702c9a..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import com.google.common.collect.ImmutableList; -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.IntegerSerde; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.util.CommandLine; - -public class MergeExample implements StreamApplication { - - @Override - public void init(StreamGraph graph, Config config) { - graph.setDefaultSerde(new StringSerde()); - - MessageStream<String> inputStream1 = graph.getInputStream("inputStream1"); - MessageStream<String> inputStream2 = graph.getInputStream("inputStream2"); - MessageStream<String> inputStream3 = graph.getInputStream("inputStream3"); - OutputStream<KV<Integer, String>> outputStream = - graph.getOutputStream("outputStream", KVSerde.of(new IntegerSerde(), new StringSerde())); - - MessageStream - .mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3)) - .map(m -> KV.of(m.hashCode(), m)) - .sendTo(outputStream); - } - - // local execution mode - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner localRunner = new LocalApplicationRunner(config); - localRunner.run(new MergeExample()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java 
b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java deleted file mode 100644 index ff785d9..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.functions.JoinFunction; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.util.CommandLine; - -import java.time.Duration; - -/** - * Simple 2-way stream-to-stream join example - */ -public class OrderShipmentJoinExample implements StreamApplication { - - @Override - public void init(StreamGraph graph, Config config) { - MessageStream<OrderRecord> orders = - graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class)); - MessageStream<ShipmentRecord> shipments = - graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class)); - OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders = - graph.getOutputStream("fulfilledOrders", - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class))); - - orders - .join(shipments, new MyJoinFunction(), - new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), - Duration.ofMinutes(1), "join") - .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder)) - .sendTo(fulfilledOrders); - } - - // local execution mode - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner localRunner = new LocalApplicationRunner(config); - localRunner.run(new OrderShipmentJoinExample()); - } - - class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> { - @Override - public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) { - return new FulfilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs); - } - - @Override - public String getFirstKey(OrderRecord message) { - return message.orderId; - } - - @Override - public String getSecondKey(ShipmentRecord message) { - return message.orderId; - } - } - - class OrderRecord { - String orderId; - long orderTimeMs; - - 
OrderRecord(String orderId, long timeMs) { - this.orderId = orderId; - this.orderTimeMs = timeMs; - } - } - - class ShipmentRecord { - String orderId; - long shipTimeMs; - - ShipmentRecord(String orderId, long timeMs) { - this.orderId = orderId; - this.shipTimeMs = timeMs; - } - } - - class FulfilledOrderRecord { - String orderId; - long orderTimeMs; - long shipTimeMs; - - FulfilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) { - this.orderId = orderId; - this.orderTimeMs = orderTimeMs; - this.shipTimeMs = shipTimeMs; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java deleted file mode 100644 index 846b9f8..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.example; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.AccumulationMode; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.util.CommandLine; - -import java.time.Duration; -import java.util.function.Supplier; - - -/** - * Example code to implement window-based counter - */ -public class PageViewCounterExample implements StreamApplication { - - @Override public void init(StreamGraph graph, Config config) { - MessageStream<PageViewEvent> pageViewEvents = - graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class)); - OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = - graph.getOutputStream("pageViewEventPerMemberStream", - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class))); - - Supplier<Integer> initialValue = () -> 0; - FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1; - pageViewEvents - .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null) - .setEarlyTrigger(Triggers.repeat(Triggers.count(5))) - .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow") - .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane))) - .sendTo(pageViewEventPerMemberStream); - } - - // local execution mode - public static void main(String[] args) { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner localRunner = new LocalApplicationRunner(config); - localRunner.run(new PageViewCounterExample()); - } - - class PageViewEvent { - String pageId; - String memberId; - long timestamp; - - PageViewEvent(String pageId, String memberId, long timestamp) { - this.pageId = pageId; - this.memberId = memberId; - this.timestamp = timestamp; - } - } - - class PageViewCount { - String memberId; - long timestamp; - int count; - - PageViewCount(WindowPane<String, Integer> m) { - this.memberId = m.getKey().getKey(); - this.timestamp = Long.valueOf(m.getKey().getPaneId()); - this.count = m.getMessage(); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java deleted file mode 100644 index c9bcc45..0000000 --- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.example; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.StringSerde; -import org.apache.samza.util.CommandLine; - -import java.time.Duration; - - -/** - * Example {@link StreamApplication} code to test the API methods with re-partition operator - */ -public class RepartitionExample implements StreamApplication { - - @Override public void init(StreamGraph graph, Config config) { - MessageStream<PageViewEvent> pageViewEvents = - graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)); - OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = - graph.getOutputStream("pageViewEventPerMember", - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class))); - - pageViewEvents - .partitionBy(pve -> pve.memberId, pve -> pve, - KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy") - .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), - "window") - .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane))) - .sendTo(pageViewEventPerMember); - } - - // local execution mode - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner localRunner = new LocalApplicationRunner(config); - localRunner.run(new RepartitionExample()); - } - - class PageViewEvent { - String pageId; - String memberId; - long timestamp; - - PageViewEvent(String pageId, String memberId, long timestamp) { - this.pageId = pageId; - this.memberId = memberId; - this.timestamp = timestamp; - } - } - - class MyStreamOutput { - String memberId; - long timestamp; - int count; - - MyStreamOutput(WindowPane<String, Integer> m) { - this.memberId = m.getKey().getKey(); - this.timestamp = Long.valueOf(m.getKey().getPaneId()); - this.count = m.getMessage(); - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/WindowExample.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java deleted file mode 100644 index 3c37c31..0000000 --- 
a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.example; - -import org.apache.samza.application.StreamApplication; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraph; -import org.apache.samza.operators.functions.FoldLeftFunction; -import org.apache.samza.operators.triggers.Triggers; -import org.apache.samza.operators.windows.WindowPane; -import org.apache.samza.operators.windows.Windows; -import org.apache.samza.runtime.LocalApplicationRunner; -import org.apache.samza.serializers.IntegerSerde; -import org.apache.samza.serializers.JsonSerdeV2; -import org.apache.samza.util.CommandLine; - -import java.time.Duration; -import java.util.function.Supplier; - - -/** - * Example implementation of a simple user-defined task w/ a window operator. - * - */ -public class WindowExample implements StreamApplication { - - @Override - public void init(StreamGraph graph, Config config) { - Supplier<Integer> initialValue = () -> 0; - FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1; - MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>()); - OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde()); - - // create a tumbling window that outputs the number of message collected every 10 minutes. - // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive - // for 1 minute. 
- inputStream - .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde()) - .setLateTrigger(Triggers.any(Triggers.count(30000), - Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window") - .map(WindowPane::getMessage) - .sendTo(outputStream); - } - - // local execution mode - public static void main(String[] args) throws Exception { - CommandLine cmdLine = new CommandLine(); - Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); - LocalApplicationRunner localRunner = new LocalApplicationRunner(config); - localRunner.run(new WindowExample()); - } - - class PageViewEvent { - String key; - long timestamp; - - public PageViewEvent(String key, long timestamp) { - this.key = key; - this.timestamp = timestamp; - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 664f3b1..83fe5ad 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -34,8 +34,8 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; @@ -97,24 +97,24 @@ public class TestExecutionPlanner { }; } - private StreamGraphImpl createSimpleGraph() { + private StreamGraphSpec createSimpleGraph() { /** * a simple graph of partitionBy and map * * input1 -> partitionBy -> map -> output1 * */ - StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1"); - OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); + StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); + MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input1"); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); input1 .partitionBy(m -> m.key, m -> m.value, "p1") .map(kv -> kv) .sendTo(output1); - return streamGraph; + return graphSpec; } - private StreamGraphImpl createStreamGraphWithJoin() { + private StreamGraphSpec createStreamGraphWithJoin() { /** * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value. 
@@ -127,76 +127,79 @@ public class TestExecutionPlanner { * */ - StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); MessageStream<KV<Object, Object>> messageStream1 = - streamGraph.<KV<Object, Object>>getInputStream("input1") + graphSpec.<KV<Object, Object>>getInputStream("input1") .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = - streamGraph.<KV<Object, Object>>getInputStream("input2") + graphSpec.<KV<Object, Object>>getInputStream("input2") .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = - streamGraph.<KV<Object, Object>>getInputStream("input3") + graphSpec.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); - OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); - OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); + OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream("output2"); messageStream1 - .join(messageStream2, mock(JoinFunction.class), + .join(messageStream2, + (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") .sendTo(output1); messageStream3 - .join(messageStream2, mock(JoinFunction.class), + .join(messageStream2, + (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2") .sendTo(output2); - return streamGraph; + return graphSpec; } - private StreamGraphImpl createStreamGraphWithJoinAndWindow() { + private StreamGraphSpec createStreamGraphWithJoinAndWindow() { - StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); MessageStream<KV<Object, Object>> messageStream1 = - streamGraph.<KV<Object, Object>>getInputStream("input1") + graphSpec.<KV<Object, Object>>getInputStream("input1") .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = - streamGraph.<KV<Object, Object>>getInputStream("input2") + graphSpec.<KV<Object, Object>>getInputStream("input2") .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = - streamGraph.<KV<Object, Object>>getInputStream("input3") + graphSpec.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); - OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); - OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2"); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); + OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream("output2"); messageStream1.map(m -> m) .filter(m->true) - .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8), - mock(Serde.class), mock(Serde.class)), "w1"); + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1"); messageStream2.map(m -> m) .filter(m->true) - .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16), - 
mock(Serde.class), mock(Serde.class)), "w2"); + .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2"); messageStream1 - .join(messageStream2, mock(JoinFunction.class), + .join(messageStream2, + (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1") .sendTo(output1); messageStream3 - .join(messageStream2, mock(JoinFunction.class), + .join(messageStream2, + (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2") .sendTo(output2); messageStream3 - .join(messageStream2, mock(JoinFunction.class), + .join(messageStream2, + (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3") .sendTo(output2); - return streamGraph; + return graphSpec; } @Before @@ -252,9 +255,9 @@ public class TestExecutionPlanner { @Test public void testCreateProcessorGraph() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraphImpl streamGraph = createStreamGraphWithJoin(); + StreamGraphSpec graphSpec = createStreamGraphWithJoin(); - JobGraph jobGraph = planner.createJobGraph(streamGraph); + JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); assertTrue(jobGraph.getSources().size() == 3); assertTrue(jobGraph.getSinks().size() == 2); assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy @@ -263,8 +266,8 @@ public class TestExecutionPlanner { @Test public void testFetchExistingStreamPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraphImpl streamGraph = createStreamGraphWithJoin(); - JobGraph jobGraph = planner.createJobGraph(streamGraph); + StreamGraphSpec graphSpec = createStreamGraphWithJoin(); + JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager); assertTrue(jobGraph.getOrCreateStreamEdge(input1).getPartitionCount() == 64); @@ -281,11 +284,11 @@ public class TestExecutionPlanner { @Test public void testCalculateJoinInputPartitions() { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraphImpl streamGraph = createStreamGraphWithJoin(); - JobGraph jobGraph = planner.createJobGraph(streamGraph); + StreamGraphSpec graphSpec = createStreamGraphWithJoin(); + JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager); - ExecutionPlanner.calculateJoinInputPartitions(streamGraph, jobGraph); + ExecutionPlanner.calculateJoinInputPartitions(jobGraph); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { @@ -300,9 +303,9 @@ public class TestExecutionPlanner { Config cfg = new MapConfig(map); ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); - StreamGraphImpl streamGraph = createSimpleGraph(); - JobGraph jobGraph = planner.createJobGraph(streamGraph); - planner.calculatePartitions(streamGraph, jobGraph); + StreamGraphSpec graphSpec = createSimpleGraph(); + JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph()); + 
planner.calculatePartitions(jobGraph); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { @@ -317,8 +320,8 @@ public class TestExecutionPlanner { Config cfg = new MapConfig(map); ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); - StreamGraphImpl streamGraph = createStreamGraphWithJoin(); - ExecutionPlan plan = planner.plan(streamGraph); + StreamGraphSpec graphSpec = createStreamGraphWithJoin(); + ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); List<JobConfig> jobConfigs = plan.getJobConfigs(); for (JobConfig config : jobConfigs) { System.out.println(config); @@ -332,8 +335,8 @@ public class TestExecutionPlanner { Config cfg = new MapConfig(map); ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); - StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow(); - ExecutionPlan plan = planner.plan(streamGraph); + StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow(); + ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); @@ -349,8 +352,8 @@ public class TestExecutionPlanner { Config cfg = new MapConfig(map); ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); - StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow(); - ExecutionPlan plan = planner.plan(streamGraph); + StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow(); + ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); @@ -366,8 +369,8 @@ public class TestExecutionPlanner { Config cfg = new MapConfig(map); ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); - StreamGraphImpl streamGraph = createSimpleGraph(); - ExecutionPlan plan = planner.plan(streamGraph); + StreamGraphSpec graphSpec = createSimpleGraph(); + ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS())); @@ -381,8 +384,8 @@ public class TestExecutionPlanner { Config cfg = new MapConfig(map); ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager); - StreamGraphImpl streamGraph = createSimpleGraph(); - ExecutionPlan plan = planner.plan(streamGraph); + StreamGraphSpec graphSpec = createSimpleGraph(); + ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); List<JobConfig> jobConfigs = plan.getJobConfigs(); assertEquals(1, jobConfigs.size()); assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS())); @@ -391,8 +394,8 @@ public class TestExecutionPlanner { @Test public void testCalculateIntStreamPartitions() throws Exception { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraphImpl streamGraph = createSimpleGraph(); - JobGraph jobGraph = (JobGraph) planner.plan(streamGraph); + StreamGraphSpec graphSpec = createSimpleGraph(); + JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { @@ -424,12 +427,12 @@ public class TestExecutionPlanner { int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS; ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + 
StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); - MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input4"); - OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1"); + MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input4"); + OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1"); input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1); - JobGraph jobGraph = (JobGraph) planner.plan(streamGraph); + JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph()); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java index bf131ce..359c422 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java @@ -57,16 +57,16 @@ public class TestJobGraph { * 2 9 10 */ private void createGraph1() { - graph1 = new JobGraph(null); + graph1 = new JobGraph(null, null); - JobNode n2 = graph1.getOrCreateJobNode("2", "1", null); - JobNode n3 = graph1.getOrCreateJobNode("3", "1", null); - JobNode n5 = graph1.getOrCreateJobNode("5", "1", null); - JobNode n7 = graph1.getOrCreateJobNode("7", "1", null); - JobNode n8 = graph1.getOrCreateJobNode("8", "1", null); - JobNode n9 = graph1.getOrCreateJobNode("9", "1", null); - JobNode n10 = graph1.getOrCreateJobNode("10", "1", null); - JobNode n11 = graph1.getOrCreateJobNode("11", "1", null); + JobNode n2 = graph1.getOrCreateJobNode("2", "1"); + JobNode n3 = graph1.getOrCreateJobNode("3", "1"); + JobNode n5 = graph1.getOrCreateJobNode("5", "1"); + JobNode n7 = graph1.getOrCreateJobNode("7", "1"); + JobNode n8 = graph1.getOrCreateJobNode("8", "1"); + JobNode n9 = graph1.getOrCreateJobNode("9", "1"); + JobNode n10 = graph1.getOrCreateJobNode("10", "1"); + JobNode n11 = graph1.getOrCreateJobNode("11", "1"); graph1.addSource(genStream(), n5); graph1.addSource(genStream(), n7); @@ -90,15 +90,15 @@ public class TestJobGraph { * |<---6 <--| <> */ private void createGraph2() { - graph2 = new JobGraph(null); + graph2 = new JobGraph(null, null); - JobNode n1 = graph2.getOrCreateJobNode("1", "1", null); - JobNode n2 = graph2.getOrCreateJobNode("2", "1", null); - JobNode n3 = graph2.getOrCreateJobNode("3", "1", null); - JobNode n4 = graph2.getOrCreateJobNode("4", "1", null); - JobNode n5 = graph2.getOrCreateJobNode("5", "1", null); - JobNode n6 = graph2.getOrCreateJobNode("6", "1", null); - JobNode n7 = graph2.getOrCreateJobNode("7", "1", null); + JobNode n1 = graph2.getOrCreateJobNode("1", "1"); + JobNode n2 = graph2.getOrCreateJobNode("2", "1"); + JobNode n3 = graph2.getOrCreateJobNode("3", "1"); + JobNode n4 = graph2.getOrCreateJobNode("4", "1"); + JobNode n5 = graph2.getOrCreateJobNode("5", "1"); + JobNode n6 = graph2.getOrCreateJobNode("6", "1"); + JobNode n7 = graph2.getOrCreateJobNode("7", "1"); graph2.addSource(genStream(), n1); graph2.addIntermediateStream(genStream(), n1, n2); @@ -117,10 +117,10 @@ public class TestJobGraph { * 1<->1 -> 2<->2 */ private void createGraph3() { - graph3 = new JobGraph(null); + graph3 = new JobGraph(null, 
null); - JobNode n1 = graph3.getOrCreateJobNode("1", "1", null); - JobNode n2 = graph3.getOrCreateJobNode("2", "1", null); + JobNode n1 = graph3.getOrCreateJobNode("1", "1"); + JobNode n2 = graph3.getOrCreateJobNode("2", "1"); graph3.addSource(genStream(), n1); graph3.addIntermediateStream(genStream(), n1, n1); @@ -133,9 +133,9 @@ public class TestJobGraph { * 1<->1 */ private void createGraph4() { - graph4 = new JobGraph(null); + graph4 = new JobGraph(null, null); - JobNode n1 = graph4.getOrCreateJobNode("1", "1", null); + JobNode n1 = graph4.getOrCreateJobNode("1", "1"); graph4.addSource(genStream(), n1); graph4.addIntermediateStream(genStream(), n1, n1); @@ -151,7 +151,7 @@ public class TestJobGraph { @Test public void testAddSource() { - JobGraph graph = new JobGraph(null); + JobGraph graph = new JobGraph(null, null); /** * s1 -> 1 @@ -160,9 +160,9 @@ public class TestJobGraph { * s3 -> 2 * |-> 3 */ - JobNode n1 = graph.getOrCreateJobNode("1", "1", null); - JobNode n2 = graph.getOrCreateJobNode("2", "1", null); - JobNode n3 = graph.getOrCreateJobNode("3", "1", null); + JobNode n1 = graph.getOrCreateJobNode("1", "1"); + JobNode n2 = graph.getOrCreateJobNode("2", "1"); + JobNode n3 = graph.getOrCreateJobNode("3", "1"); StreamSpec s1 = genStream(); StreamSpec s2 = genStream(); StreamSpec s3 = genStream(); @@ -173,9 +173,9 @@ public class TestJobGraph { assertTrue(graph.getSources().size() == 3); - assertTrue(graph.getOrCreateJobNode("1", "1", null).getInEdges().size() == 2); - assertTrue(graph.getOrCreateJobNode("2", "1", null).getInEdges().size() == 1); - assertTrue(graph.getOrCreateJobNode("3", "1", null).getInEdges().size() == 1); + assertTrue(graph.getOrCreateJobNode("1", "1").getInEdges().size() == 2); + assertTrue(graph.getOrCreateJobNode("2", "1").getInEdges().size() == 1); + assertTrue(graph.getOrCreateJobNode("3", "1").getInEdges().size() == 1); assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 0); assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 1); @@ -192,9 +192,9 @@ public class TestJobGraph { * 2 -> s2 * 2 -> s3 */ - JobGraph graph = new JobGraph(null); - JobNode n1 = graph.getOrCreateJobNode("1", "1", null); - JobNode n2 = graph.getOrCreateJobNode("2", "1", null); + JobGraph graph = new JobGraph(null, null); + JobNode n1 = graph.getOrCreateJobNode("1", "1"); + JobNode n2 = graph.getOrCreateJobNode("2", "1"); StreamSpec s1 = genStream(); StreamSpec s2 = genStream(); StreamSpec s3 = genStream(); @@ -203,8 +203,8 @@ public class TestJobGraph { graph.addSink(s3, n2); assertTrue(graph.getSinks().size() == 3); - assertTrue(graph.getOrCreateJobNode("1", "1", null).getOutEdges().size() == 1); - assertTrue(graph.getOrCreateJobNode("2", "1", null).getOutEdges().size() == 2); + assertTrue(graph.getOrCreateJobNode("1", "1").getOutEdges().size() == 1); + assertTrue(graph.getOrCreateJobNode("2", "1").getOutEdges().size() == 2); assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 1); assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 0); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index f218e89..abe8969 100644 --- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -28,7 +28,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; @@ -114,35 +114,37 @@ public class TestJobGraphJsonGenerator { when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2); StreamManager streamManager = new StreamManager(systemAdmins); - StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); + StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); + graphSpec.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); MessageStream<KV<Object, Object>> messageStream1 = - streamGraph.<KV<Object, Object>>getInputStream("input1") + graphSpec.<KV<Object, Object>>getInputStream("input1") .map(m -> m); MessageStream<KV<Object, Object>> messageStream2 = - streamGraph.<KV<Object, Object>>getInputStream("input2") + graphSpec.<KV<Object, Object>>getInputStream("input2") .partitionBy(m -> m.key, m -> m.value, "p1") .filter(m -> true); MessageStream<KV<Object, Object>> messageStream3 = - streamGraph.<KV<Object, Object>>getInputStream("input3") + graphSpec.<KV<Object, Object>>getInputStream("input3") .filter(m -> true) .partitionBy(m -> m.key, m -> m.value, "p2") .map(m -> m); - OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1"); - OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2"); + OutputStream<KV<Object, Object>> outputStream1 = graphSpec.getOutputStream("output1"); + OutputStream<KV<Object, Object>> outputStream2 = graphSpec.getOutputStream("output2"); messageStream1 - .join(messageStream2, mock(JoinFunction.class), + .join(messageStream2, + (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1") .sendTo(outputStream1); messageStream2.sink((message, collector, coordinator) -> { }); messageStream3 - .join(messageStream2, mock(JoinFunction.class), + .join(messageStream2, + (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class), mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2") .sendTo(outputStream2); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - ExecutionPlan plan = planner.plan(streamGraph); + ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); String json = plan.getPlanAsJson(); System.out.println(json); @@ -187,8 +189,8 @@ public class TestJobGraphJsonGenerator { when(systemAdmins.getSystemAdmin("kafka")).thenReturn(systemAdmin2); StreamManager streamManager = new StreamManager(systemAdmins); - StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); - MessageStream<KV<String, PageViewEvent>> inputStream = streamGraph.getInputStream("PageView"); + StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); + MessageStream<KV<String, PageViewEvent>> 
inputStream = graphSpec.getInputStream("PageView"); inputStream .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country") .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(), @@ -198,10 +200,10 @@ public class TestJobGraphJsonGenerator { new StringSerde(), new LongSerde()), "count-by-country") .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage())) - .sendTo(streamGraph.getOutputStream("PageViewCount")); + .sendTo(graphSpec.getOutputStream("PageViewCount")); ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); - ExecutionPlan plan = planner.plan(streamGraph); + ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph()); String json = plan.getPlanAsJson(); System.out.println(json); http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java index 53e8bf6..c43e242 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java @@ -25,8 +25,8 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.impl.store.TimestampedValueSerde; import org.apache.samza.runtime.ApplicationRunner; @@ -71,11 +71,11 @@ public class TestJobNode { when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName"); when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId"); - StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig); - streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); - MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1"); - MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2"); - OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output"); + StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig); + graphSpec.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>())); + MessageStream<KV<String, Object>> input1 = graphSpec.getInputStream("input1"); + MessageStream<KV<String, Object>> input2 = graphSpec.getInputStream("input2"); + OutputStream<KV<String, Object>> output = graphSpec.getOutputStream("output"); JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class); input1 .partitionBy(KV::getKey, KV::getValue, "p1").map(kv -> kv.value) @@ -84,7 +84,7 @@ public class TestJobNode { Duration.ofHours(1), "j1") .sendTo(output); - JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mockConfig); + JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig); Config config = new MapConfig(); StreamEdge input1Edge = new StreamEdge(input1Spec, config); StreamEdge input2Edge = new StreamEdge(input2Spec, config);
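Taken together, the test updates above follow a single pattern: tests no longer hand the mutable StreamGraphImpl to the planner; they describe the pipeline through StreamGraphSpec, extract the immutable OperatorSpecGraph from it, and pass that to ExecutionPlanner (and, for job construction, to JobNode). The sketch below is illustrative only and not part of the patch: it assumes the constructors and signatures exactly as they appear in the hunks above, and the class and method names (PlannerUsageSketch, planSimplePipeline) are placeholders, with the runner/config/streamManager fixtures assumed to be set up as in TestExecutionPlanner.

import org.apache.samza.config.Config;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.execution.ExecutionPlanner;
import org.apache.samza.execution.StreamManager;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.runtime.ApplicationRunner;

/**
 * Illustrative sketch (placeholder names): exercises the planner the way the
 * updated tests do, assuming the signatures shown in the diffs above.
 */
public class PlannerUsageSketch {

  ExecutionPlan planSimplePipeline(ApplicationRunner runner, Config config,
      StreamManager streamManager) throws Exception {
    // Describe the logical pipeline through the spec API (formerly StreamGraphImpl).
    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
    MessageStream<KV<Object, Object>> input = graphSpec.getInputStream("input1");
    OutputStream<KV<Object, Object>> output = graphSpec.getOutputStream("output1");
    input.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output);

    // The planner now consumes the immutable OperatorSpecGraph extracted from the
    // spec, rather than the graph builder itself.
    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
    return planner.plan(graphSpec.getOperatorSpecGraph());
  }
}

The same extraction step is what the runner-side change in AbstractApplicationRunner performs once per application, which is why the tests above consistently call graphSpec.getOperatorSpecGraph() at the point where they previously passed the graph object directly.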