http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 0759aba..6fa9ed1 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
 
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskContextImpl;
@@ -72,8 +73,8 @@ public class TestJoinOperator {
 
   @Test
   public void join() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -93,25 +94,26 @@ public class TestJoinOperator {
     mapConfig.put("job.id", "jobId");
     StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", 
"instream");
     Config config = new MapConfig(mapConfig);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
-    IntegerSerde integerSerde = new IntegerSerde();
-    KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", 
"mockFactoryClassName");
-    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("inStream", kvSerde);
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        IntegerSerde integerSerde = new IntegerSerde();
+        KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, 
integerSerde);
+        GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", 
"mockFactoryClassName");
+        GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("inStream", kvSerde);
 
-    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream(inputDescriptor);
+        MessageStream<KV<Integer, Integer>> inStream = 
appDesc.getInputStream(inputDescriptor);
 
-    inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, 
kvSerde, JOIN_TTL, "join");
+        inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, 
kvSerde, JOIN_TTL, "join");
+      }, config);
 
-    createStreamOperatorTask(new SystemClock(), graphSpec); // should throw an 
exception
+    createStreamOperatorTask(new SystemClock(), streamAppDesc); // should 
throw an exception
   }
 
   @Test
   public void joinFnInitAndClose() throws Exception {
     TestJoinFunction joinFn = new TestJoinFunction();
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(joinFn);
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(joinFn);
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
 
     MessageCollector messageCollector = mock(MessageCollector.class);
 
@@ -129,8 +131,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinReverse() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -145,8 +147,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatch() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -160,8 +162,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatchReverse() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -175,8 +177,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKey() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -193,8 +195,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKeyReverse() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -211,8 +213,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessages() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -234,8 +236,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessagesReverse() throws Exception {
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -258,8 +260,8 @@ public class TestJoinOperator {
   @Test
   public void joinRemovesExpiredMessages() throws Exception {
     TestClock testClock = new TestClock();
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(testClock, 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -278,8 +280,8 @@ public class TestJoinOperator {
   @Test
   public void joinRemovesExpiredMessagesReverse() throws Exception {
     TestClock testClock = new TestClock();
-    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
+    StreamApplicationDescriptorImpl streamAppDesc = 
this.getTestJoinStreamGraph(new TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(testClock, 
streamAppDesc);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -295,7 +297,8 @@ public class TestJoinOperator {
     assertTrue(output.isEmpty());
   }
 
-  private StreamOperatorTask createStreamOperatorTask(Clock clock, 
StreamGraphSpec graphSpec) throws Exception {
+  private StreamOperatorTask createStreamOperatorTask(Clock clock, 
StreamApplicationDescriptorImpl graphSpec)
+      throws Exception {
     Map<String, String> mapConfig = new HashMap<>();
     mapConfig.put("job.name", "jobName");
     mapConfig.put("job.id", "jobId");
@@ -320,31 +323,31 @@ public class TestJoinOperator {
     return sot;
   }
 
-  private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) 
throws IOException {
+  private StreamApplicationDescriptorImpl 
getTestJoinStreamGraph(TestJoinFunction joinFn) throws IOException {
     Map<String, String> mapConfig = new HashMap<>();
     mapConfig.put("job.name", "jobName");
     mapConfig.put("job.id", "jobId");
     StreamTestUtils.addStreamConfigs(mapConfig, "inStream", "insystem", 
"instream");
     StreamTestUtils.addStreamConfigs(mapConfig, "inStream2", "insystem", 
"instream2");
     Config config = new MapConfig(mapConfig);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    IntegerSerde integerSerde = new IntegerSerde();
-    KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", 
"mockFactoryClassName");
-    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = 
sd.getInputDescriptor("inStream", kvSerde);
-    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = 
sd.getInputDescriptor("inStream2", kvSerde);
-
-    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream(inputDescriptor1);
-    MessageStream<KV<Integer, Integer>> inStream2 = 
graphSpec.getInputStream(inputDescriptor2);
-
-    inStream
-        .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, 
"j1")
-        .sink((message, messageCollector, taskCoordinator) -> {
-            SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
-            messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
-          });
-
-    return graphSpec;
+
+    return new StreamApplicationDescriptorImpl(appDesc -> {
+        IntegerSerde integerSerde = new IntegerSerde();
+        KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, 
integerSerde);
+        GenericSystemDescriptor sd = new GenericSystemDescriptor("insystem", 
"mockFactoryClassName");
+        GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor1 = 
sd.getInputDescriptor("inStream", kvSerde);
+        GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor2 = 
sd.getInputDescriptor("inStream2", kvSerde);
+
+        MessageStream<KV<Integer, Integer>> inStream = 
appDesc.getInputStream(inputDescriptor1);
+        MessageStream<KV<Integer, Integer>> inStream2 = 
appDesc.getInputStream(inputDescriptor2);
+
+        inStream
+            .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, 
"j1")
+            .sink((message, messageCollector, taskCoordinator) -> {
+                SystemStream outputSystemStream = new 
SystemStream("outputSystem", "outputStream");
+                messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
+              });
+      }, config);
   }
 
   private static class TestJoinFunction

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 001ffda..566079b 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -24,6 +24,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -70,7 +71,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testMap() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -95,7 +96,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testFlatMap() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -112,7 +113,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testFlatMapWithRelaxedTypes() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestInputMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -132,7 +133,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testFilter() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -157,7 +158,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testSink() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -174,7 +175,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testSendTo() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
     OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = 
mock(OutputStreamImpl.class);
@@ -200,7 +201,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testPartitionBy() throws IOException {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     String mockOpName = "mockName";
     when(mockGraph.getNextOpId(anyObject(), 
anyObject())).thenReturn(mockOpName);
@@ -231,7 +232,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testRepartitionWithoutSerde() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     String mockOpName = "mockName";
     when(mockGraph.getNextOpId(anyObject(), 
anyObject())).thenReturn(mockOpName);
@@ -261,7 +262,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testWindowWithRelaxedTypes() throws Exception {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStream<TestInputMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -285,7 +286,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testJoin() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> source1 = new 
MessageStreamImpl<>(mockGraph, leftInputOpSpec);
     OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -317,7 +318,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testSendToTable() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec inputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> source = new 
MessageStreamImpl<>(mockGraph, inputOpSpec);
 
@@ -339,7 +340,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testStreamTableJoin() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new 
MessageStreamImpl<>(mockGraph, leftInputOpSpec);
     OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -367,7 +368,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testMerge() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
     MessageStream<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec1);
 
@@ -407,7 +408,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testMergeWithRelaxedTypes() {
-    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
+    StreamApplicationDescriptorImpl mockGraph = 
mock(StreamApplicationDescriptorImpl.class);
     MessageStream<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
 
     // other streams have the same message type T as input stream message type 
M

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
index 6469326..a5b15b8 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -27,6 +27,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -57,14 +58,14 @@ import static org.mockito.Mockito.*;
 @PrepareForTest(OperatorSpec.class)
 public class TestOperatorSpecGraph {
 
-  private StreamGraphSpec mockGraph;
+  private StreamApplicationDescriptorImpl mockAppDesc;
   private Map<String, InputOperatorSpec> inputOpSpecMap;
   private Map<String, OutputStreamImpl> outputStrmMap;
   private Set<OperatorSpec> allOpSpecs;
 
   @Before
   public void setUp() {
-    this.mockGraph = mock(StreamGraphSpec.class);
+    this.mockAppDesc = mock(StreamApplicationDescriptorImpl.class);
 
     /**
      * Setup two linear transformation pipelines:
@@ -91,8 +92,8 @@ public class TestOperatorSpecGraph {
     inputOpSpecMap.put(streamId2, testInput2);
     this.outputStrmMap = new LinkedHashMap<>();
     outputStrmMap.put(outputStreamId, outputStream1);
-    
when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
-    
when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
+    
when(mockAppDesc.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
+    
when(mockAppDesc.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
     this.allOpSpecs = new HashSet<OperatorSpec>() { {
         this.add(testInput);
         this.add(filterOp);
@@ -105,7 +106,7 @@ public class TestOperatorSpecGraph {
 
   @After
   public void tearDown() {
-    this.mockGraph = null;
+    this.mockAppDesc = null;
     this.inputOpSpecMap = null;
     this.outputStrmMap = null;
     this.allOpSpecs = null;
@@ -113,7 +114,7 @@ public class TestOperatorSpecGraph {
 
   @Test
   public void testConstructor() {
-    OperatorSpecGraph specGraph = new OperatorSpecGraph(mockGraph);
+    OperatorSpecGraph specGraph = new OperatorSpecGraph(mockAppDesc);
     assertEquals(specGraph.getInputOperators(), inputOpSpecMap);
     assertEquals(specGraph.getOutputStreams(), outputStrmMap);
     assertTrue(specGraph.getTables().isEmpty());
@@ -123,7 +124,7 @@ public class TestOperatorSpecGraph {
 
   @Test
   public void testClone() {
-    OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+    OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc);
     OperatorSpecGraph clonedSpecGraph = operatorSpecGraph.clone();
     OperatorSpecTestUtils.assertClonedGraph(operatorSpecGraph, 
clonedSpecGraph);
   }
@@ -137,7 +138,7 @@ public class TestOperatorSpecGraph {
 
     //failed with serialization error
     try {
-      new OperatorSpecGraph(mockGraph);
+      new OperatorSpecGraph(mockAppDesc);
       fail("Should have failed with serialization error");
     } catch (SamzaException se) {
       throw se.getCause();
@@ -150,7 +151,7 @@ public class TestOperatorSpecGraph {
     this.allOpSpecs.add(testOp);
     
inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(testOp);
 
-    OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+    OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockAppDesc);
     //failed with serialization error
     try {
       operatorSpecGraph.clone();

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
deleted file mode 100644
index 9629efa..0000000
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for THE
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import 
org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import 
org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
-import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.StreamExpander;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@SuppressWarnings("unchecked")
-public class TestStreamGraphSpec {
-
-  @Test
-  public void testGetInputStreamWithValueSerde() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    String streamId = "test-stream-1";
-    Serde mockValueSerde = mock(Serde.class);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, 
mockValueSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(isd);
-
-    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
-    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test
-  public void testGetInputStreamWithKeyValueSerde() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    String streamId = "test-stream-1";
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graphSpec.getInputStream(isd);
-
-    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
-    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetInputStreamWithNullSerde() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
-    graphSpec.getInputStream(isd);
-  }
-
-  @Test
-  public void testGetInputStreamWithTransformFunction() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    InputTransformer transformer = ime -> ime;
-    MockTransformingSystemDescriptor sd = new 
MockTransformingSystemDescriptor("mockSystem", transformer);
-    MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
-    MessageStream inputStream = graphSpec.getInputStream(isd);
-
-    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graphSpec.getInputOperators().get(streamId), inputOpSpec);
-    assertEquals(streamId, inputOpSpec.getStreamId());
-    assertEquals(isd, graphSpec.getInputDescriptors().get(streamId));
-    assertEquals(transformer, inputOpSpec.getTransformer());
-  }
-
-  @Test
-  public void testGetInputStreamWithExpandingSystem() {
-    String streamId = "test-stream-1";
-    String expandedStreamId = "expanded-stream";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    AtomicInteger expandCallCount = new AtomicInteger();
-    StreamExpander expander = (sg, isd) -> {
-      expandCallCount.incrementAndGet();
-      InputDescriptor expandedISD =
-          new GenericSystemDescriptor("expanded-system", "mockFactoryClass")
-              .getInputDescriptor(expandedStreamId, new IntegerSerde());
-
-      return sg.getInputStream(expandedISD);
-    };
-    MockExpandingSystemDescriptor sd = new 
MockExpandingSystemDescriptor("mock-system", expander);
-    MockInputDescriptor isd = sd.getInputDescriptor(streamId, new 
IntegerSerde());
-    MessageStream inputStream = graphSpec.getInputStream(isd);
-    InputOperatorSpec inputOpSpec = (InputOperatorSpec) ((MessageStreamImpl) 
inputStream).getOperatorSpec();
-    assertEquals(1, expandCallCount.get());
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(inputOpSpec, 
graphSpec.getInputOperators().get(expandedStreamId));
-    assertFalse(graphSpec.getInputOperators().containsKey(streamId));
-    assertFalse(graphSpec.getInputDescriptors().containsKey(streamId));
-    assertTrue(graphSpec.getInputDescriptors().containsKey(expandedStreamId));
-    assertEquals(expandedStreamId, inputOpSpec.getStreamId());
-  }
-
-  @Test
-  public void testMultipleGetInputStreams() {
-    String streamId1 = "test-stream-1";
-    String streamId2 = "test-stream-2";
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, 
mock(Serde.class));
-    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, 
mock(Serde.class));
-    MessageStream<Object> inputStream1 = graphSpec.getInputStream(isd1);
-    MessageStream<Object> inputStream2 = graphSpec.getInputStream(isd2);
-
-    InputOperatorSpec inputOpSpec1 =
-        (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream1).getOperatorSpec();
-    InputOperatorSpec inputOpSpec2 =
-        (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream2).getOperatorSpec();
-
-    assertEquals(graphSpec.getInputOperators().size(), 2);
-    assertEquals(graphSpec.getInputOperators().get(streamId1), inputOpSpec1);
-    assertEquals(graphSpec.getInputOperators().get(streamId2), inputOpSpec2);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameInputStreamTwice() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, 
mock(Serde.class));
-    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, 
mock(Serde.class));
-    graphSpec.getInputStream(isd1);
-    // should throw exception
-    graphSpec.getInputStream(isd2);
-  }
-
-  @Test
-  public void testMultipleSystemDescriptorForSameSystemName() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericInputDescriptor isd1 = sd1.getInputDescriptor(streamId, 
mock(Serde.class));
-    GenericInputDescriptor isd2 = sd2.getInputDescriptor(streamId, 
mock(Serde.class));
-    GenericOutputDescriptor osd1 = sd2.getOutputDescriptor(streamId, 
mock(Serde.class));
-
-    graphSpec.getInputStream(isd1);
-    boolean passed = false;
-    try {
-      graphSpec.getInputStream(isd2);
-      passed = true;
-    } catch (IllegalStateException e) { }
-
-    try {
-      graphSpec.getOutputStream(osd1);
-      passed = true;
-    } catch (IllegalStateException e) { }
-
-    try {
-      graphSpec.setDefaultSystem(sd2);
-      passed = true;
-    } catch (IllegalStateException e) { }
-
-    assertFalse(passed);
-  }
-
-  @Test
-  public void testGetOutputStreamWithValueSerde() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, 
mockValueSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(osd);
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
-    assertEquals(streamId, outputStreamImpl.getStreamId());
-    assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId));
-    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test
-  public void testGetOutputStreamWithKeyValueSerde() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, 
mockKVSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graphSpec.getOutputStream(osd);
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graphSpec.getOutputStreams().get(streamId), outputStreamImpl);
-    assertEquals(streamId, outputStreamImpl.getStreamId());
-    assertEquals(osd, graphSpec.getOutputDescriptors().get(streamId));
-    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testGetOutputStreamWithNullSerde() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
-    graphSpec.getOutputStream(osd);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericInputDescriptor id = new GenericSystemDescriptor("system", 
"factory.class.name")
-        .getInputDescriptor("input-stream", mock(Serde.class));
-    graphSpec.getInputStream(id);
-    graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", 
"mockFactory")); // should throw exception
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericOutputDescriptor od = new GenericSystemDescriptor("system", 
"factory.class.name")
-        .getOutputDescriptor("output-stream", mock(Serde.class));
-    graphSpec.getOutputStream(od);
-    graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", 
"mockFactory")); // should throw exception
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSerdeAfterGettingIntermediateStream() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getIntermediateStream(streamId, mock(Serde.class), false);
-    graphSpec.setDefaultSystem(new GenericSystemDescriptor("mockSystem", 
"mockFactory")); // should throw exception
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameOutputStreamTwice() {
-    String streamId = "test-stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, 
mock(Serde.class));
-    GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, 
mock(Serde.class));
-    graphSpec.getOutputStream(osd1);
-    graphSpec.getOutputStream(osd2); // should throw exception
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithValueSerde() {
-    String streamId = "stream-1";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(streamId, mockValueSerde, false);
-
-    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertTrue(((InputOperatorSpec) (OperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithKeyValueSerde() {
-    String streamId = "streamId";
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(streamId, mockKVSerde, false);
-
-    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertEquals(mockKeySerde, 
intermediateStreamImpl.getOutputStream().getKeySerde());
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
-    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithNoSerde() {
-    Config mockConfig = mock(Config.class);
-    String streamId = "streamId";
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graphSpec.getIntermediateStream(streamId, null, false);
-
-    assertEquals(graphSpec.getInputOperators().get(streamId), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graphSpec.getOutputStreams().get(streamId), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(streamId, intermediateStreamImpl.getStreamId());
-    assertNull(intermediateStreamImpl.getOutputStream().getKeySerde());
-    assertNull(intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertNull(((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
-    assertNull(((InputOperatorSpec) (OperatorSpec)  
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameIntermediateStreamTwice() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
-    graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false);
-    graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class), false);
-  }
-
-  @Test
-  public void testGetNextOpIdIncrementsId() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-    assertEquals("jobName-1234-merge-0", graphSpec.getNextOpId(OpCode.MERGE, 
null));
-    assertEquals("jobName-1234-join-customName", 
graphSpec.getNextOpId(OpCode.JOIN, "customName"));
-    assertEquals("jobName-1234-map-2", graphSpec.getNextOpId(OpCode.MAP, 
null));
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetNextOpIdRejectsDuplicates() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-    assertEquals("jobName-1234-join-customName", 
graphSpec.getNextOpId(OpCode.JOIN, "customName"));
-    graphSpec.getNextOpId(OpCode.JOIN, "customName"); // should throw
-  }
-
-  @Test
-  public void testIdValidation() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
-    // null and empty userDefinedIDs should fall back to autogenerated IDs.
-    try {
-      graphSpec.getNextOpId(OpCode.FILTER, null);
-      graphSpec.getNextOpId(OpCode.FILTER, "");
-      graphSpec.getNextOpId(OpCode.FILTER, " ");
-      graphSpec.getNextOpId(OpCode.FILTER, "\t");
-    } catch (SamzaException e) {
-      fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
-    }
-
-    List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", 
"op_1", "OP_ID");
-    for (String validOpId: validOpIds) {
-      try {
-        graphSpec.getNextOpId(OpCode.FILTER, validOpId);
-      } catch (Exception e) {
-        fail("Received an exception with a valid operator ID: " + validOpId);
-      }
-    }
-
-    List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
-    for (String invalidOpId: invalidOpIds) {
-      try {
-        graphSpec.getNextOpId(OpCode.FILTER, invalidOpId);
-        fail("Did not receive an exception with an invalid operator ID: " + 
invalidOpId);
-      } catch (SamzaException e) { }
-    }
-  }
-
-  @Test
-  public void testGetInputStreamPreservesInsertionOrder() {
-    Config mockConfig = mock(Config.class);
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
-    String testStreamId1 = "test-stream-1";
-    String testStreamId2 = "test-stream-2";
-    String testStreamId3 = "test-stream-3";
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", 
"mockSystemFactoryClass");
-    graphSpec.getInputStream(sd.getInputDescriptor(testStreamId1, 
mock(Serde.class)));
-    graphSpec.getInputStream(sd.getInputDescriptor(testStreamId2, 
mock(Serde.class)));
-    graphSpec.getInputStream(sd.getInputDescriptor(testStreamId3, 
mock(Serde.class)));
-
-    List<InputOperatorSpec> inputSpecs = new 
ArrayList<>(graphSpec.getInputOperators().values());
-    assertEquals(inputSpecs.size(), 3);
-    assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
-    assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
-    assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
-  }
-
-  @Test
-  public void testGetTable() {
-    Config mockConfig = mock(Config.class);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
-    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-    when(mockTableDescriptor.getTableId()).thenReturn("t1");
-    when(mockTableDescriptor.getTableSpec()).thenReturn(
-        new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", 
new HashMap<>()));
-    assertNotNull(graphSpec.getTable(mockTableDescriptor));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetTableWithBadId() {
-    Config mockConfig = mock(Config.class);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
-    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-    when(mockTableDescriptor.getTableId()).thenReturn("my.table");
-    graphSpec.getTable(mockTableDescriptor);
-  }
-
-  class MockExpandingSystemDescriptor extends 
SystemDescriptor<MockExpandingSystemDescriptor> implements 
ExpandingInputDescriptorProvider<Integer> {
-    public MockExpandingSystemDescriptor(String systemName, StreamExpander 
expander) {
-      super(systemName, "factory.class", null, expander);
-    }
-
-    @Override
-    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, 
Serde serde) {
-      return new MockInputDescriptor<>(streamId, this, serde);
-    }
-  }
-
-  class MockTransformingSystemDescriptor extends 
SystemDescriptor<MockTransformingSystemDescriptor> implements 
TransformingInputDescriptorProvider<Integer> {
-    public MockTransformingSystemDescriptor(String systemName, 
InputTransformer transformer) {
-      super(systemName, "factory.class", transformer, null);
-    }
-
-    @Override
-    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, 
Serde serde) {
-      return new MockInputDescriptor<>(streamId, this, serde);
-    }
-  }
-
-  public class MockInputDescriptor<StreamMessageType> extends 
InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
-    MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, 
Serde serde) {
-      super(streamId, serde, systemDescriptor, null);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 604c72d..6f8a8bc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -21,7 +21,6 @@ package org.apache.samza.operators.impl;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -35,6 +34,7 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -49,7 +49,6 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -58,7 +57,6 @@ import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.InitableFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.util.TimestampedValue;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
@@ -74,6 +72,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.StreamTestUtils;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.TimestampedValue;
 import org.junit.After;
 import org.junit.Test;
 
@@ -220,7 +219,7 @@ public class TestOperatorImplGraph {
 
   @Test
   public void testEmptyChain() {
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mock(Config.class));
+    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
     OperatorImplGraph opGraph =
         new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), 
mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
     assertEquals(0, opGraph.getAllInputOperators().size());
@@ -244,17 +243,18 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, 
outputPhysicalName);
     Config config = new MapConfig(configs);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
-    GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
-    GenericOutputDescriptor outputDescriptor = 
sd.getOutputDescriptor(outputStreamId, mock(Serde.class));
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
-    OutputStream<Object> outputStream = 
graphSpec.getOutputStream(outputDescriptor);
+    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+        GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+        GenericOutputDescriptor outputDescriptor = 
sd.getOutputDescriptor(outputStreamId, mock(Serde.class));
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(inputDescriptor);
+        OutputStream<Object> outputStream = 
appDesc.getOutputStream(outputDescriptor);
 
-    inputStream
-        .filter(mock(FilterFunction.class))
-        .map(mock(MapFunction.class))
-        .sendTo(outputStream);
+        inputStream
+            .filter(mock(FilterFunction.class))
+            .map(mock(MapFunction.class))
+            .sendTo(outputStream);
+      }, config);
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
@@ -297,19 +297,20 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, outputStreamId, outputSystem, 
outputPhysicalName);
     Config config = new MapConfig(configs);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
-    GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, 
"mockFactoryClass");
-    GenericInputDescriptor inputDescriptor = 
isd.getInputDescriptor(inputStreamId, mock(Serde.class));
-    GenericOutputDescriptor outputDescriptor = 
osd.getOutputDescriptor(outputStreamId,
-        KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
-    OutputStream<KV<Integer, String>> outputStream = 
graphSpec.getOutputStream(outputDescriptor);
-
-    inputStream
-        .partitionBy(Object::hashCode, Object::toString,
-            KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), 
"p1")
-        .sendTo(outputStream);
+    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+        GenericSystemDescriptor osd = new 
GenericSystemDescriptor(outputSystem, "mockFactoryClass");
+        GenericInputDescriptor inputDescriptor = 
isd.getInputDescriptor(inputStreamId, mock(Serde.class));
+        GenericOutputDescriptor outputDescriptor = 
osd.getOutputDescriptor(outputStreamId,
+            KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(inputDescriptor);
+        OutputStream<KV<Integer, String>> outputStream = 
appDesc.getOutputStream(outputDescriptor);
+
+        inputStream
+            .partitionBy(Object::hashCode, Object::toString,
+                KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)), 
"p1")
+            .sendTo(outputStream);
+      }, config);
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
@@ -351,13 +352,13 @@ public class TestOperatorImplGraph {
     HashMap<String, String> configMap = new HashMap<>();
     StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, 
inputPhysicalName);
     Config config = new MapConfig(configMap);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-
-    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
-    GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
-    inputStream.filter(mock(FilterFunction.class));
-    inputStream.map(mock(MapFunction.class));
+    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+        GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(inputDescriptor);
+        inputStream.filter(mock(FilterFunction.class));
+        inputStream.map(mock(MapFunction.class));
+      }, config);
 
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
@@ -380,31 +381,28 @@ public class TestOperatorImplGraph {
     HashMap<String, String> configs = new HashMap<>();
     StreamTestUtils.addStreamConfigs(configs, inputStreamId, inputSystem, 
inputPhysicalName);
     Config config = new MapConfig(configs);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-
-    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
-    GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
-    MessageStream<Object> inputStream = 
graphSpec.getInputStream(inputDescriptor);
-    MessageStream<Object> stream1 = 
inputStream.filter(mock(FilterFunction.class));
-    MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
-    MessageStream<Object> mergedStream = 
stream1.merge(Collections.singleton(stream2));
-
+    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+        GenericInputDescriptor inputDescriptor = 
sd.getInputDescriptor(inputStreamId, mock(Serde.class));
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(inputDescriptor);
+        MessageStream<Object> stream1 = 
inputStream.filter(mock(FilterFunction.class));
+        MessageStream<Object> stream2 = 
inputStream.map(mock(MapFunction.class));
+        stream1.merge(Collections.singleton(stream2))
+            .map(new TestMapFunction<Object, Object>("test-map-1", (Function & 
Serializable) m -> m));
+      }, mock(Config.class));
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     TaskName mockTaskName = mock(TaskName.class);
     when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
 
-    MapFunction testMapFunction = new TestMapFunction<Object, 
Object>("test-map-1", (Function & Serializable) m -> m);
-    mergedStream.map(testMapFunction);
-
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), 
mock(Config.class), mockTaskContext, mock(Clock.class));
 
     Set<OperatorImpl> opSet = 
opImplGraph.getAllInputOperators().stream().collect(HashSet::new,
         (s, op) -> addOperatorRecursively(s, op), HashSet::addAll);
     Object[] mergeOps = opSet.stream().filter(op -> 
op.getOperatorSpec().getOpCode() == OpCode.MERGE).toArray();
-    assertEquals(mergeOps.length, 1);
-    assertEquals(((OperatorImpl) mergeOps[0]).registeredOperators.size(), 1);
+    assertEquals(1, mergeOps.length);
+    assertEquals(1, ((OperatorImpl) mergeOps[0]).registeredOperators.size());
     OperatorImpl mapOp = (OperatorImpl) ((OperatorImpl) 
mergeOps[0]).registeredOperators.iterator().next();
     assertEquals(mapOp.getOperatorSpec().getOpCode(), OpCode.MAP);
 
@@ -425,21 +423,22 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, inputStreamId1, inputSystem, 
inputPhysicalName1);
     StreamTestUtils.addStreamConfigs(configs, inputStreamId2, inputSystem, 
inputPhysicalName2);
     Config config = new MapConfig(configs);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
 
     Integer joinKey = new Integer(1);
     Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
     JoinFunction testJoinFunction = new 
TestJoinFunction("jobName-jobId-join-j1",
         (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn);
 
-    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
-    GenericInputDescriptor inputDescriptor1 = 
sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
-    GenericInputDescriptor inputDescriptor2 = 
sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
-    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputDescriptor1);
-    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputDescriptor2);
+    StreamApplicationDescriptorImpl graphSpec = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
+        GenericInputDescriptor inputDescriptor1 = 
sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+        GenericInputDescriptor inputDescriptor2 = 
sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+        MessageStream<Object> inputStream1 = 
appDesc.getInputStream(inputDescriptor1);
+        MessageStream<Object> inputStream2 = 
appDesc.getInputStream(inputDescriptor2);
 
-    inputStream1.join(inputStream2, testJoinFunction,
-        mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1), "j1");
+        inputStream1.join(inputStream2, testJoinFunction,
+            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1), "j1");
+      }, config);
 
     TaskName mockTaskName = mock(TaskName.class);
     TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
@@ -496,20 +495,20 @@ public class TestOperatorImplGraph {
     TaskContextImpl mockContext = mock(TaskContextImpl.class);
     when(mockContext.getTaskName()).thenReturn(mockTaskName);
     when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-
-    GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
-    GenericInputDescriptor inputDescriptor1 = 
sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
-    GenericInputDescriptor inputDescriptor2 = 
sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
-    MessageStream<Object> inputStream1 = 
graphSpec.getInputStream(inputDescriptor1);
-    MessageStream<Object> inputStream2 = 
graphSpec.getInputStream(inputDescriptor2);
+    StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor sd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+        GenericInputDescriptor inputDescriptor1 = sd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+        GenericInputDescriptor inputDescriptor2 = sd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+        MessageStream<Object> inputStream1 = appDesc.getInputStream(inputDescriptor1);
+        MessageStream<Object> inputStream2 = appDesc.getInputStream(inputDescriptor2);
 
-    Function mapFn = (Function & Serializable) m -> m;
-    inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
-        .map(new TestMapFunction<Object, Object>("2", mapFn));
+        Function mapFn = (Function & Serializable) m -> m;
+        inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
+            .map(new TestMapFunction<Object, Object>("2", mapFn));
 
-    inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn))
-        .map(new TestMapFunction<Object, Object>("4", mapFn));
+        inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn))
+            .map(new TestMapFunction<Object, Object>("4", mapFn));
+      }, mockConfig);
 
     OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockContext, SystemClock.instance());
 
@@ -592,33 +591,34 @@ public class TestOperatorImplGraph {
     StreamTestUtils.addStreamConfigs(configs, outputStreamId2, outputSystem, outputStreamId2);
     Config config = new MapConfig(configs);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, 
"mockFactoryClass");
-    GenericInputDescriptor inputDescriptor1 = 
isd.getInputDescriptor(inputStreamId1, mock(Serde.class));
-    GenericInputDescriptor inputDescriptor2 = 
isd.getInputDescriptor(inputStreamId2, mock(Serde.class));
-    GenericInputDescriptor inputDescriptor3 = 
isd.getInputDescriptor(inputStreamId3, mock(Serde.class));
-    GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, 
"mockFactoryClass");
-    GenericOutputDescriptor outputDescriptor1 = 
osd.getOutputDescriptor(outputStreamId1, mock(Serde.class));
-    GenericOutputDescriptor outputDescriptor2 = 
osd.getOutputDescriptor(outputStreamId2, mock(Serde.class));
-    MessageStream messageStream1 = 
graphSpec.getInputStream(inputDescriptor1).map(m -> m);
-    MessageStream messageStream2 = 
graphSpec.getInputStream(inputDescriptor2).filter(m -> true);
-    MessageStream messageStream3 =
-        graphSpec.getInputStream(inputDescriptor3)
-            .filter(m -> true)
-            .partitionBy(m -> "m", m -> m, "p1")
-            .map(m -> m);
-    OutputStream<Object> outputStream1 = 
graphSpec.getOutputStream(outputDescriptor1);
-    OutputStream<Object> outputStream2 = 
graphSpec.getOutputStream(outputDescriptor2);
-
-    messageStream1
-        .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(2), "j1")
-        .partitionBy(m -> "m", m -> m, "p2")
-        .sendTo(outputStream1);
-    messageStream3
-        .join(messageStream2, mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), 
Duration.ofHours(1), "j2")
-        .sendTo(outputStream2);
+    StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor isd = new GenericSystemDescriptor(inputSystem, "mockFactoryClass");
+        GenericInputDescriptor inputDescriptor1 = isd.getInputDescriptor(inputStreamId1, mock(Serde.class));
+        GenericInputDescriptor inputDescriptor2 = isd.getInputDescriptor(inputStreamId2, mock(Serde.class));
+        GenericInputDescriptor inputDescriptor3 = isd.getInputDescriptor(inputStreamId3, mock(Serde.class));
+        GenericSystemDescriptor osd = new GenericSystemDescriptor(outputSystem, "mockFactoryClass");
+        GenericOutputDescriptor outputDescriptor1 = osd.getOutputDescriptor(outputStreamId1, mock(Serde.class));
+        GenericOutputDescriptor outputDescriptor2 = osd.getOutputDescriptor(outputStreamId2, mock(Serde.class));
+        MessageStream messageStream1 = appDesc.getInputStream(inputDescriptor1).map(m -> m);
+        MessageStream messageStream2 = appDesc.getInputStream(inputDescriptor2).filter(m -> true);
+        MessageStream messageStream3 =
+            appDesc.getInputStream(inputDescriptor3)
+                .filter(m -> true)
+                .partitionBy(m -> "m", m -> m, "p1")
+                .map(m -> m);
+        OutputStream<Object> outputStream1 = appDesc.getOutputStream(outputDescriptor1);
+        OutputStream<Object> outputStream2 = appDesc.getOutputStream(outputDescriptor2);
+
+        messageStream1
+            .join(messageStream2, mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+            .partitionBy(m -> "m", m -> m, "p2")
+            .sendTo(outputStream1);
+        messageStream3
+            .join(messageStream2, mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+            .sendTo(outputStream2);
+      }, config);
 
     Multimap<SystemStream, SystemStream> outputToInput =
         OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph(), new StreamConfig(config));

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 6c34fcd..7d468c9 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -19,25 +19,33 @@
 
 package org.apache.samza.operators.impl;
 
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.impl.store.TestInMemoryStore;
 import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
-import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.triggers.FiringType;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
@@ -59,15 +67,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Collections;
-
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -546,72 +545,80 @@ public class TestWindowOperator {
     verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
   }
 
-  private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode 
mode,
+  private StreamApplicationDescriptorImpl getKeyedTumblingWindowStreamGraph(AccumulationMode mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(config);
-    KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
-    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
-    graph.getInputStream(inputDescriptor)
-        .window(Windows.keyedTumblingWindow(KV::getKey, duration, new 
IntegerSerde(), kvSerde)
-            .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1")
-        .sink((message, messageCollector, taskCoordinator) -> {
-            SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
-            messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
-          });
-
-    return graph;
+
+    StreamApplication userApp = appDesc -> {
+      KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+      GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+      GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+      appDesc.getInputStream(inputDescriptor)
+          .window(Windows.keyedTumblingWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
+              .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1")
+          .sink((message, messageCollector, taskCoordinator) -> {
+              SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+            });
+    };
+
+    return new StreamApplicationDescriptorImpl(userApp, config);
   }
 
-  private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode,
+  private StreamApplicationDescriptorImpl getTumblingWindowStreamGraph(AccumulationMode mode,
       Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(config);
-    KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
-    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
-    graph.getInputStream(inputDescriptor)
-        .window(Windows.tumblingWindow(duration, 
kvSerde).setEarlyTrigger(earlyTrigger)
-            .setAccumulationMode(mode), "w1")
-        .sink((message, messageCollector, taskCoordinator) -> {
-            SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
-            messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
-          });
-    return graph;
+    StreamApplication userApp = appDesc -> {
+      KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+      GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+      GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+      appDesc.getInputStream(inputDescriptor)
+          .window(Windows.tumblingWindow(duration, kvSerde).setEarlyTrigger(earlyTrigger)
+              .setAccumulationMode(mode), "w1")
+          .sink((message, messageCollector, taskCoordinator) -> {
+              SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+            });
+    };
+
+    return new StreamApplicationDescriptorImpl(userApp, config);
   }
 
-  private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode 
mode, Duration duration) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(config);
-    KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
-    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
-    graph.getInputStream(inputDescriptor)
-        .window(Windows.keyedSessionWindow(KV::getKey, duration, new 
IntegerSerde(), kvSerde)
-            .setAccumulationMode(mode), "w1")
-        .sink((message, messageCollector, taskCoordinator) -> {
-            SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
-            messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
-          });
-    return graph;
+  private StreamApplicationDescriptorImpl getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException {
+    StreamApplication userApp = appDesc -> {
+      KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+      GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+      GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+      appDesc.getInputStream(inputDescriptor)
+          .window(Windows.keyedSessionWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
+              .setAccumulationMode(mode), "w1")
+          .sink((message, messageCollector, taskCoordinator) -> {
+              SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+            });
+    };
+
+    return new StreamApplicationDescriptorImpl(userApp, config);
   }
 
-  private StreamGraphSpec 
getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration 
timeDuration,
+  private StreamApplicationDescriptorImpl getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration,
         Trigger<IntegerEnvelope> earlyTrigger) throws IOException {
-    StreamGraphSpec graph = new StreamGraphSpec(config);
-    KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new 
IntegerSerde());
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", 
"mockFactoryClass");
-    GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = 
sd.getInputDescriptor("integers", kvSerde);
-    MessageStream<KV<Integer, Integer>> integers = 
graph.getInputStream(inputDescriptor);
-
-    integers
-        .map(new KVMapFunction())
-        .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(timeDuration, 
() -> 0, (m, c) -> c + 1, new IntegerSerde())
-            .setEarlyTrigger(earlyTrigger)
-            .setAccumulationMode(mode), "w1")
-        .sink((message, messageCollector, taskCoordinator) -> {
-            SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
-            messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
-          });
-    return graph;
+    StreamApplication userApp = appDesc -> {
+      KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+      GenericSystemDescriptor sd = new GenericSystemDescriptor("kafka", "mockFactoryClass");
+      GenericInputDescriptor<KV<Integer, Integer>> inputDescriptor = sd.getInputDescriptor("integers", kvSerde);
+      MessageStream<KV<Integer, Integer>> integers = appDesc.getInputStream(inputDescriptor);
+
+      integers
+          .map(new KVMapFunction())
+          .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(timeDuration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
+              .setEarlyTrigger(earlyTrigger)
+              .setAccumulationMode(mode), "w1")
+          .sink((message, messageCollector, taskCoordinator) -> {
+              SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+            });
+    };
+
+    return new StreamApplicationDescriptorImpl(userApp, config);
   }
 
   private static class IntegerEnvelope extends IncomingMessageEnvelope {

Reply via email to