http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index dac4e94..602b595 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -21,9 +21,8 @@ package org.apache.samza.operators;
 import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -44,18 +43,24 @@ import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.TestClock;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestJoinOperator {
@@ -64,10 +69,22 @@ public class TestJoinOperator {
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
   private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 
9, 10);
 
+  private Config config;
+
+  @Before
+  public void setUp() {
+    Map<String, String> mapConfig = new HashMap<>();
+    mapConfig.put("app.runner.class", 
"org.apache.samza.runtime.LocalApplicationRunner");
+    mapConfig.put("job.default.system", "insystem");
+    mapConfig.put("job.name", "jobName");
+    mapConfig.put("job.id", "jobId");
+    config = new MapConfig(mapConfig);
+  }
+
   @Test
   public void join() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -82,43 +99,42 @@ public class TestJoinOperator {
 
   @Test(expected = SamzaException.class)
   public void joinWithSelfThrowsException() throws Exception {
-    StreamApplication app = new StreamApplication() {
-      @Override
-      public void init(StreamGraph graph, Config config) {
-        IntegerSerde integerSerde = new IntegerSerde();
-        KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, 
integerSerde);
-        MessageStream<KV<Integer, Integer>> inStream = 
graph.getInputStream("instream", kvSerde);
-
-        inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, 
kvSerde, JOIN_TTL, "join");
-      }
-    };
-
-    createStreamOperatorTask(new SystemClock(), app); // should throw an 
exception
+    config.put("streams.instream.system", "insystem");
+
+    StreamGraphSpec graphSpec = new 
StreamGraphSpec(mock(ApplicationRunner.class), config);
+    IntegerSerde integerSerde = new IntegerSerde();
+    KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream("instream", kvSerde);
+
+    inStream.join(inStream, new TestJoinFunction(), integerSerde, kvSerde, 
kvSerde, JOIN_TTL, "join");
+
+    createStreamOperatorTask(new SystemClock(), graphSpec); // should throw an 
exception
   }
 
   @Test
   public void joinFnInitAndClose() throws Exception {
     TestJoinFunction joinFn = new TestJoinFunction();
-    TestJoinStreamApplication app = new TestJoinStreamApplication(joinFn);
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
-    assertEquals(1, joinFn.getNumInitCalls());
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(joinFn);
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
+
     MessageCollector messageCollector = mock(MessageCollector.class);
 
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), 
messageCollector, taskCoordinator));
 
     // close should not be called till now
-    assertEquals(0, joinFn.getNumCloseCalls());
     sot.close();
 
