http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
index 705cab7..b186cdb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
@@ -46,14 +46,14 @@ public interface TriggerImpl<M, WK> {
    * @param message the incoming message
   * @param context the {@link TriggerScheduler} to schedule and cancel callbacks
    */
-  public void onMessage(M message, TriggerScheduler<WK> context);
+  void onMessage(M message, TriggerScheduler<WK> context);
 
   /**
   * Returns {@code true} if the current state of the trigger indicates that its condition
    * is satisfied and it is ready to fire.
    * @return if this trigger should fire.
    */
-  public boolean shouldFire();
+  boolean shouldFire();
 
   /**
   * Invoked when the execution of this {@link TriggerImpl} is canceled by an up-stream {@link TriggerImpl}.
@@ -61,6 +61,6 @@ public interface TriggerImpl<M, WK> {
   * No calls to {@link #onMessage(Object, TriggerScheduler)} or {@link #shouldFire()} will be invoked
    * after this invocation.
    */
-  public void cancel();
+  void cancel();
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 68962ce..5043977 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -29,7 +29,8 @@ import org.apache.samza.config.StreamConfig;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.OperatorSpecGraph;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmins;
 import org.slf4j.Logger;
@@ -44,7 +45,7 @@ import java.util.Set;
 
 
 /**
- * Defines common, core behavior for implementations of the {@link ApplicationRunner} API
+ * Defines common, core behavior for implementations of the {@link ApplicationRunner} API.
  */
 public abstract class AbstractApplicationRunner extends ApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class);
@@ -52,8 +53,14 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   private final StreamManager streamManager;
   private final SystemAdmins systemAdmins;
 
+  /**
+   * The {@link ApplicationRunner} is supposed to run a single {@link StreamApplication} instance in the full life-cycle
+   */
+  protected final StreamGraphSpec graphSpec;
+
   public AbstractApplicationRunner(Config config) {
     super(config);
+    this.graphSpec = new StreamGraphSpec(this, config);
     this.systemAdmins = new SystemAdmins(config);
     this.streamManager = new StreamManager(systemAdmins);
   }
@@ -126,23 +133,23 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   /* package private */
   ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception {
     // build stream graph
-    StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
-    app.init(streamGraph, config);
+    app.init(graphSpec, config);
 
+    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
     // create the physical execution plan
     Map<String, String> cfg = new HashMap<>(config);
     if (StringUtils.isNoneEmpty(runId)) {
       cfg.put(ApplicationConfig.APP_RUN_ID, runId);
     }
 
-    Set<StreamSpec> inputStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
-    inputStreams.removeAll(streamGraph.getOutputStreams().keySet());
+    Set<StreamSpec> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+    inputStreams.removeAll(specGraph.getOutputStreams().keySet());
     ApplicationMode mode = inputStreams.stream().allMatch(StreamSpec::isBounded)
         ? ApplicationMode.BATCH : ApplicationMode.STREAM;
     cfg.put(ApplicationConfig.APP_MODE, mode.name());
 
     ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager);
-    return planner.plan(streamGraph);
+    return planner.plan(specGraph);
   }
 
   /* package private for testing */
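
A minimal sketch of the planning flow that getExecutionPlan() now follows, assuming only the calls visible in the hunks above; the class and method names below are illustrative, and ExecutionPlanner.plan() is assumed to throw a checked Exception as the surrounding code suggests:

    import org.apache.samza.application.StreamApplication;
    import org.apache.samza.config.Config;
    import org.apache.samza.execution.ExecutionPlan;
    import org.apache.samza.execution.ExecutionPlanner;
    import org.apache.samza.execution.StreamManager;
    import org.apache.samza.operators.OperatorSpecGraph;
    import org.apache.samza.operators.StreamGraphSpec;
    import org.apache.samza.runtime.ApplicationRunner;

    class PlanningFlowSketch {
      ExecutionPlan plan(StreamApplication app, ApplicationRunner runner, Config config,
          StreamManager streamManager) throws Exception {
        StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config); // mutable builder the app writes into
        app.init(graphSpec, config);                                     // user code describes the logical DAG
        OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();  // immutable snapshot of that DAG
        return new ExecutionPlanner(config, streamManager).plan(specGraph);
      }
    }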

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 1284060..d64e57a 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -42,6 +42,7 @@ import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.processor.StreamProcessorLifecycleListener;
 import org.apache.samza.system.StreamSpec;
@@ -139,7 +140,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     LOG.info("LocalApplicationRunner will run " + taskName);
     LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
 
-    StreamProcessor processor = createStreamProcessor(jobConfig, null, listener);
+    StreamProcessor processor = createStreamProcessor(jobConfig, listener);
 
     numProcessorsToStart.set(1);
     listener.setProcessor(processor);
@@ -169,7 +170,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       plan.getJobConfigs().forEach(jobConfig -> {
           LOG.debug("Starting job {} StreamProcessor with config {}", 
jobConfig.getName(), jobConfig);
           LocalStreamProcessorLifeCycleListener listener = new 
LocalStreamProcessorLifeCycleListener();
-          StreamProcessor processor = createStreamProcessor(jobConfig, app, 
listener);
+          StreamProcessor processor = createStreamProcessor(jobConfig, 
graphSpec, listener);
           listener.setProcessor(processor);
           processors.add(processor);
         });
@@ -284,15 +285,32 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   /**
    * Create {@link StreamProcessor} based on {@link StreamApplication} and the config
    * @param config config
-   * @param app {@link StreamApplication}
    * @return {@link StreamProcessor]}
    */
   /* package private */
   StreamProcessor createStreamProcessor(
       Config config,
-      StreamApplication app,
       StreamProcessorLifecycleListener listener) {
-    Object taskFactory = TaskFactoryUtil.createTaskFactory(config, app, new LocalApplicationRunner(config));
+    Object taskFactory = TaskFactoryUtil.createTaskFactory(config);
+    return getStreamProcessorInstance(config, taskFactory, listener);
+  }
+
+  /**
+   * Create {@link StreamProcessor} based on {@link StreamApplication} and the config
+   * @param config config
+   * @param graphBuilder {@link StreamGraphSpec}
+   * @return {@link StreamProcessor]}
+   */
+  /* package private */
+  StreamProcessor createStreamProcessor(
+      Config config,
+      StreamGraphSpec graphBuilder,
+      StreamProcessorLifecycleListener listener) {
+    Object taskFactory = TaskFactoryUtil.createTaskFactory(graphBuilder.getOperatorSpecGraph(), graphBuilder.getContextManager());
+    return getStreamProcessorInstance(config, taskFactory, listener);
+  }
+
+  private StreamProcessor getStreamProcessorInstance(Config config, Object taskFactory, StreamProcessorLifecycleListener listener) {
     if (taskFactory instanceof StreamTaskFactory) {
       return new StreamProcessor(
           config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
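
For reference, the user-facing entry point is unchanged by this refactoring; below is a runnable sketch in the style of the example applications removed later in this commit ("MyApp", the stream names, and the trivial filter are illustrative only, not part of the commit):

    import org.apache.samza.application.StreamApplication;
    import org.apache.samza.config.Config;
    import org.apache.samza.operators.MessageStream;
    import org.apache.samza.operators.OutputStream;
    import org.apache.samza.operators.StreamGraph;
    import org.apache.samza.runtime.LocalApplicationRunner;
    import org.apache.samza.serializers.StringSerde;
    import org.apache.samza.util.CommandLine;

    public class MyApp implements StreamApplication {
      @Override
      public void init(StreamGraph graph, Config config) {
        graph.setDefaultSerde(new StringSerde());
        MessageStream<String> input = graph.getInputStream("input");
        OutputStream<String> output = graph.getOutputStream("output");
        input.filter(m -> m != null).sendTo(output);   // trivial pass-through DAG
      }

      public static void main(String[] args) throws Exception {
        CommandLine cmdLine = new CommandLine();
        Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
        new LocalApplicationRunner(config).run(new MyApp());   // the runner now builds its graphSpec internally
      }
    }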

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 5831910..7751241 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -70,8 +70,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
-    super.run(streamApp);
-    Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
+    Object taskFactory = getTaskFactory(streamApp);
 
     container = SamzaContainer$.MODULE$.apply(
         containerId,
@@ -106,6 +105,14 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
     }
   }
 
+  private Object getTaskFactory(StreamApplication streamApp) {
+    if (streamApp != null) {
+      streamApp.init(graphSpec, config);
+      return TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager());
+    }
+    return TaskFactoryUtil.createTaskFactory(config);
+  }
+
   @Override
   public void kill(StreamApplication streamApp) {
     // Ultimately this class probably won't end up extending ApplicationRunner, so this will be deleted

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index e4b3c62..fdd134f 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,16 +18,14 @@
  */
 package org.apache.samza.task;
 
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.MessageType;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
-import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.WatermarkMessage;
@@ -39,41 +37,45 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A {@link StreamTask} implementation that brings all the operator API implementation components together and
- * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
+ * feeds the input messages into the user-defined transformation chains in {@link OperatorSpecGraph}.
  */
 public class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
   private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);
 
-  private final StreamApplication streamApplication;
-  private final ApplicationRunner runner;
+  private final OperatorSpecGraph specGraph;
+  // TODO: to be replaced by proper scope of shared context factory in SAMZA-1714
+  private final ContextManager contextManager;
   private final Clock clock;
 
   private OperatorImplGraph operatorImplGraph;
-  private ContextManager contextManager;
 
   /**
-   * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
-   * @param streamApplication the user-implemented {@link StreamApplication} that creates the logical DAG
-   * @param runner the {@link ApplicationRunner} to get the mapping between logical and physical streams
+   * Constructs an adaptor task to run the user-implemented {@link OperatorSpecGraph}.
+   * @param specGraph the serialized version of user-implemented {@link OperatorSpecGraph}
+   *                  that includes the logical DAG
+   * @param contextManager the {@link ContextManager} used to set up the shared context used by operators in the DAG
    * @param clock the {@link Clock} to use for time-keeping
    */
-  public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner runner, Clock clock) {
-    this.streamApplication = streamApplication;
-    this.runner = runner;
+  public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager, Clock clock) {
+    this.specGraph = specGraph.clone();
+    this.contextManager = contextManager;
     this.clock = clock;
   }
 
-  public StreamOperatorTask(StreamApplication application, ApplicationRunner runner) {
-    this(application, runner, SystemClock.instance());
+  public StreamOperatorTask(OperatorSpecGraph specGraph, ContextManager contextManager) {
+    this(specGraph, contextManager, SystemClock.instance());
   }
 
   /**
    * Initializes this task during startup.
    * <p>
-   * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
-   * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
-   * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. It then uses
-   * the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical DAG.
+   * Implementation: Initializes the runtime {@link OperatorImplGraph} according to user-defined {@link OperatorSpecGraph}.
+   * The {@link org.apache.samza.operators.StreamGraphSpec} sets the input and output streams and the task-wide
+   * context manager using the {@link org.apache.samza.operators.StreamGraph} APIs,
+   * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. After the
+   * {@link org.apache.samza.operators.StreamGraphSpec} is initialized once by the application, it then creates
+   * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to this class to create the {@link OperatorImplGraph}
+   * corresponding to the logical DAG.
    *
    * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
    * @param context allows initializing and accessing contextual data of this StreamTask
@@ -81,18 +83,14 @@ public class StreamOperatorTask implements StreamTask, InitableTask, WindowableT
    */
   @Override
   public final void init(Config config, TaskContext context) throws Exception {
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    // initialize the user-implemented stream application.
-    this.streamApplication.init(streamGraph, config);
 
-    // get the user-implemented context manager and initialize it
-    this.contextManager = streamGraph.getContextManager();
+    // get the user-implemented per task context manager and initialize it
     if (this.contextManager != null) {
       this.contextManager.init(config, context);
     }
 
     // create the operator impl DAG corresponding to the logical operator spec DAG
-    this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock);
+    this.operatorImplGraph = new OperatorImplGraph(specGraph, config, context, clock);
   }
 
   /**
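
A short sketch of how the refactored task is wired up, per the new constructor and init() contract above; the helper class name is hypothetical, and graphSpec is assumed to be a StreamGraphSpec that the application has already initialized:

    import org.apache.samza.operators.ContextManager;
    import org.apache.samza.operators.OperatorSpecGraph;
    import org.apache.samza.operators.StreamGraphSpec;
    import org.apache.samza.task.StreamOperatorTask;

    class OperatorTaskWiringSketch {
      StreamOperatorTask wire(StreamGraphSpec graphSpec) {
        OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); // immutable DAG, cloned by the task
        ContextManager contextManager = graphSpec.getContextManager();  // may be null if the app set none
        // The OperatorImplGraph itself is only built later, inside init(config, context).
        return new StreamOperatorTask(specGraph, contextManager);
      }
    }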

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index 2a894ae..38ae854 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -24,7 +24,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.TaskConfig;
-import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,19 +42,28 @@ public class TaskFactoryUtil {
   private static final Logger log = LoggerFactory.getLogger(TaskFactoryUtil.class);
 
   /**
-   * This method creates a task factory class based on the configuration and {@link StreamApplication}
+   * This method creates a task factory class based on the {@link StreamApplication}
+   *
+   * @param specGraph the {@link OperatorSpecGraph}
+   * @param contextManager the {@link ContextManager} to set up initial context for {@code specGraph}
+   * @return  a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory}
+   */
+  public static Object createTaskFactory(OperatorSpecGraph specGraph, ContextManager contextManager) {
+    return createStreamOperatorTaskFactory(specGraph, contextManager);
+  }
+
+  /**
+   * This method creates a task factory class based on the configuration
    *
    * @param config  the {@link Config} for this job
-   * @param streamApp the {@link StreamApplication}
-   * @param runner  the {@link ApplicationRunner} to run this job
    * @return  a task factory object, either a instance of {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory}
    */
-  public static Object createTaskFactory(Config config, StreamApplication streamApp, ApplicationRunner runner) {
-    return (streamApp != null) ? createStreamOperatorTaskFactory(streamApp, runner) : fromTaskClassConfig(config);
+  public static Object createTaskFactory(Config config) {
+    return fromTaskClassConfig(config);
   }
 
-  private static StreamTaskFactory createStreamOperatorTaskFactory(StreamApplication streamApp, ApplicationRunner runner) {
-    return () -> new StreamOperatorTask(streamApp, runner);
+  private static StreamTaskFactory createStreamOperatorTaskFactory(OperatorSpecGraph specGraph, ContextManager contextManager) {
+    return () -> new StreamOperatorTask(specGraph, contextManager);
   }
 
   /**
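
A usage sketch for the new spec-based overload, assuming graphSpec is an initialized StreamGraphSpec; the returned factory is a StreamTaskFactory whose instances are StreamOperatorTasks bound to the immutable OperatorSpecGraph (the createInstance() call is the standard StreamTaskFactory method, not shown in this diff):

    import org.apache.samza.operators.StreamGraphSpec;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.StreamTaskFactory;
    import org.apache.samza.task.TaskFactoryUtil;

    class TaskFactoryUsageSketch {
      StreamTask newOperatorTask(StreamGraphSpec graphSpec) {
        Object factory = TaskFactoryUtil.createTaskFactory(
            graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager());
        // the spec-based overload always yields a StreamTaskFactory here
        return ((StreamTaskFactory) factory).createInstance();
      }
    }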

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 61e8c77..64ee7f3 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -28,7 +28,6 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.operators.functions.TimerFunction
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system._
 import org.apache.samza.table.TableManager

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index e5ce3c8..029b375 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -27,10 +27,12 @@ import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
+import org.apache.samza.operators.StreamGraphSpec
 import org.apache.samza.runtime.LocalContainerRunner
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 
@@ -71,7 +73,14 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val jmxServer = new JmxServer
     val streamApp = TaskFactoryUtil.createStreamApplication(config)
     val appRunner = new LocalContainerRunner(jobModel, "0")
-    val taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, appRunner)
+
+    val taskFactory = if (streamApp != null) {
+      val graphSpec = new StreamGraphSpec(appRunner, config)
+      streamApp.init(graphSpec, config)
+      TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager)
+    } else {
+      TaskFactoryUtil.createTaskFactory(config)
+    }
 
     // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
     config.getTaskOpts match {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
deleted file mode 100644
index 7061732..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.util.CommandLine;
-
-
-/**
- * Example implementation of a task that splits its input into multiple output streams.
- */
-public class BroadcastExample implements StreamApplication {
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)));
-
-    MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("inputStream");
-    OutputStream<KV<String, PageViewEvent>> outputStream1 = graph.getOutputStream("outputStream1");
-    OutputStream<KV<String, PageViewEvent>> outputStream2 = graph.getOutputStream("outputStream2");
-    OutputStream<KV<String, PageViewEvent>> outputStream3 = graph.getOutputStream("outputStream3");
-
-    inputStream.filter(m -> m.key.equals("key1")).sendTo(outputStream1);
-    inputStream.filter(m -> m.key.equals("key2")).sendTo(outputStream2);
-    inputStream.filter(m -> m.key.equals("key3")).sendTo(outputStream3);
-  }
-
-  // local execution mode
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
-    localRunner.run(new BroadcastExample());
-  }
-
-  class PageViewEvent {
-    String key;
-    long timestamp;
-
-    public PageViewEvent(String key, long timestamp) {
-      this.key = key;
-      this.timestamp = timestamp;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
deleted file mode 100644
index f9e0a3a..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.util.CommandLine;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * Example code using {@link KeyValueStore} to implement event-time window
- */
-public class KeyValueStoreExample implements StreamApplication {
-
-  @Override public void init(StreamGraph graph, Config config) {
-    MessageStream<PageViewEvent> pageViewEvents =
-        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
-    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember =
-        graph.getOutputStream("pageViewEventPerMember",
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
-
-    pageViewEvents
-        .partitionBy(pve -> pve.memberId, pve -> pve,
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
-        .map(KV::getValue)
-        .flatMap(new MyStatsCounter())
-        .map(stats -> KV.of(stats.memberId, stats))
-        .sendTo(pageViewEventPerMember);
-  }
-
-  // local execution mode
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
-    localRunner.run(new KeyValueStoreExample());
-  }
-
-  class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
-    private final int timeoutMs = 10 * 60 * 1000;
-
-    KeyValueStore<String, StatsWindowState> statsStore;
-
-    class StatsWindowState {
-      int lastCount = 0;
-      long timeAtLastOutput = 0;
-      int newCount = 0;
-    }
-
-    @Override
-    public Collection<StatsOutput> apply(PageViewEvent message) {
-      List<StatsOutput> outputStats = new ArrayList<>();
-      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.timestamp) / 5) * 5;
-      String wndKey = String.format("%s-%d", message.memberId, wndTimestamp);
-      StatsWindowState curState = this.statsStore.get(wndKey);
-      curState.newCount++;
-      long curTimeMs = System.currentTimeMillis();
-      if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
-        curState.timeAtLastOutput = curTimeMs;
-        curState.lastCount += curState.newCount;
-        curState.newCount = 0;
-        outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount));
-      }
-      // update counter w/o generating output
-      this.statsStore.put(wndKey, curState);
-      return outputStats;
-    }
-
-    @Override
-    public void init(Config config, TaskContext context) {
-      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
-    }
-  }
-
-  class PageViewEvent {
-    String pageId;
-    String memberId;
-    long timestamp;
-
-    PageViewEvent(String pageId, String memberId, long timestamp) {
-      this.pageId = pageId;
-      this.memberId = memberId;
-      this.timestamp = timestamp;
-    }
-  }
-
-  class StatsOutput {
-    private String memberId;
-    private long timestamp;
-    private Integer count;
-
-    StatsOutput(String key, long timestamp, Integer count) {
-      this.memberId = key;
-      this.timestamp = timestamp;
-      this.count = count;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
deleted file mode 100644
index 4702c9a..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.util.CommandLine;
-
-public class MergeExample implements StreamApplication {
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    graph.setDefaultSerde(new StringSerde());
-
-    MessageStream<String> inputStream1 = graph.getInputStream("inputStream1");
-    MessageStream<String> inputStream2 = graph.getInputStream("inputStream2");
-    MessageStream<String> inputStream3 = graph.getInputStream("inputStream3");
-    OutputStream<KV<Integer, String>> outputStream =
-        graph.getOutputStream("outputStream", KVSerde.of(new IntegerSerde(), 
new StringSerde()));
-
-    MessageStream
-        .mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3))
-        .map(m -> KV.of(m.hashCode(), m))
-        .sendTo(outputStream);
-  }
-
-  // local execution mode
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
-    localRunner.run(new MergeExample());
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
deleted file mode 100644
index ff785d9..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-
-/**
- * Simple 2-way stream-to-stream join example
- */
-public class OrderShipmentJoinExample implements StreamApplication {
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    MessageStream<OrderRecord> orders =
-        graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
-    MessageStream<ShipmentRecord> shipments =
-        graph.getInputStream("shipments", new 
JsonSerdeV2<>(ShipmentRecord.class));
-    OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders =
-        graph.getOutputStream("fulfilledOrders",
-            KVSerde.of(new StringSerde(), new 
JsonSerdeV2<>(FulfilledOrderRecord.class)));
-
-    orders
-        .join(shipments, new MyJoinFunction(),
-            new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
-            Duration.ofMinutes(1), "join")
-        .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
-        .sendTo(fulfilledOrders);
-  }
-
-  // local execution mode
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
-    localRunner.run(new OrderShipmentJoinExample());
-  }
-
-  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
-    @Override
-    public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
-      return new FulfilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
-    }
-
-    @Override
-    public String getFirstKey(OrderRecord message) {
-      return message.orderId;
-    }
-
-    @Override
-    public String getSecondKey(ShipmentRecord message) {
-      return message.orderId;
-    }
-  }
-
-  class OrderRecord {
-    String orderId;
-    long orderTimeMs;
-
-    OrderRecord(String orderId, long timeMs) {
-      this.orderId = orderId;
-      this.orderTimeMs = timeMs;
-    }
-  }
-
-  class ShipmentRecord {
-    String orderId;
-    long shipTimeMs;
-
-    ShipmentRecord(String orderId, long timeMs) {
-      this.orderId = orderId;
-      this.shipTimeMs = timeMs;
-    }
-  }
-
-  class FulfilledOrderRecord {
-    String orderId;
-    long orderTimeMs;
-    long shipTimeMs;
-
-    FulfilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
-      this.orderId = orderId;
-      this.orderTimeMs = orderTimeMs;
-      this.shipTimeMs = shipTimeMs;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
deleted file mode 100644
index 846b9f8..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.AccumulationMode;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.function.Supplier;
-
-
-/**
- * Example code to implement window-based counter
- */
-public class PageViewCounterExample implements StreamApplication {
-
-  @Override public void init(StreamGraph graph, Config config) {
-    MessageStream<PageViewEvent> pageViewEvents =
-        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
-    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
-        graph.getOutputStream("pageViewEventPerMemberStream",
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
-
-    Supplier<Integer> initialValue = () -> 0;
-    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
-    pageViewEvents
-        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null)
-            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
-            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
-        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
-        .sendTo(pageViewEventPerMemberStream);
-  }
-
-  // local execution mode
-  public static void main(String[] args) {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
-    localRunner.run(new PageViewCounterExample());
-  }
-
-  class PageViewEvent {
-    String pageId;
-    String memberId;
-    long timestamp;
-
-    PageViewEvent(String pageId, String memberId, long timestamp) {
-      this.pageId = pageId;
-      this.memberId = memberId;
-      this.timestamp = timestamp;
-    }
-  }
-
-  class PageViewCount {
-    String memberId;
-    long timestamp;
-    int count;
-
-    PageViewCount(WindowPane<String, Integer> m) {
-      this.memberId = m.getKey().getKey();
-      this.timestamp = Long.valueOf(m.getKey().getPaneId());
-      this.count = m.getMessage();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
deleted file mode 100644
index c9bcc45..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-
-
-/**
- * Example {@link StreamApplication} code to test the API methods with re-partition operator
- */
-public class RepartitionExample implements StreamApplication {
-
-  @Override public void init(StreamGraph graph, Config config) {
-    MessageStream<PageViewEvent> pageViewEvents =
-        graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
-    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember =
-        graph.getOutputStream("pageViewEventPerMember",
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
-
-    pageViewEvents
-        .partitionBy(pve -> pve.memberId, pve -> pve,
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
-        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null),
-            "window")
-        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
-        .sendTo(pageViewEventPerMember);
-  }
-
-  // local execution mode
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
-    localRunner.run(new RepartitionExample());
-  }
-
-  class PageViewEvent {
-    String pageId;
-    String memberId;
-    long timestamp;
-
-    PageViewEvent(String pageId, String memberId, long timestamp) {
-      this.pageId = pageId;
-      this.memberId = memberId;
-      this.timestamp = timestamp;
-    }
-  }
-
-  class MyStreamOutput {
-    String memberId;
-    long timestamp;
-    int count;
-
-    MyStreamOutput(WindowPane<String, Integer> m) {
-      this.memberId = m.getKey().getKey();
-      this.timestamp = Long.valueOf(m.getKey().getPaneId());
-      this.count = m.getMessage();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
deleted file mode 100644
index 3c37c31..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.JsonSerdeV2;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.function.Supplier;
-
-
-/**
- * Example implementation of a simple user-defined task w/ a window operator.
- *
- */
-public class WindowExample implements StreamApplication {
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    Supplier<Integer> initialValue = () -> 0;
-    FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
-    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>());
-    OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
-
-    // create a tumbling window that outputs the number of message collected every 10 minutes.
-    // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
-    // for 1 minute.
-    inputStream
-        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
-            .setLateTrigger(Triggers.any(Triggers.count(30000),
-                Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
-        .map(WindowPane::getMessage)
-        .sendTo(outputStream);
-  }
-
-  // local execution mode
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
-    localRunner.run(new WindowExample());
-  }
-
-  class PageViewEvent {
-    String key;
-    long timestamp;
-
-    public PageViewEvent(String key, long timestamp) {
-      this.key = key;
-      this.timestamp = timestamp;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 664f3b1..83fe5ad 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -34,8 +34,8 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
@@ -97,24 +97,24 @@ public class TestExecutionPlanner {
     };
   }
 
-  private StreamGraphImpl createSimpleGraph() {
+  private StreamGraphSpec createSimpleGraph() {
     /**
      * a simple graph of partitionBy and map
      *
      * input1 -> partitionBy -> map -> output1
      *
      */
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1");
-    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input1");
+    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1");
     input1
         .partitionBy(m -> m.key, m -> m.value, "p1")
         .map(kv -> kv)
         .sendTo(output1);
-    return streamGraph;
+    return graphSpec;
   }
 
-  private StreamGraphImpl createStreamGraphWithJoin() {
+  private StreamGraphSpec createStreamGraphWithJoin() {
 
     /**
     * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
@@ -127,76 +127,79 @@ public class TestExecutionPlanner {
      *
      */
 
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
     MessageStream<KV<Object, Object>> messageStream1 =
-        streamGraph.<KV<Object, Object>>getInputStream("input1")
+        graphSpec.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
-        streamGraph.<KV<Object, Object>>getInputStream("input2")
+        graphSpec.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
-        streamGraph.<KV<Object, Object>>getInputStream("input3")
+        graphSpec.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
-    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
-    OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
+    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream("output2");
 
     messageStream1
-        .join(messageStream2, mock(JoinFunction.class),
+        .join(messageStream2,
+            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
         .sendTo(output1);
     messageStream3
-        .join(messageStream2, mock(JoinFunction.class),
+        .join(messageStream2,
+            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
         .sendTo(output2);
 
-    return streamGraph;
+    return graphSpec;
   }
 
-  private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
+  private StreamGraphSpec createStreamGraphWithJoinAndWindow() {
 
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
     MessageStream<KV<Object, Object>> messageStream1 =
-        streamGraph.<KV<Object, Object>>getInputStream("input1")
+        graphSpec.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
-        streamGraph.<KV<Object, Object>>getInputStream("input2")
+        graphSpec.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
-        streamGraph.<KV<Object, Object>>getInputStream("input3")
+        graphSpec.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
-    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
-    OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
+    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream("output2");
 
     messageStream1.map(m -> m)
         .filter(m->true)
-        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8),
-            mock(Serde.class), mock(Serde.class)), "w1");
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
 
     messageStream2.map(m -> m)
         .filter(m->true)
-        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16),
-            mock(Serde.class), mock(Serde.class)), "w2");
+        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
 
     messageStream1
-        .join(messageStream2, mock(JoinFunction.class),
+        .join(messageStream2,
+            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
         .sendTo(output1);
     messageStream3
-        .join(messageStream2, mock(JoinFunction.class),
+        .join(messageStream2,
+            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
         .sendTo(output2);
     messageStream3
-        .join(messageStream2, mock(JoinFunction.class),
+        .join(messageStream2,
+            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
         .sendTo(output2);
 
-    return streamGraph;
+    return graphSpec;
   }
 
   @Before
@@ -252,9 +255,9 @@ public class TestExecutionPlanner {
   @Test
   public void testCreateProcessorGraph() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
+    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
 
-    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
     assertTrue(jobGraph.getSources().size() == 3);
     assertTrue(jobGraph.getSinks().size() == 2);
     assertTrue(jobGraph.getIntermediateStreams().size() == 2); // two streams generated by partitionBy
@@ -263,8 +266,8 @@ public class TestExecutionPlanner {
   @Test
   public void testFetchExistingStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
-    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+    JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
     assertTrue(jobGraph.getOrCreateStreamEdge(input1).getPartitionCount() == 64);
@@ -281,11 +284,11 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
-    JobGraph jobGraph = planner.createJobGraph(streamGraph);
+    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+    JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
-    ExecutionPlanner.calculateJoinInputPartitions(streamGraph, jobGraph);
+    ExecutionPlanner.calculateJoinInputPartitions(jobGraph);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -300,9 +303,9 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphImpl streamGraph = createSimpleGraph();
-    JobGraph jobGraph = planner.createJobGraph(streamGraph);
-    planner.calculatePartitions(streamGraph, jobGraph);
+    StreamGraphSpec graphSpec = createSimpleGraph();
+    JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
+    planner.calculatePartitions(jobGraph);
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -317,8 +320,8 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
-    ExecutionPlan plan = planner.plan(streamGraph);
+    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     for (JobConfig config : jobConfigs) {
       System.out.println(config);
@@ -332,8 +335,8 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
-    ExecutionPlan plan = planner.plan(streamGraph);
+    StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow();
+    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
 
@@ -349,8 +352,8 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
-    ExecutionPlan plan = planner.plan(streamGraph);
+    StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow();
+    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
 
@@ -366,8 +369,8 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphImpl streamGraph = createSimpleGraph();
-    ExecutionPlan plan = planner.plan(streamGraph);
+    StreamGraphSpec graphSpec = createSimpleGraph();
+    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
     assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
@@ -381,8 +384,8 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphImpl streamGraph = createSimpleGraph();
-    ExecutionPlan plan = planner.plan(streamGraph);
+    StreamGraphSpec graphSpec = createSimpleGraph();
+    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
     assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
@@ -391,8 +394,8 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateIntStreamPartitions() throws Exception {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphImpl streamGraph = createSimpleGraph();
-    JobGraph jobGraph = (JobGraph) planner.plan(streamGraph);
+    StreamGraphSpec graphSpec = createSimpleGraph();
+    JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
@@ -424,12 +427,12 @@ public class TestExecutionPlanner {
     int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
 
-    MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input4");
-    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream("input4");
+    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream("output1");
     input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1);
-    JobGraph jobGraph = (JobGraph) planner.plan(streamGraph);
+    JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {

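A quick orientation for the TestExecutionPlanner hunks above: the tests no longer hand the mutable StreamGraphImpl to the planner. They build a StreamGraphSpec and pass its immutable OperatorSpecGraph to ExecutionPlanner, and the raw mock(JoinFunction.class) is now cast to its parameterized type so the typed join(...) overload resolves. A minimal, hedged sketch of the new pattern (reusing the runner, config, and streamManager fixtures these tests already set up; the stream names are illustrative):

    // Build the application graph through the spec builder.
    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
    MessageStream<KV<Object, Object>> input = graphSpec.getInputStream("input1");
    input.map(m -> m).sendTo(graphSpec.getOutputStream("output1"));

    // The planner consumes the immutable operator-spec graph, not the builder itself.
    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
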
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index bf131ce..359c422 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -57,16 +57,16 @@ public class TestJobGraph {
    * 2 9 10
    */
   private void createGraph1() {
-    graph1 = new JobGraph(null);
+    graph1 = new JobGraph(null, null);
 
-    JobNode n2 = graph1.getOrCreateJobNode("2", "1", null);
-    JobNode n3 = graph1.getOrCreateJobNode("3", "1", null);
-    JobNode n5 = graph1.getOrCreateJobNode("5", "1", null);
-    JobNode n7 = graph1.getOrCreateJobNode("7", "1", null);
-    JobNode n8 = graph1.getOrCreateJobNode("8", "1", null);
-    JobNode n9 = graph1.getOrCreateJobNode("9", "1", null);
-    JobNode n10 = graph1.getOrCreateJobNode("10", "1", null);
-    JobNode n11 = graph1.getOrCreateJobNode("11", "1", null);
+    JobNode n2 = graph1.getOrCreateJobNode("2", "1");
+    JobNode n3 = graph1.getOrCreateJobNode("3", "1");
+    JobNode n5 = graph1.getOrCreateJobNode("5", "1");
+    JobNode n7 = graph1.getOrCreateJobNode("7", "1");
+    JobNode n8 = graph1.getOrCreateJobNode("8", "1");
+    JobNode n9 = graph1.getOrCreateJobNode("9", "1");
+    JobNode n10 = graph1.getOrCreateJobNode("10", "1");
+    JobNode n11 = graph1.getOrCreateJobNode("11", "1");
 
     graph1.addSource(genStream(), n5);
     graph1.addSource(genStream(), n7);
@@ -90,15 +90,15 @@ public class TestJobGraph {
    *      |<---6 <--|    <>
    */
   private void createGraph2() {
-    graph2 = new JobGraph(null);
+    graph2 = new JobGraph(null, null);
 
-    JobNode n1 = graph2.getOrCreateJobNode("1", "1", null);
-    JobNode n2 = graph2.getOrCreateJobNode("2", "1", null);
-    JobNode n3 = graph2.getOrCreateJobNode("3", "1", null);
-    JobNode n4 = graph2.getOrCreateJobNode("4", "1", null);
-    JobNode n5 = graph2.getOrCreateJobNode("5", "1", null);
-    JobNode n6 = graph2.getOrCreateJobNode("6", "1", null);
-    JobNode n7 = graph2.getOrCreateJobNode("7", "1", null);
+    JobNode n1 = graph2.getOrCreateJobNode("1", "1");
+    JobNode n2 = graph2.getOrCreateJobNode("2", "1");
+    JobNode n3 = graph2.getOrCreateJobNode("3", "1");
+    JobNode n4 = graph2.getOrCreateJobNode("4", "1");
+    JobNode n5 = graph2.getOrCreateJobNode("5", "1");
+    JobNode n6 = graph2.getOrCreateJobNode("6", "1");
+    JobNode n7 = graph2.getOrCreateJobNode("7", "1");
 
     graph2.addSource(genStream(), n1);
     graph2.addIntermediateStream(genStream(), n1, n2);
@@ -117,10 +117,10 @@ public class TestJobGraph {
    * 1<->1 -> 2<->2
    */
   private void createGraph3() {
-    graph3 = new JobGraph(null);
+    graph3 = new JobGraph(null, null);
 
-    JobNode n1 = graph3.getOrCreateJobNode("1", "1", null);
-    JobNode n2 = graph3.getOrCreateJobNode("2", "1", null);
+    JobNode n1 = graph3.getOrCreateJobNode("1", "1");
+    JobNode n2 = graph3.getOrCreateJobNode("2", "1");
 
     graph3.addSource(genStream(), n1);
     graph3.addIntermediateStream(genStream(), n1, n1);
@@ -133,9 +133,9 @@ public class TestJobGraph {
    * 1<->1
    */
   private void createGraph4() {
-    graph4 = new JobGraph(null);
+    graph4 = new JobGraph(null, null);
 
-    JobNode n1 = graph4.getOrCreateJobNode("1", "1", null);
+    JobNode n1 = graph4.getOrCreateJobNode("1", "1");
 
     graph4.addSource(genStream(), n1);
     graph4.addIntermediateStream(genStream(), n1, n1);
@@ -151,7 +151,7 @@ public class TestJobGraph {
 
   @Test
   public void testAddSource() {
-    JobGraph graph = new JobGraph(null);
+    JobGraph graph = new JobGraph(null, null);
 
     /**
      * s1 -> 1
@@ -160,9 +160,9 @@ public class TestJobGraph {
      * s3 -> 2
      *   |-> 3
      */
-    JobNode n1 = graph.getOrCreateJobNode("1", "1", null);
-    JobNode n2 = graph.getOrCreateJobNode("2", "1", null);
-    JobNode n3 = graph.getOrCreateJobNode("3", "1", null);
+    JobNode n1 = graph.getOrCreateJobNode("1", "1");
+    JobNode n2 = graph.getOrCreateJobNode("2", "1");
+    JobNode n3 = graph.getOrCreateJobNode("3", "1");
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
@@ -173,9 +173,9 @@ public class TestJobGraph {
 
     assertTrue(graph.getSources().size() == 3);
 
-    assertTrue(graph.getOrCreateJobNode("1", "1", null).getInEdges().size() == 
2);
-    assertTrue(graph.getOrCreateJobNode("2", "1", null).getInEdges().size() == 
1);
-    assertTrue(graph.getOrCreateJobNode("3", "1", null).getInEdges().size() == 
1);
+    assertTrue(graph.getOrCreateJobNode("1", "1").getInEdges().size() == 2);
+    assertTrue(graph.getOrCreateJobNode("2", "1").getInEdges().size() == 1);
+    assertTrue(graph.getOrCreateJobNode("3", "1").getInEdges().size() == 1);
 
     assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 0);
     assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 1);
@@ -192,9 +192,9 @@ public class TestJobGraph {
      * 2 -> s2
      * 2 -> s3
      */
-    JobGraph graph = new JobGraph(null);
-    JobNode n1 = graph.getOrCreateJobNode("1", "1", null);
-    JobNode n2 = graph.getOrCreateJobNode("2", "1", null);
+    JobGraph graph = new JobGraph(null, null);
+    JobNode n1 = graph.getOrCreateJobNode("1", "1");
+    JobNode n2 = graph.getOrCreateJobNode("2", "1");
     StreamSpec s1 = genStream();
     StreamSpec s2 = genStream();
     StreamSpec s3 = genStream();
@@ -203,8 +203,8 @@ public class TestJobGraph {
     graph.addSink(s3, n2);
 
     assertTrue(graph.getSinks().size() == 3);
-    assertTrue(graph.getOrCreateJobNode("1", "1", null).getOutEdges().size() 
== 1);
-    assertTrue(graph.getOrCreateJobNode("2", "1", null).getOutEdges().size() 
== 2);
+    assertTrue(graph.getOrCreateJobNode("1", "1").getOutEdges().size() == 1);
+    assertTrue(graph.getOrCreateJobNode("2", "1").getOutEdges().size() == 2);
 
     assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 1);
     assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 0);

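The TestJobGraph changes reduce to two signature updates: JobGraph is now constructed with two arguments (both null in these shape-only tests; presumably the config plus the new OperatorSpecGraph, though the declaration is not shown in this diff), and getOrCreateJobNode drops its trailing config parameter. A hedged sketch mirroring the hunks above, using the test's own genStream() helper:

    JobGraph graph = new JobGraph(null, null);
    // Nodes are now keyed by the two identifiers alone; no per-node config is passed.
    JobNode n1 = graph.getOrCreateJobNode("1", "1");
    JobNode n2 = graph.getOrCreateJobNode("2", "1");
    graph.addSource(genStream(), n1);
    graph.addIntermediateStream(genStream(), n1, n2);
    graph.addSink(genStream(), n2);
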
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index f218e89..abe8969 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -28,7 +28,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
@@ -114,35 +114,37 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    graphSpec.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
     MessageStream<KV<Object, Object>> messageStream1 =
-        streamGraph.<KV<Object, Object>>getInputStream("input1")
+        graphSpec.<KV<Object, Object>>getInputStream("input1")
             .map(m -> m);
     MessageStream<KV<Object, Object>> messageStream2 =
-        streamGraph.<KV<Object, Object>>getInputStream("input2")
+        graphSpec.<KV<Object, Object>>getInputStream("input2")
             .partitionBy(m -> m.key, m -> m.value, "p1")
             .filter(m -> true);
     MessageStream<KV<Object, Object>> messageStream3 =
-        streamGraph.<KV<Object, Object>>getInputStream("input3")
+        graphSpec.<KV<Object, Object>>getInputStream("input3")
             .filter(m -> true)
             .partitionBy(m -> m.key, m -> m.value, "p2")
             .map(m -> m);
-    OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
-    OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
+    OutputStream<KV<Object, Object>> outputStream1 = graphSpec.getOutputStream("output1");
+    OutputStream<KV<Object, Object>> outputStream2 = graphSpec.getOutputStream("output2");
 
     messageStream1
-        .join(messageStream2, mock(JoinFunction.class),
+        .join(messageStream2,
+            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
         .sendTo(outputStream1);
     messageStream2.sink((message, collector, coordinator) -> { });
     messageStream3
-        .join(messageStream2, mock(JoinFunction.class),
+        .join(messageStream2,
+            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
             mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
         .sendTo(outputStream2);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    ExecutionPlan plan = planner.plan(streamGraph);
+    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     String json = plan.getPlanAsJson();
     System.out.println(json);
 
@@ -187,8 +189,8 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("kafka")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream<KV<String, PageViewEvent>> inputStream = streamGraph.getInputStream("PageView");
+    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    MessageStream<KV<String, PageViewEvent>> inputStream = graphSpec.getInputStream("PageView");
     inputStream
         .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country")
         .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(),
@@ -198,10 +200,10 @@ public class TestJobGraphJsonGenerator {
             new StringSerde(),
             new LongSerde()), "count-by-country")
         .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
-        .sendTo(streamGraph.getOutputStream("PageViewCount"));
+        .sendTo(graphSpec.getOutputStream("PageViewCount"));
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    ExecutionPlan plan = planner.plan(streamGraph);
+    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     String json = plan.getPlanAsJson();
     System.out.println(json);
 

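Both JSON-generator tests now follow the same three steps: describe the job through StreamGraphSpec, plan the resulting OperatorSpecGraph, and serialize the plan. A condensed, hedged sketch of that flow (fixture and stream names follow the repartition test above):

    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
    graphSpec.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
    graphSpec.<KV<Object, Object>>getInputStream("input1")
        .partitionBy(m -> m.key, m -> m.value, "p1")
        .sendTo(graphSpec.getOutputStream("output1"));

    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
    ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
    String json = plan.getPlanAsJson();  // the JSON the assertions inspect
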
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 53e8bf6..c43e242 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -25,8 +25,8 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.runtime.ApplicationRunner;
@@ -71,11 +71,11 @@ public class TestJobNode {
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
 
-    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
-    streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
-    MessageStream<KV<String, Object>> input1 = streamGraph.getInputStream("input1");
-    MessageStream<KV<String, Object>> input2 = streamGraph.getInputStream("input2");
-    OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
+    StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+    graphSpec.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+    MessageStream<KV<String, Object>> input1 = graphSpec.getInputStream("input1");
+    MessageStream<KV<String, Object>> input2 = graphSpec.getInputStream("input2");
+    OutputStream<KV<String, Object>> output = graphSpec.getOutputStream("output");
     JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
     input1
         .partitionBy(KV::getKey, KV::getValue, "p1").map(kv -> kv.value)
@@ -84,7 +84,7 @@ public class TestJobNode {
             Duration.ofHours(1), "j1")
         .sendTo(output);
 
-    JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mockConfig);
+    JobNode jobNode = new JobNode("jobName", "jobId", 
graphSpec.getOperatorSpecGraph(), mockConfig);
     Config config = new MapConfig();
     StreamEdge input1Edge = new StreamEdge(input1Spec, config);
     StreamEdge input2Edge = new StreamEdge(input2Spec, config);

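In TestJobNode the only API-facing change is the JobNode constructor: it now takes the OperatorSpecGraph extracted from the spec builder instead of the builder itself. A hedged one-step sketch, assuming the mockConfig, graphSpec, and input1Spec wiring shown in the hunk above:

    // Build the node from the immutable spec graph, then wire edges for the assertions.
    JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig);
    StreamEdge input1Edge = new StreamEdge(input1Spec, new MapConfig());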