-    // close should be called from sot.close()
-    assertEquals(1, joinFn.getNumCloseCalls());
+    verify(messageCollector, 
times(0)).send(any(OutgoingMessageEnvelope.class));
+    // Make sure the joinFn has been copied instead of directly referred by 
the task instance
+    assertEquals(0, joinFn.getNumInitCalls());
+    assertEquals(0, joinFn.getNumCloseCalls());
   }
 
   @Test
   public void joinReverse() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -133,8 +149,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatch() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -148,8 +164,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinNoMatchReverse() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -163,8 +179,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKey() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -181,8 +197,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsLatestMessageForKeyReverse() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -199,8 +215,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessages() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -222,8 +238,8 @@ public class TestJoinOperator {
 
   @Test
   public void joinRetainsMatchedMessagesReverse() throws Exception {
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), 
graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -246,8 +262,8 @@ public class TestJoinOperator {
   @Test
   public void joinRemovesExpiredMessages() throws Exception {
     TestClock testClock = new TestClock();
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -266,8 +282,8 @@ public class TestJoinOperator {
   @Test
   public void joinRemovesExpiredMessagesReverse() throws Exception {
     TestClock testClock = new TestClock();
-    TestJoinStreamApplication app = new TestJoinStreamApplication(new 
TestJoinFunction());
-    StreamOperatorTask sot = createStreamOperatorTask(testClock, app);
+    StreamGraphSpec graphSpec = this.getTestJoinStreamGraph(new 
TestJoinFunction());
+    StreamOperatorTask sot = createStreamOperatorTask(testClock, graphSpec);
     List<Integer> output = new ArrayList<>();
     MessageCollector messageCollector = envelope -> output.add((Integer) 
envelope.getMessage());
 
@@ -283,15 +299,12 @@ public class TestJoinOperator {
     assertTrue(output.isEmpty());
   }
 
-  private StreamOperatorTask createStreamOperatorTask(Clock clock, 
StreamApplication app) throws Exception {
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("instream")).thenReturn(new 
StreamSpec("instream", "instream", "insystem"));
-    when(runner.getStreamSpec("instream2")).thenReturn(new 
StreamSpec("instream2", "instream2", "insystem2"));
+  private StreamOperatorTask createStreamOperatorTask(Clock clock, 
StreamGraphSpec graphSpec) throws Exception {
 
     TaskContextImpl taskContext = mock(TaskContextImpl.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new 
Partition(0)),
-            new SystemStreamPartition("insystem2", "instream2", new 
Partition(0))));
+            new SystemStreamPartition("insystem", "instream2", new 
Partition(0))));
     when(taskContext.getMetricsRegistry()).thenReturn(new 
MetricsRegistryMap());
     // need to return different stores for left and right side
     IntegerSerde integerSerde = new IntegerSerde();
@@ -301,35 +314,30 @@ public class TestJoinOperator {
     when(taskContext.getStore(eq("jobName-jobId-join-j1-R")))
         .thenReturn(new TestInMemoryStore(integerSerde, 
timestampedValueSerde));
 
-    Config config = mock(Config.class);
-    when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
-    when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
-
-    StreamOperatorTask sot = new StreamOperatorTask(app, runner, clock);
+    StreamOperatorTask sot = new 
StreamOperatorTask(graphSpec.getOperatorSpecGraph(), 
graphSpec.getContextManager(), clock);
     sot.init(config, taskContext);
     return sot;
   }
 
-  private static class TestJoinStreamApplication implements StreamApplication {
-
-    private final TestJoinFunction joinFn;
-
-    TestJoinStreamApplication(TestJoinFunction joinFn) {
-      this.joinFn = joinFn;
-    }
+  private StreamGraphSpec getTestJoinStreamGraph(TestJoinFunction joinFn) 
throws IOException {
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("instream")).thenReturn(new 
StreamSpec("instream", "instream", "insystem"));
+    when(runner.getStreamSpec("instream2")).thenReturn(new 
StreamSpec("instream2", "instream2", "insystem"));
 
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      IntegerSerde integerSerde = new IntegerSerde();
-      KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, 
integerSerde);
-      MessageStream<KV<Integer, Integer>> inStream = 
graph.getInputStream("instream", kvSerde);
-      MessageStream<KV<Integer, Integer>> inStream2 = 
graph.getInputStream("instream2", kvSerde);
-
-      SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
-      inStream
-          .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, 
"j1")
-          .sink((m, mc, tc) -> mc.send(new 
OutgoingMessageEnvelope(outputSystemStream, m)));
-    }
+    StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+    IntegerSerde integerSerde = new IntegerSerde();
+    KVSerde<Integer, Integer> kvSerde = KVSerde.of(integerSerde, integerSerde);
+    MessageStream<KV<Integer, Integer>> inStream = 
graphSpec.getInputStream("instream", kvSerde);
+    MessageStream<KV<Integer, Integer>> inStream2 = 
graphSpec.getInputStream("instream2", kvSerde);
+
+    inStream
+        .join(inStream2, joinFn, integerSerde, kvSerde, kvSerde, JOIN_TTL, 
"j1")
+        .sink((message, messageCollector, taskCoordinator) -> {
+            SystemStream outputSystemStream = new SystemStream("outputSystem", 
"outputStream");
+            messageCollector.send(new 
OutgoingMessageEnvelope(outputSystemStream, message));
+          });
+
+    return graphSpec;
   }
 
   private static class TestJoinFunction
@@ -380,7 +388,7 @@ public class TestJoinOperator {
 
   private static class SecondStreamIME extends IncomingMessageEnvelope {
     SecondStreamIME(Integer key, Integer value) {
-      super(new SystemStreamPartition("insystem2", "instream2", new 
Partition(0)), "1", key, value);
+      super(new SystemStreamPartition("insystem", "instream2", new 
Partition(0)), "1", key, value);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 96e234e..fff85e8 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -18,11 +18,11 @@
  */
 package org.apache.samza.operators;
 
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.function.Function;
-import java.util.function.Supplier;
 
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
@@ -32,6 +32,7 @@ import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -54,8 +55,6 @@ import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import com.google.common.collect.ImmutableList;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -71,7 +70,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testMap() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -96,7 +95,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testFlatMap() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -113,7 +112,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testFlatMapWithRelaxedTypes() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestInputMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -133,7 +132,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testFilter() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -158,7 +157,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testSink() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
@@ -175,7 +174,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testSendTo() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
     OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = 
mock(OutputStreamImpl.class);
@@ -200,8 +199,8 @@ public class TestMessageStreamImpl {
   }
 
   @Test
-  public void testRepartition() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+  public void testPartitionBy() throws IOException {
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     String mockOpName = "mockName";
     when(mockGraph.getNextOpId(anyObject(), 
anyObject())).thenReturn(mockOpName);
@@ -215,8 +214,8 @@ public class TestMessageStreamImpl {
     when(mockIntermediateStream.isKeyed()).thenReturn(true);
 
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
-    Function mockKeyFunction = mock(Function.class);
-    Function mockValueFunction = mock(Function.class);
+    MapFunction mockKeyFunction = mock(MapFunction.class);
+    MapFunction mockValueFunction = mock(MapFunction.class);
     inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde, 
"p1");
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
@@ -232,7 +231,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testRepartitionWithoutSerde() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     String mockOpName = "mockName";
     when(mockGraph.getNextOpId(anyObject(), 
anyObject())).thenReturn(mockOpName);
@@ -245,8 +244,8 @@ public class TestMessageStreamImpl {
     when(mockIntermediateStream.isKeyed()).thenReturn(true);
 
     MessageStreamImpl<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
-    Function mockKeyFunction = mock(Function.class);
-    Function mockValueFunction = mock(Function.class);
+    MapFunction mockKeyFunction = mock(MapFunction.class);
+    MapFunction mockValueFunction = mock(MapFunction.class);
     inputStream.partitionBy(mockKeyFunction, mockValueFunction, "p1");
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
@@ -262,18 +261,17 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testWindowWithRelaxedTypes() throws Exception {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec = mock(OperatorSpec.class);
     MessageStream<TestInputMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec);
 
-    Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
+    MapFunction<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
     FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 
1;
-    Supplier<Integer> initialValue = () -> 0;
+    SupplierFunction<Integer> initialValue = () -> 0;
 
     // should compile since TestMessageEnvelope (input for functions) is base 
class of TestInputMessageEnvelope (M)
-    Window<TestInputMessageEnvelope, String, Integer> window =
-        Windows.keyedTumblingWindow(keyExtractor, Duration.ofHours(1), 
initialValue, aggregator,
-            null, mock(Serde.class));
+    Window<TestInputMessageEnvelope, String, Integer> window = Windows
+        .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, 
aggregator, null, mock(Serde.class));
     MessageStream<WindowPane<String, Integer>> windowedStream = 
inputStream.window(window, "w1");
 
     ArgumentCaptor<OperatorSpec> registeredOpCaptor = 
ArgumentCaptor.forClass(OperatorSpec.class);
@@ -287,7 +285,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testJoin() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> source1 = new 
MessageStreamImpl<>(mockGraph, leftInputOpSpec);
     OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -319,7 +317,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testSendToTable() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec inputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<TestMessageEnvelope> source = new 
MessageStreamImpl<>(mockGraph, inputOpSpec);
 
@@ -336,13 +334,12 @@ public class TestMessageStreamImpl {
     SendToTableOperatorSpec sendToTableOperatorSpec = 
(SendToTableOperatorSpec) registeredOpSpec;
 
     assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode());
-    assertEquals(inputOpSpec, sendToTableOperatorSpec.getInputOpSpec());
     assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec());
   }
 
   @Test
   public void testStreamTableJoin() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
     MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new 
MessageStreamImpl<>(mockGraph, leftInputOpSpec);
     OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
@@ -370,7 +367,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testMerge() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
     MessageStream<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mockOpSpec1);
 
@@ -410,7 +407,7 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testMergeWithRelaxedTypes() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamGraphSpec mockGraph = mock(StreamGraphSpec.class);
     MessageStream<TestMessageEnvelope> inputStream = new 
MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
 
     // other streams have the same message type T as input stream message type 
M

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
new file mode 100644
index 0000000..2be88ca
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for THE
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.io.ObjectInputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecTestUtils;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.StreamSpec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests for {@link OperatorSpecGraph}
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(OperatorSpec.class)
+public class TestOperatorSpecGraph {
+
+  private StreamGraphSpec mockGraph;
+  private Map<StreamSpec, InputOperatorSpec> inputOpSpecMap;
+  private Map<StreamSpec, OutputStreamImpl> outputStrmMap;
+  private Set<OperatorSpec> allOpSpecs;
+
+  @Before
+  public void setUp() {
+    this.mockGraph = mock(StreamGraphSpec.class);
+
+    /**
+     * Setup two linear transformation pipelines:
+     * 1) input1 --> filter --> sendTo
+     * 2) input2 --> map --> sink
+     */
+    StreamSpec testInputSpec = new StreamSpec("test-input-1", "test-input-1", 
"kafka");
+    InputOperatorSpec testInput = new InputOperatorSpec(testInputSpec, new 
NoOpSerde(), new NoOpSerde(), true, "test-input-1");
+    StreamOperatorSpec filterOp = OperatorSpecs.createFilterOperatorSpec(m -> 
true, "test-filter-2");
+    StreamSpec testOutputSpec = new StreamSpec("test-output-1", 
"test-output-1", "kafka");
+    OutputStreamImpl outputStream1 = new OutputStreamImpl(testOutputSpec, 
null, null, true);
+    OutputOperatorSpec outputSpec = 
OperatorSpecs.createSendToOperatorSpec(outputStream1, "test-output-3");
+    testInput.registerNextOperatorSpec(filterOp);
+    filterOp.registerNextOperatorSpec(outputSpec);
+    StreamSpec testInputSpec2 = new StreamSpec("test-input-2", "test-input-2", 
"kafka");
+    InputOperatorSpec testInput2 = new InputOperatorSpec(testInputSpec2, new 
NoOpSerde(), new NoOpSerde(), true, "test-input-4");
+    StreamOperatorSpec testMap = OperatorSpecs.createMapOperatorSpec(m -> m, 
"test-map-5");
+    SinkOperatorSpec testSink = OperatorSpecs.createSinkOperatorSpec((m, mc, 
tc) -> { }, "test-sink-6");
+    testInput2.registerNextOperatorSpec(testMap);
+    testMap.registerNextOperatorSpec(testSink);
+
+    this.inputOpSpecMap = new LinkedHashMap<>();
+    inputOpSpecMap.put(testInputSpec, testInput);
+    inputOpSpecMap.put(testInputSpec2, testInput2);
+    this.outputStrmMap = new LinkedHashMap<>();
+    outputStrmMap.put(testOutputSpec, outputStream1);
+    
when(mockGraph.getInputOperators()).thenReturn(Collections.unmodifiableMap(inputOpSpecMap));
+    
when(mockGraph.getOutputStreams()).thenReturn(Collections.unmodifiableMap(outputStrmMap));
+    this.allOpSpecs = new HashSet<OperatorSpec>() { {
+        this.add(testInput);
+        this.add(filterOp);
+        this.add(outputSpec);
+        this.add(testInput2);
+        this.add(testMap);
+        this.add(testSink);
+      } };
+  }
+
+  @After
+  public void tearDown() {
+    this.mockGraph = null;
+    this.inputOpSpecMap = null;
+    this.outputStrmMap = null;
+    this.allOpSpecs = null;
+  }
+
+  @Test
+  public void testConstructor() {
+    OperatorSpecGraph specGraph = new OperatorSpecGraph(mockGraph);
+    assertEquals(specGraph.getInputOperators(), inputOpSpecMap);
+    assertEquals(specGraph.getOutputStreams(), outputStrmMap);
+    assertTrue(specGraph.getTables().isEmpty());
+    assertTrue(!specGraph.hasWindowOrJoins());
+    assertEquals(specGraph.getAllOperatorSpecs(), this.allOpSpecs);
+  }
+
+  @Test
+  public void testClone() {
+    OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+    OperatorSpecGraph clonedSpecGraph = operatorSpecGraph.clone();
+    OperatorSpecTestUtils.assertClonedGraph(operatorSpecGraph, 
clonedSpecGraph);
+  }
+
+  @Test(expected = NotSerializableException.class)
+  public void testCloneWithSerializationError() throws Throwable {
+    OperatorSpec mockFailedOpSpec = PowerMockito.mock(OperatorSpec.class);
+    when(mockFailedOpSpec.getOpId()).thenReturn("test-failed-op-4");
+    allOpSpecs.add(mockFailedOpSpec);
+    
inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(mockFailedOpSpec);
+
+    //failed with serialization error
+    try {
+      new OperatorSpecGraph(mockGraph);
+      fail("Should have failed with serialization error");
+    } catch (SamzaException se) {
+      throw se.getCause();
+    }
+  }
+
+  @Test(expected = IOException.class)
+  public void testCloneWithDeserializationError() throws Throwable {
+    TestDeserializeOperatorSpec testOp = new 
TestDeserializeOperatorSpec(OperatorSpec.OpCode.MAP, "test-failed-op-4");
+    this.allOpSpecs.add(testOp);
+    
inputOpSpecMap.values().stream().findFirst().get().registerNextOperatorSpec(testOp);
+
+    OperatorSpecGraph operatorSpecGraph = new OperatorSpecGraph(mockGraph);
+    //failed with serialization error
+    try {
+      operatorSpecGraph.clone();
+      fail("Should have failed with serialization error");
+    } catch (SamzaException se) {
+      throw se.getCause();
+    }
+  }
+
+  private static class TestDeserializeOperatorSpec extends OperatorSpec {
+
+    public TestDeserializeOperatorSpec(OpCode opCode, String opId) {
+      super(opCode, opId);
+    }
+
+    private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
+      throw new IOException("Raise IOException to cause deserialization 
failure");
+    }
+
+    @Override
+    public WatermarkFunction getWatermarkFn() {
+      return null;
+    }
+
+    @Override
+    public TimerFunction getTimerFn() {
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java 
b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
deleted file mode 100644
index 3bb44b5..0000000
--- 
a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for THE
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpec.OpCode;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.table.TableSpec;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestStreamGraphImpl {
-
-  @Test
-  public void testGetInputStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    MessageStream<TestMessageEnvelope> inputStream = 
graph.getInputStream("test-stream-1", mockValueSerde);
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
-    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test
-  public void testGetInputStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    MessageStream<TestMessageEnvelope> inputStream = 
graph.getInputStream("test-stream-1", mockKVSerde);
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
-    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testGetInputStreamWithNullSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    graph.getInputStream("test-stream-1", null);
-  }
-
-  @Test
-  public void testGetInputStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    graph.setDefaultSerde(mockValueSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graph.getInputStream("test-stream-1");
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
-    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test
-  public void testGetInputStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graph.setDefaultSerde(mockKVSerde);
-    MessageStream<TestMessageEnvelope> inputStream = 
graph.getInputStream("test-stream-1");
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
-    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
-    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
-  }
-
-  @Test
-  public void testGetInputStreamWithDefaultDefaultSerde() {
-    // default default serde == user hasn't provided a default serde
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    MessageStream<TestMessageEnvelope> inputStream = 
graph.getInputStream("test-stream-1");
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
-    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
-    assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
-  }
-
-  @Test
-  public void testGetInputStreamWithRelaxedTypes() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    MessageStream<TestMessageEnvelope> inputStream = 
graph.getInputStream("test-stream-1");
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
-        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) 
inputStream).getOperatorSpec();
-    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
-    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
-  }
-
-  @Test
-  public void testMultipleGetInputStreams() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
-    StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
-    
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1");
-    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2");
-
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
-        (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream1).getOperatorSpec();
-    InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
-        (InputOperatorSpec) ((MessageStreamImpl<Object>) 
inputStream2).getOperatorSpec();
-
-    assertEquals(graph.getInputOperators().size(), 2);
-    assertEquals(graph.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
-    assertEquals(graph.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameInputStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    graph.getInputStream("test-stream-1");
-    // should throw exception
-    graph.getInputStream("test-stream-1");
-  }
-
-  @Test
-  public void testGetOutputStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    Serde mockValueSerde = mock(Serde.class);
-    OutputStream<TestMessageEnvelope> outputStream =
-        graph.getOutputStream("test-stream-1", mockValueSerde);
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
-    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test
-  public void testGetOutputStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graph.setDefaultSerde(mockKVSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graph.getOutputStream("test-stream-1", mockKVSerde);
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
-    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testGetOutputStreamWithNullSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    graph.getOutputStream("test-stream-1", null);
-  }
-
-  @Test
-  public void testGetOutputStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    Serde mockValueSerde = mock(Serde.class);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    graph.setDefaultSerde(mockValueSerde);
-    OutputStream<TestMessageEnvelope> outputStream = 
graph.getOutputStream("test-stream-1");
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
-    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test
-  public void testGetOutputStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graph.setDefaultSerde(mockKVSerde);
-
-    OutputStream<TestMessageEnvelope> outputStream = 
graph.getOutputStream("test-stream-1");
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
-    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
-    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
-  }
-
-  @Test
-  public void testGetOutputStreamWithDefaultDefaultSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-
-    OutputStream<TestMessageEnvelope> outputStream = 
graph.getOutputStream("test-stream-1");
-
-    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = 
(OutputStreamImpl) outputStream;
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
outputStreamImpl);
-    assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
-    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
-    assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSerdeAfterGettingStreams() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    graph.getInputStream("test-stream-1");
-    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSerdeAfterGettingOutputStream() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    graph.getOutputStream("test-stream-1");
-    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetDefaultSerdeAfterGettingIntermediateStream() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    graph.getIntermediateStream("test-stream-1", null);
-    graph.setDefaultSerde(mock(Serde.class)); // should throw exception
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameOutputStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    graph.getOutputStream("test-stream-1");
-    graph.getOutputStream("test-stream-1"); // should throw exception
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    Serde mockValueSerde = mock(Serde.class);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(mockStreamName, mockValueSerde);
-
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
-    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(mockStreamName, mockKVSerde);
-
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
-    assertEquals(mockKeySerde, 
intermediateStreamImpl.getOutputStream().getKeySerde());
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertEquals(mockKeySerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithDefaultValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    Serde mockValueSerde = mock(Serde.class);
-    graph.setDefaultSerde(mockValueSerde);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(mockStreamName, null);
-
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
-    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    KVSerde mockKVSerde = mock(KVSerde.class);
-    Serde mockKeySerde = mock(Serde.class);
-    Serde mockValueSerde = mock(Serde.class);
-    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
-    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
-    graph.setDefaultSerde(mockKVSerde);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(mockStreamName, null);
-
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
-    assertEquals(mockKeySerde, 
intermediateStreamImpl.getOutputStream().getKeySerde());
-    assertEquals(mockValueSerde, 
intermediateStreamImpl.getOutputStream().getValueSerde());
-    assertEquals(mockKeySerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde());
-    assertEquals(mockValueSerde, ((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde());
-  }
-
-  @Test
-  public void testGetIntermediateStreamWithDefaultDefaultSerde() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec mockStreamSpec = mock(StreamSpec.class);
-    String mockStreamName = "mockStreamName";
-    when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
-        graph.getIntermediateStream(mockStreamName, null);
-
-    assertEquals(graph.getInputOperators().get(mockStreamSpec), 
intermediateStreamImpl.getOperatorSpec());
-    assertEquals(graph.getOutputStreams().get(mockStreamSpec), 
intermediateStreamImpl.getOutputStream());
-    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
-    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() 
instanceof NoOpSerde);
-    assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() 
instanceof NoOpSerde);
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
-    assertTrue(((InputOperatorSpec) 
intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetSameIntermediateStreamTwice() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, 
mock(Config.class));
-    graph.getIntermediateStream("test-stream-1", mock(Serde.class));
-    graph.getIntermediateStream("test-stream-1", mock(Serde.class));
-  }
-
-  @Test
-  public void testGetNextOpIdIncrementsId() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    assertEquals("jobName-1234-merge-0", graph.getNextOpId(OpCode.MERGE, 
null));
-    assertEquals("jobName-1234-join-customName", 
graph.getNextOpId(OpCode.JOIN, "customName"));
-    assertEquals("jobName-1234-map-2", graph.getNextOpId(OpCode.MAP, null));
-  }
-
-  @Test(expected = SamzaException.class)
-  public void testGetNextOpIdRejectsDuplicates() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    assertEquals("jobName-1234-join-customName", 
graph.getNextOpId(OpCode.JOIN, "customName"));
-    graph.getNextOpId(OpCode.JOIN, "customName"); // should throw
-  }
-
-  @Test
-  public void testUserDefinedIdValidation() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), 
anyString())).thenReturn("1234");
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    // null and empty userDefinedIDs should fall back to autogenerated IDs.
-    try {
-      graph.getNextOpId(OpCode.FILTER, null);
-      graph.getNextOpId(OpCode.FILTER, "");
-      graph.getNextOpId(OpCode.FILTER, " ");
-      graph.getNextOpId(OpCode.FILTER, "\t");
-    } catch (SamzaException e) {
-      Assert.fail("Received an error with a null or empty operator ID instead 
of defaulting to auto-generated ID.");
-    }
-
-    List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", 
"1000", "op_1", "OP_ID");
-    for (String validOpId: validOpIds) {
-      try {
-        graph.getNextOpId(OpCode.FILTER, validOpId);
-      } catch (Exception e) {
-        Assert.fail("Received an exception with a valid operator ID: " + 
validOpId);
-      }
-    }
-
-    List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
-    for (String invalidOpId: invalidOpIds) {
-      try {
-        graph.getNextOpId(OpCode.FILTER, invalidOpId);
-        Assert.fail("Did not receive an exception with an invalid operator ID: 
" + invalidOpId);
-      } catch (SamzaException e) { }
-    }
-  }
-
-  @Test
-  public void testGetInputStreamPreservesInsertionOrder() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", 
"physical-stream-1", "test-system");
-    
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-
-    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", 
"physical-stream-2", "test-system");
-    
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
-
-    StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", 
"physical-stream-3", "test-system");
-    
when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
-
-    graph.getInputStream("test-stream-1");
-    graph.getInputStream("test-stream-2");
-    graph.getInputStream("test-stream-3");
-
-    List<InputOperatorSpec> inputSpecs = new 
ArrayList<>(graph.getInputOperators().values());
-    Assert.assertEquals(inputSpecs.size(), 3);
-    Assert.assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
-    Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
-    Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
-  }
-
-  @Test
-  public void testGetTable() {
-    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-
-    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
-    when(mockTableDescriptor.getTableSpec()).thenReturn(
-        new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", 
new HashMap<>()));
-    Assert.assertNotNull(graph.getTable(mockTableDescriptor));
-  }
-}

Reply via email to