http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
index 86a553f..db7079c 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
@@ -19,12 +19,12 @@
 package org.apache.samza.operators.spec;
 
 import java.util.Collection;
+import java.util.Map;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -36,9 +36,9 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -57,7 +57,6 @@ public class TestPartitionByOperatorSpec {
   private final String testJobName = "testJob";
   private final String testJobId = "1";
   private final String testRepartitionedStreamName = "parByKey";
-  private StreamGraphSpec graphSpec = null;
 
   class TimerMapFn implements MapFunction<Object, String>, 
TimerFunction<String, Object> {
 
@@ -99,25 +98,28 @@ public class TestPartitionByOperatorSpec {
   public void setup() {
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn(testJobName);
     when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn(testJobId);
-    graphSpec = new StreamGraphSpec(mockConfig);
   }
 
   @Test
   public void testPartitionBy() {
-    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
     MapFunction<Object, String> keyFn = m -> m.toString();
     MapFunction<Object, Object> valueFn = m -> m;
     KVSerde<Object, Object> partitionBySerde = KVSerde.of(new NoOpSerde<>(), 
new NoOpSerde<>());
-    MessageStream<KV<String, Object>>
-        reparStream = inputStream.partitionBy(keyFn, valueFn, 
partitionBySerde, testRepartitionedStreamName);
-    InputOperatorSpec inputOpSpec = (InputOperatorSpec) 
Whitebox.getInternalState(reparStream, "operatorSpec");
-    assertEquals(inputOpSpec.getStreamId(), 
String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testRepartitionedStreamName));
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream inputStream = 
appDesc.getInputStream(testinputDescriptor);
+        inputStream.partitionBy(keyFn, valueFn, partitionBySerde, 
testRepartitionedStreamName);
+      }, mockConfig);
+    assertEquals(2, streamAppDesc.getInputOperators().size());
+    Map<String, InputOperatorSpec> inputOpSpecs = 
streamAppDesc.getInputOperators();
+    
assertTrue(inputOpSpecs.keySet().contains(String.format("%s-%s-partition_by-%s",
 testJobName, testJobId, testRepartitionedStreamName)));
+    InputOperatorSpec inputOpSpec = 
inputOpSpecs.get(String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testRepartitionedStreamName));
+    assertEquals(String.format("%s-%s-partition_by-%s", testJobName, 
testJobId, testRepartitionedStreamName), inputOpSpec.getStreamId());
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.isKeyed());
     assertNull(inputOpSpec.getTimerFn());
     assertNull(inputOpSpec.getWatermarkFn());
-    InputOperatorSpec originInputSpec = (InputOperatorSpec) 
Whitebox.getInternalState(inputStream, "operatorSpec");
+    InputOperatorSpec originInputSpec = 
inputOpSpecs.get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] 
instanceof PartitionByOperatorSpec);
     PartitionByOperatorSpec reparOpSpec  = (PartitionByOperatorSpec) 
originInputSpec.getRegisteredOperatorSpecs().toArray()[0];
     assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", 
testJobName, testJobId, testRepartitionedStreamName));
@@ -130,19 +132,21 @@ public class TestPartitionByOperatorSpec {
 
   @Test
   public void testPartitionByWithNoSerde() {
-    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
     MapFunction<Object, String> keyFn = m -> m.toString();
     MapFunction<Object, Object> valueFn = m -> m;
-    MessageStream<KV<String, Object>>
-        reparStream = inputStream.partitionBy(keyFn, valueFn, 
testRepartitionedStreamName);
-    InputOperatorSpec inputOpSpec = (InputOperatorSpec) 
Whitebox.getInternalState(reparStream, "operatorSpec");
-    assertEquals(inputOpSpec.getStreamId(), 
String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testRepartitionedStreamName));
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream inputStream = 
appDesc.getInputStream(testinputDescriptor);
+        inputStream.partitionBy(keyFn, valueFn, testRepartitionedStreamName);
+      }, mockConfig);
+    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(
+        String.format("%s-%s-partition_by-%s", testJobName, testJobId, 
testRepartitionedStreamName));
+    assertNotNull(inputOpSpec);
     assertNull(inputOpSpec.getKeySerde());
     assertNull(inputOpSpec.getValueSerde());
     assertTrue(inputOpSpec.isKeyed());
     assertNull(inputOpSpec.getTimerFn());
     assertNull(inputOpSpec.getWatermarkFn());
-    InputOperatorSpec originInputSpec = (InputOperatorSpec) 
Whitebox.getInternalState(inputStream, "operatorSpec");
+    InputOperatorSpec originInputSpec = 
streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] 
instanceof PartitionByOperatorSpec);
     PartitionByOperatorSpec reparOpSpec  = (PartitionByOperatorSpec) 
originInputSpec.getRegisteredOperatorSpecs().toArray()[0];
     assertEquals(reparOpSpec.getOpId(), String.format("%s-%s-partition_by-%s", 
testJobName, testJobId, testRepartitionedStreamName));
@@ -155,9 +159,11 @@ public class TestPartitionByOperatorSpec {
 
   @Test
   public void testCopy() {
-    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
-    inputStream.partitionBy(m -> m.toString(), m -> m, 
testRepartitionedStreamName);
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
+    StreamApplicationDescriptorImpl streamAppDesc = new 
StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream inputStream = 
appDesc.getInputStream(testinputDescriptor);
+        inputStream.partitionBy(m -> m.toString(), m -> m, 
testRepartitionedStreamName);
+      }, mockConfig);
+    OperatorSpecGraph specGraph = streamAppDesc.getOperatorSpecGraph();
     OperatorSpecGraph clonedGraph = specGraph.clone();
     OperatorSpecTestUtils.assertClonedGraph(specGraph, clonedGraph);
   }
@@ -165,28 +171,36 @@ public class TestPartitionByOperatorSpec {
   @Test(expected = IllegalArgumentException.class)
   public void testTimerFunctionAsKeyFn() {
     TimerMapFn keyFn = new TimerMapFn();
-    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
-    inputStream.partitionBy(keyFn, m -> m, "parByKey");
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(testinputDescriptor);
+        inputStream.partitionBy(keyFn, m -> m, "parByKey");
+      }, mockConfig);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testWatermarkFunctionAsKeyFn() {
     WatermarkMapFn keyFn = new WatermarkMapFn();
-    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
-    inputStream.partitionBy(keyFn, m -> m, "parByKey");
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(testinputDescriptor);
+        inputStream.partitionBy(keyFn, m -> m, "parByKey");
+      }, mockConfig);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testTimerFunctionAsValueFn() {
     TimerMapFn valueFn = new TimerMapFn();
-    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
-    inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(testinputDescriptor);
+        inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
+      }, mockConfig);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testWatermarkFunctionAsValueFn() {
     WatermarkMapFn valueFn = new WatermarkMapFn();
-    MessageStream inputStream = graphSpec.getInputStream(testinputDescriptor);
-    inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<Object> inputStream = 
appDesc.getInputStream(testinputDescriptor);
+        inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
+      }, mockConfig);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 052aa29..673015a 100644
--- 
a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ 
b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -19,6 +19,13 @@
 package org.apache.samza.processor;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.SamzaContainerStatus;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -30,21 +37,18 @@ import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.processor.StreamProcessor.State;
+import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
+
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -54,7 +58,7 @@ import static org.mockito.Mockito.when;
 public class TestStreamProcessor {
   private ConcurrentMap<ListenerCallback, Boolean> processorListenerState;
   private enum ListenerCallback {
-    ON_START, ON_SHUTDOWN, ON_FAILURE
+    BEFORE_START, AFTER_START, AFTER_STOP, AFTER_FAILURE
   }
 
   @Before
@@ -62,9 +66,10 @@ public class TestStreamProcessor {
     Mockito.reset();
     processorListenerState = new ConcurrentHashMap<ListenerCallback, 
Boolean>() {
       {
-        put(ListenerCallback.ON_START, false);
-        put(ListenerCallback.ON_FAILURE, false);
-        put(ListenerCallback.ON_SHUTDOWN, false);
+        put(ListenerCallback.BEFORE_START, false);
+        put(ListenerCallback.AFTER_START, false);
+        put(ListenerCallback.AFTER_STOP, false);
+        put(ListenerCallback.AFTER_FAILURE, false);
       }
     };
   }
@@ -83,7 +88,7 @@ public class TestStreamProcessor {
         Config config,
         Map<String, MetricsReporter> customMetricsReporters,
         StreamTaskFactory streamTaskFactory,
-        StreamProcessorLifecycleListener processorListener,
+        ProcessorLifecycleListener processorListener,
         JobCoordinator jobCoordinator,
         SamzaContainer container) {
       super(config, customMetricsReporters, streamTaskFactory, 
processorListener, jobCoordinator);
@@ -144,22 +149,27 @@ public class TestStreamProcessor {
         new MapConfig(),
         new HashMap<>(),
         mock(StreamTaskFactory.class),
-        new StreamProcessorLifecycleListener() {
+        new ProcessorLifecycleListener() {
           @Override
-          public void onStart() {
-            processorListenerState.put(ListenerCallback.ON_START, true);
+          public void afterStart() {
+            processorListenerState.put(ListenerCallback.AFTER_START, true);
             processorListenerStart.countDown();
           }
 
           @Override
-          public void onShutdown() {
-            processorListenerState.put(ListenerCallback.ON_SHUTDOWN, true);
+          public void afterFailure(Throwable t) {
+            processorListenerState.put(ListenerCallback.AFTER_FAILURE, true);
+          }
+
+          @Override
+          public void afterStop() {
+            processorListenerState.put(ListenerCallback.AFTER_STOP, true);
             processorListenerStop.countDown();
           }
 
           @Override
-          public void onFailure(Throwable t) {
-            processorListenerState.put(ListenerCallback.ON_FAILURE, true);
+          public void beforeStart() {
+            processorListenerState.put(ListenerCallback.BEFORE_START, true);
           }
         },
         mockJobCoordinator,
@@ -193,7 +203,7 @@ public class TestStreamProcessor {
     processor.start();
     processorListenerStart.await();
 
-    Assert.assertEquals(SamzaContainerStatus.STARTED, 
processor.getContainerStatus());
+    assertEquals(SamzaContainerStatus.STARTED, processor.getContainerStatus());
 
     // This block is required for the mockRunloop to actually start.
     // Otherwise, processor.stop gets triggered before mockRunloop begins to 
block
@@ -204,9 +214,10 @@ public class TestStreamProcessor {
     processorListenerStop.await();
 
     // Assertions on which callbacks are expected to be invoked
-    Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_START));
-    
Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_SHUTDOWN));
-    
Assert.assertFalse(processorListenerState.get(ListenerCallback.ON_FAILURE));
+    
Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
+    
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
+    Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_STOP));
+    
Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
   }
 
   /**
@@ -215,7 +226,7 @@ public class TestStreamProcessor {
    *
    * Assertions:
    * - JobCoordinator has been stopped from the JobCoordinatorListener callback
-   * - StreamProcessorLifecycleListener#onFailure(Throwable) has been invoked
+   * - ProcessorLifecycleListener#afterFailure(Throwable) has been invoked w/ 
non-null Throwable
    */
   @Test
   public void testContainerFailureCorrectlyStopsProcessor() throws 
InterruptedException {
@@ -242,20 +253,25 @@ public class TestStreamProcessor {
         new MapConfig(),
         new HashMap<>(),
         mock(StreamTaskFactory.class),
-        new StreamProcessorLifecycleListener() {
+        new ProcessorLifecycleListener() {
+          @Override
+          public void beforeStart() {
+            processorListenerState.put(ListenerCallback.BEFORE_START, true);
+          }
+
           @Override
-          public void onStart() {
-            processorListenerState.put(ListenerCallback.ON_START, true);
+          public void afterStart() {
+            processorListenerState.put(ListenerCallback.AFTER_START, true);
           }
 
           @Override
-          public void onShutdown() {
-            processorListenerState.put(ListenerCallback.ON_SHUTDOWN, true);
+          public void afterStop() {
+            processorListenerState.put(ListenerCallback.AFTER_STOP, true);
           }
 
           @Override
-          public void onFailure(Throwable t) {
-            processorListenerState.put(ListenerCallback.ON_FAILURE, true);
+          public void afterFailure(Throwable t) {
+            processorListenerState.put(ListenerCallback.AFTER_FAILURE, true);
             actualThrowable.getAndSet(t);
             processorListenerFailed.countDown();
           }
@@ -294,27 +310,28 @@ public class TestStreamProcessor {
     Assert.assertTrue(
         "Container failed and processor listener failed was not invoked within 
timeout!",
         processorListenerFailed.await(30, TimeUnit.SECONDS));
-    Assert.assertEquals(expectedThrowable, actualThrowable.get());
+    assertEquals(expectedThrowable, actualThrowable.get());
 
-    
Assert.assertFalse(processorListenerState.get(ListenerCallback.ON_SHUTDOWN));
-    Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_START));
-    Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_FAILURE));
+    
Assert.assertTrue(processorListenerState.get(ListenerCallback.BEFORE_START));
+    
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_START));
+    
Assert.assertFalse(processorListenerState.get(ListenerCallback.AFTER_STOP));
+    
Assert.assertTrue(processorListenerState.get(ListenerCallback.AFTER_FAILURE));
   }
 
   @Test
   public void testStartOperationShouldBeIdempotent() {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
     Mockito.doNothing().when(mockJobCoordinator).start();
-    StreamProcessorLifecycleListener lifecycleListener = 
Mockito.mock(StreamProcessorLifecycleListener.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     StreamProcessor streamProcessor = new StreamProcessor(new MapConfig(), new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
-    Assert.assertEquals(State.NEW, streamProcessor.getState());
+    assertEquals(State.NEW, streamProcessor.getState());
     streamProcessor.start();
 
-    Assert.assertEquals(State.STARTED, streamProcessor.getState());
+    assertEquals(State.STARTED, streamProcessor.getState());
 
     streamProcessor.start();
 
-    Assert.assertEquals(State.STARTED, streamProcessor.getState());
+    assertEquals(State.STARTED, streamProcessor.getState());
 
     Mockito.verify(mockJobCoordinator, Mockito.times(1)).start();
   }
@@ -322,7 +339,7 @@ public class TestStreamProcessor {
   @Test
   public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
-    StreamProcessorLifecycleListener lifecycleListener = 
Mockito.mock(StreamProcessorLifecycleListener.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
     StreamProcessor streamProcessor = new StreamProcessor(config, new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
@@ -334,11 +351,11 @@ public class TestStreamProcessor {
 
     streamProcessor.start();
 
-    Assert.assertEquals(State.STARTED, streamProcessor.getState());
+    assertEquals(State.STARTED, streamProcessor.getState());
 
     streamProcessor.jobCoordinatorListener.onJobModelExpired();
 
-    Assert.assertEquals(State.IN_REBALANCE, streamProcessor.getState());
+    assertEquals(State.IN_REBALANCE, streamProcessor.getState());
 
     /**
      * When there's initialized SamzaContainer in StreamProcessor and the 
container shutdown
@@ -357,7 +374,7 @@ public class TestStreamProcessor {
 
     streamProcessor.jobCoordinatorListener.onJobModelExpired();
 
-    Assert.assertEquals(State.STOPPING, streamProcessor.getState());
+    assertEquals(State.STOPPING, streamProcessor.getState());
     Mockito.verify(mockSamzaContainer, Mockito.times(1)).shutdown();
     Mockito.verify(mockJobCoordinator, Mockito.times(1)).stop();
 
@@ -366,13 +383,13 @@ public class TestStreamProcessor {
 
     streamProcessor.jobCoordinatorListener.onJobModelExpired();
 
-    Assert.assertEquals(State.IN_REBALANCE, streamProcessor.state);
+    assertEquals(State.IN_REBALANCE, streamProcessor.state);
   }
 
   @Test
   public void testOnNewJobModelShouldResultInValidStateTransitions() throws 
Exception {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
-    StreamProcessorLifecycleListener lifecycleListener = 
Mockito.mock(StreamProcessorLifecycleListener.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
     StreamProcessor streamProcessor = PowerMockito.spy(new 
StreamProcessor(config, new HashMap<>(), null, lifecycleListener, 
mockJobCoordinator));
@@ -389,7 +406,7 @@ public class TestStreamProcessor {
   @Test
   public void testStopShouldBeIdempotent() {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
-    StreamProcessorLifecycleListener lifecycleListener = 
Mockito.mock(StreamProcessorLifecycleListener.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
     StreamProcessor streamProcessor = PowerMockito.spy(new 
StreamProcessor(config, new HashMap<>(), null, lifecycleListener, 
mockJobCoordinator));
@@ -405,13 +422,13 @@ public class TestStreamProcessor {
 
     streamProcessor.stop();
 
-    Assert.assertEquals(State.STOPPING, streamProcessor.state);
+    assertEquals(State.STOPPING, streamProcessor.state);
   }
 
   @Test
   public void testCoordinatorFailureShouldStopTheStreamProcessor() {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
-    StreamProcessorLifecycleListener lifecycleListener = 
Mockito.mock(StreamProcessorLifecycleListener.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
     StreamProcessor streamProcessor = new StreamProcessor(config, new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
@@ -425,22 +442,38 @@ public class TestStreamProcessor {
     Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false);
 
 
-    Assert.assertEquals(State.STOPPED, streamProcessor.state);
-    Mockito.verify(lifecycleListener).onFailure(failureException);
+    assertEquals(State.STOPPED, streamProcessor.state);
+    Mockito.verify(lifecycleListener).afterFailure(failureException);
     Mockito.verify(mockSamzaContainer).shutdown();
   }
 
   @Test
   public void testCoordinatorStopShouldStopTheStreamProcessor() {
     JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
-    StreamProcessorLifecycleListener lifecycleListener = 
Mockito.mock(StreamProcessorLifecycleListener.class);
+    ProcessorLifecycleListener lifecycleListener = 
Mockito.mock(ProcessorLifecycleListener.class);
     MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
     StreamProcessor streamProcessor = new StreamProcessor(config, new 
HashMap<>(), null, lifecycleListener, mockJobCoordinator);
 
     streamProcessor.state = State.RUNNING;
     streamProcessor.jobCoordinatorListener.onCoordinatorStop();
 
-    Assert.assertEquals(State.STOPPED, streamProcessor.state);
-    Mockito.verify(lifecycleListener).onShutdown();
+    assertEquals(State.STOPPED, streamProcessor.state);
+    Mockito.verify(lifecycleListener).afterStop();
+  }
+
+  @Test
+  public void testStreamProcessorWithStreamProcessorListenerFactory() {
+    AtomicReference<MockStreamProcessorLifecycleListener> mockListener = new 
AtomicReference<>();
+    StreamProcessor streamProcessor = new StreamProcessor(mock(Config.class), 
new HashMap<>(), mock(TaskFactory.class),
+        sp -> mockListener.updateAndGet(old -> new 
MockStreamProcessorLifecycleListener(sp)), mock(JobCoordinator.class));
+    assertEquals(streamProcessor, mockListener.get().processor);
+  }
+
+  class MockStreamProcessorLifecycleListener implements 
ProcessorLifecycleListener {
+    final StreamProcessor processor;
+
+    MockStreamProcessorLifecycleListener(StreamProcessor processor) {
+      this.processor = processor;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 7e6433c..cfa2680 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -18,13 +18,15 @@
  */
 package org.apache.samza.runtime;
 
-import org.apache.samza.application.StreamApplication;
+import java.time.Duration;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.MockStreamApplication;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.operators.StreamGraph;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 
 public class TestApplicationRunnerMain {
@@ -37,8 +39,8 @@ public class TestApplicationRunnerMain {
         "org.apache.samza.config.factories.PropertiesConfigFactory",
         "--config-path",
         getClass().getResource("/test.properties").getPath(),
-        "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + 
"=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy",
-        "-config", 
"app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts"
+        "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, 
MockStreamApplication.class.getName()),
+        "-config", String.format("app.runner.class=%s", 
TestApplicationRunnerInvocationCounts.class.getName()),
     });
 
     assertEquals(1, TestApplicationRunnerInvocationCounts.runCount);
@@ -52,8 +54,8 @@ public class TestApplicationRunnerMain {
         "org.apache.samza.config.factories.PropertiesConfigFactory",
         "--config-path",
         getClass().getResource("/test.properties").getPath(),
-        "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + 
"=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy",
-        "-config", 
"app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts",
+        "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, 
MockStreamApplication.class.getName()),
+        "-config", String.format("app.runner.class=%s", 
TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=kill"
     });
 
@@ -68,50 +70,47 @@ public class TestApplicationRunnerMain {
         "org.apache.samza.config.factories.PropertiesConfigFactory",
         "--config-path",
         getClass().getResource("/test.properties").getPath(),
-        "-config", ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG + 
"=org.apache.samza.runtime.TestApplicationRunnerMain$TestStreamApplicationDummy",
-        "-config", 
"app.runner.class=org.apache.samza.runtime.TestApplicationRunnerMain$TestApplicationRunnerInvocationCounts",
+        "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, 
MockStreamApplication.class.getName()),
+        "-config", String.format("app.runner.class=%s", 
TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=status"
     });
 
     assertEquals(1, TestApplicationRunnerInvocationCounts.statusCount);
   }
 
-  public static class TestApplicationRunnerInvocationCounts extends 
AbstractApplicationRunner {
+  public static class TestApplicationRunnerInvocationCounts implements 
ApplicationRunner {
     protected static int runCount = 0;
     protected static int killCount = 0;
     protected static int statusCount = 0;
 
-    public TestApplicationRunnerInvocationCounts(Config config) {
-      super(config);
+    public TestApplicationRunnerInvocationCounts(SamzaApplication userApp, 
Config config) {
     }
 
     @Override
-    public void runTask() {
-      throw new UnsupportedOperationException("runTask() not supported in this 
test");
-    }
-
-    @Override
-    public void run(StreamApplication streamApp) {
+    public void run() {
       runCount++;
     }
 
     @Override
-    public void kill(StreamApplication streamApp) {
+    public void kill() {
       killCount++;
     }
 
     @Override
-    public ApplicationStatus status(StreamApplication streamApp) {
+    public ApplicationStatus status() {
       statusCount++;
       return ApplicationStatus.Running;
     }
-  }
-
-  public static class TestStreamApplicationDummy implements StreamApplication {
 
     @Override
-    public void init(StreamGraph graph, Config config) {
+    public void waitForFinish() {
+      waitForFinish(Duration.ofSeconds(0));
+    }
 
+    @Override
+    public boolean waitForFinish(Duration timeout) {
+      return false;
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 0335913..19ee74f 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -19,235 +19,138 @@
 
 package org.apache.samza.runtime;
 
-import com.google.common.collect.ImmutableList;
-
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.coordinator.DistributedLockWithState;
-import org.apache.samza.execution.ExecutionPlan;
-import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifecycleListener;
-import org.apache.samza.system.StreamSpec;
+import org.apache.samza.execution.LocalJobPlanner;
+import org.apache.samza.task.IdentityStreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.junit.Before;
 import org.junit.Test;
-import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
 
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(LocalApplicationRunner.class)
 public class TestLocalApplicationRunner {
 
-  private static final String PLAN_JSON =
-      "{" + "\"jobs\":[{" + "\"jobName\":\"test-application\"," + 
"\"jobId\":\"1\"," + "\"operatorGraph\":{"
-          + "\"intermediateStreams\":{%s}," + 
"\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
-  private static final String STREAM_SPEC_JSON_FORMAT =
-      "\"%s\":{" + "\"streamSpec\":{" + "\"id\":\"%s\"," + 
"\"systemName\":\"%s\"," + "\"physicalName\":\"%s\","
-          + "\"partitionCount\":2}," + "\"sourceJobs\":[\"test-app\"]," + 
"\"targetJobs\":[\"test-target-app\"]},";
-
-  @Test
-  public void testStreamCreation()
-      throws Exception {
-    Config config = new MapConfig(new HashMap<>());
-    LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
-    StreamApplication app = mock(StreamApplication.class);
-
-    StreamManager streamManager = mock(StreamManager.class);
-    
doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class));
-
-    ExecutionPlan plan = mock(ExecutionPlan.class);
-    
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new 
StreamSpec("test-stream", "test-stream", "test-system")));
-    when(plan.getPlanAsJson()).thenReturn("");
-    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(config)));
-    doReturn(plan).when(runner).getExecutionPlan(any());
-
-    CoordinationUtilsFactory coordinationUtilsFactory = 
mock(CoordinationUtilsFactory.class);
-    JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
-    
when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
-    
PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
+  private Config config;
+  private SamzaApplication mockApp;
+  private LocalApplicationRunner runner;
+  private LocalJobPlanner localPlanner;
 
-    try {
-      runner.run(app);
-      runner.waitForFinish();
-    } catch (Throwable t) {
-      assertNotNull(t); //no jobs exception
-    }
-
-    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
-    verify(streamManager).createStreams(captor.capture());
-    List<StreamSpec> streamSpecs = captor.getValue();
-    assertEquals(streamSpecs.size(), 1);
-    assertEquals(streamSpecs.get(0).getId(), "test-stream");
-    verify(streamManager).stop();
-  }
-
-  @Test
-  public void testStreamCreationWithCoordination()
-      throws Exception {
-    Config config = new MapConfig(new HashMap<>());
-    LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
-
-    StreamApplication app = mock(StreamApplication.class);
-
-    StreamManager streamManager = mock(StreamManager.class);
-    
doReturn(streamManager).when(runner).buildAndStartStreamManager(any(Config.class));
-
-    ExecutionPlan plan = mock(ExecutionPlan.class);
-    
when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new 
StreamSpec("test-stream", "test-stream", "test-system")));
-    when(plan.getPlanAsJson()).thenReturn("");
-    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(config)));
-    doReturn(plan).when(runner).getExecutionPlan(any());
-
-    CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
-    CoordinationUtilsFactory coordinationUtilsFactory = 
mock(CoordinationUtilsFactory.class);
-    JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
-    
when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
-    
PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
-
-    DistributedLockWithState lock = mock(DistributedLockWithState.class);
-    when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
-    when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock);
-    when(coordinationUtilsFactory.getCoordinationUtils(anyString(), 
anyString(), anyObject()))
-        .thenReturn(coordinationUtils);
-
-    try {
-      runner.run(app);
-      runner.waitForFinish();
-    } catch (Throwable t) {
-      assertNotNull(t); //no jobs exception
-    }
-
-    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
-    verify(streamManager).createStreams(captor.capture());
-
-    List<StreamSpec> streamSpecs = captor.getValue();
-    assertEquals(streamSpecs.size(), 1);
-    assertEquals(streamSpecs.get(0).getId(), "test-stream");
-    verify(streamManager).stop();
+  /**
+   * Builds the default fixture used by every test: an empty {@link MapConfig}, a Mockito mock of
+   * {@link StreamApplication} as the application, and then the planner/runner spies created by
+   * {@code prepareTest()}. Tests that need a different config or application reassign the
+   * {@code config} / {@code mockApp} fields and call {@code prepareTest()} again themselves.
+   */
+  @Before
+  public void setUp() {
+    config = new MapConfig();
+    mockApp = mock(StreamApplication.class);
+    prepareTest();
   }
 
   @Test
   public void testRunStreamTask()
       throws Exception {
-    final Map<String, String> config = new HashMap<>();
-    config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
-    config.put(TaskConfig.TASK_CLASS(), 
"org.apache.samza.task.IdentityStreamTask");
-
-    LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(config));
+    final Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
+    cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
+    config = new MapConfig(cfgs);
+    mockApp = (TaskApplication) appDesc -> 
appDesc.setTaskFactory((StreamTaskFactory) () -> new IdentityStreamTask());
+    prepareTest();
 
     StreamProcessor sp = mock(StreamProcessor.class);
-    ArgumentCaptor<StreamProcessorLifecycleListener> captor =
-        ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
+
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> 
captor =
+        
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
 
     doAnswer(i ->
       {
-        StreamProcessorLifecycleListener listener = captor.getValue();
-        listener.onStart();
-        listener.onShutdown();
+        ProcessorLifecycleListener listener = 
captor.getValue().createInstance(sp);
+        listener.afterStart();
+        listener.afterStop();
         return null;
       }).when(sp).start();
 
-    LocalApplicationRunner spy = spy(runner);
-    doReturn(sp).when(spy).createStreamProcessor(any(Config.class), 
captor.capture());
+    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), 
captor.capture());
+    doReturn(ApplicationStatus.SuccessfulFinish).when(runner).status();
 
-    spy.runTask();
+    runner.run();
 
-    assertEquals(ApplicationStatus.SuccessfulFinish, spy.status(null));
+    assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
 
   @Test
   public void testRunComplete()
       throws Exception {
-    HashMap<String, String> configMap = new HashMap<>();
-    configMap.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
-    Config config = new MapConfig(configMap);
-    LocalApplicationRunner runner = spy(new LocalApplicationRunner(new 
MapConfig(config)));
-    StreamApplication app = mock(StreamApplication.class);
-
-    // buildAndStartStreamManager already includes start, so not going to 
verify it gets called
-    StreamManager streamManager = mock(StreamManager.class);
-    
when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager);
-    ExecutionPlan plan = mock(ExecutionPlan.class);
-    when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
-    when(plan.getPlanAsJson()).thenReturn("");
-    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(new MapConfig(config))));
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, 
UUIDGenerator.class.getName());
+    config = new MapConfig(cfgs);
+    ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> 
mock(ProcessorLifecycleListener.class);
+    mockApp = (StreamApplication) appDesc -> {
+      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+    };
+    prepareTest();
+
+    // return the jobConfigs from the planner
+    doReturn(Collections.singletonList(new JobConfig(new 
MapConfig(config)))).when(localPlanner).prepareJobs();
 
     StreamProcessor sp = mock(StreamProcessor.class);
-    ArgumentCaptor<StreamProcessorLifecycleListener> captor =
-        ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> 
captor =
+        
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
 
     doAnswer(i ->
       {
-        StreamProcessorLifecycleListener listener = captor.getValue();
-        listener.onStart();
-        listener.onShutdown();
+        ProcessorLifecycleListener listener = 
captor.getValue().createInstance(sp);
+        listener.afterStart();
+        listener.afterStop();
         return null;
       }).when(sp).start();
 
     doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), 
captor.capture());
 
-    runner.run(app);
+    runner.run();
     runner.waitForFinish();
 
-    assertEquals(runner.status(app), ApplicationStatus.SuccessfulFinish);
-    verify(streamManager).stop();
+    assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
 
   @Test
   public void testRunFailure()
       throws Exception {
-    final Map<String, String> configMap = new HashMap<>();
-    configMap.put(ApplicationConfig.PROCESSOR_ID, "0");
-    MapConfig config = new MapConfig(configMap);
-    LocalApplicationRunner runner = spy(new LocalApplicationRunner(config));
-    StreamApplication app = mock(StreamApplication.class);
-
-    // buildAndStartStreamManager already includes start, so not going to 
verify it gets called
-    StreamManager streamManager = mock(StreamManager.class);
-    
when(runner.buildAndStartStreamManager(any(Config.class))).thenReturn(streamManager);
-    ExecutionPlan plan = mock(ExecutionPlan.class);
-    when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
-    when(plan.getPlanAsJson()).thenReturn("");
-    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new 
JobConfig(config)));
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.PROCESSOR_ID, "0");
+    config = new MapConfig(cfgs);
+    ProcessorLifecycleListenerFactory mockFactory = (pContext, cfg) -> 
mock(ProcessorLifecycleListener.class);
+    mockApp = (StreamApplication) appDesc -> {
+      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+    };
+    prepareTest();
+
+    // return the jobConfigs from the planner
+    doReturn(Collections.singletonList(new JobConfig(new 
MapConfig(config)))).when(localPlanner).prepareJobs();
 
     StreamProcessor sp = mock(StreamProcessor.class);
-    ArgumentCaptor<StreamProcessorLifecycleListener> captor =
-        ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
+    ArgumentCaptor<StreamProcessor.StreamProcessorLifecycleListenerFactory> 
captor =
+        
ArgumentCaptor.forClass(StreamProcessor.StreamProcessorLifecycleListenerFactory.class);
 
     doAnswer(i ->
       {
@@ -257,79 +160,17 @@ public class TestLocalApplicationRunner {
     doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), 
captor.capture());
 
     try {
-      runner.run(app);
+      runner.run();
       runner.waitForFinish();
     } catch (Throwable th) {
       assertNotNull(th);
     }
 
-    assertEquals(runner.status(app), ApplicationStatus.UnsuccessfulFinish);
-    verify(streamManager).stop();
-  }
-
-  public static Set<StreamProcessor> getProcessors(LocalApplicationRunner 
runner) {
-    return runner.getProcessors();
-  }
-
-  /**
-   * A test case to verify if the plan results in different hash if there is 
change in topological sort order.
-   * Note: the overall JOB PLAN remains the same outside the scope of 
intermediate streams the sake of these test cases.
-   */
-  @Test
-  public void testPlanIdWithShuffledStreamSpecs() {
-    List<StreamSpec> streamSpecs = ImmutableList.of(new 
StreamSpec("test-stream-1", "stream-1", "testStream"),
-        new StreamSpec("test-stream-2", "stream-2", "testStream"),
-        new StreamSpec("test-stream-3", "stream-3", "testStream"));
-    String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
-
-    List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new 
StreamSpec("test-stream-2", "stream-2", "testStream"),
-        new StreamSpec("test-stream-1", "stream-1", "testStream"),
-        new StreamSpec("test-stream-3", "stream-3", "testStream"));
-
-
-    assertFalse("Expected both of the latch ids to be different",
-        planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs)));
-  }
-
-  /**
-   * A test case to verify if the plan results in same hash in case of same 
plan.
-   * Note: the overall JOB PLAN remains the same outside the scope of 
intermediate streams the sake of these test cases.
-   */
-  @Test
-  public void testGeneratePlanIdWithSameStreamSpecs() {
-    List<StreamSpec> streamSpecs = ImmutableList.of(new 
StreamSpec("test-stream-1", "stream-1", "testStream"),
-        new StreamSpec("test-stream-2", "stream-2", "testStream"),
-        new StreamSpec("test-stream-3", "stream-3", "testStream"));
-    String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
-    String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);
-
-    assertEquals("Expected latch ids to match!", "1447946713", 
planIdForFirstAttempt);
-    assertEquals("Expected latch ids to match for the second attempt!", 
planIdForFirstAttempt, planIdForSecondAttempt);
-  }
-
-  /**
-   * A test case to verify plan results in different hash in case of different 
intermediate stream.
-   * Note: the overall JOB PLAN remains the same outside the scope of 
intermediate streams the sake of these test cases.
-   */
-  @Test
-  public void testGeneratePlanIdWithDifferentStreamSpecs() {
-    List<StreamSpec> streamSpecs = ImmutableList.of(new 
StreamSpec("test-stream-1", "stream-1", "testStream"),
-        new StreamSpec("test-stream-2", "stream-2", "testStream"),
-        new StreamSpec("test-stream-3", "stream-3", "testStream"));
-    String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
-
-    List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new 
StreamSpec("test-stream-1", "stream-1", "testStream"),
-        new StreamSpec("test-stream-4", "stream-4", "testStream"),
-        new StreamSpec("test-stream-3", "stream-3", "testStream"));
-
-
-    assertFalse("Expected both of the latch ids to be different",
-        planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
+    assertEquals(runner.status(), ApplicationStatus.UnsuccessfulFinish);
   }
 
   @Test
   public void testWaitForFinishReturnsBeforeTimeout() {
-    LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig());
     long timeoutInMs = 1000;
 
     runner.getShutdownLatch().countDown();
@@ -339,23 +180,15 @@ public class TestLocalApplicationRunner {
 
   @Test
   public void testWaitForFinishTimesout() {
-    LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig());
     long timeoutInMs = 100;
     boolean finished = runner.waitForFinish(Duration.ofMillis(timeoutInMs));
     assertFalse("Application finished before the timeout.", finished);
   }
 
-  private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
-    String intermediateStreamJson =
-        
updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));
-
-    int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
-
-    return String.valueOf(planId);
+  /**
+   * (Re)creates the objects under test from the current values of the {@code mockApp} and
+   * {@code config} fields: an application descriptor is obtained via
+   * {@code ApplicationDescriptorUtil.getAppDescriptor}, and both the {@link LocalJobPlanner} and the
+   * {@link LocalApplicationRunner} built from it are wrapped in Mockito spies so individual tests can
+   * stub selected methods. The results are stored in the {@code localPlanner} and {@code runner} fields.
+   */
+  private void prepareTest() {
+    ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc = 
ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);
+    localPlanner = spy(new LocalJobPlanner(appDesc));
+    runner = spy(new LocalApplicationRunner(appDesc, localPlanner));
   }
 
-  private String streamSpecToJson(StreamSpec streamSpec) {
-    return String.format(STREAM_SPEC_JSON_FORMAT, streamSpec.getId(), 
streamSpec.getId(), streamSpec.getSystemName(),
-        streamSpec.getPhysicalName());
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
index 2734d56..ae525fb 100644
--- 
a/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/runtime/TestRemoteApplicationRunner.java
@@ -22,6 +22,7 @@ package org.apache.samza.runtime;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -29,30 +30,45 @@ import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.StreamJob;
 import org.apache.samza.job.StreamJobFactory;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 
 /**
  * A test class for {@link RemoteApplicationRunner}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RemoteApplicationRunner.class)
 public class TestRemoteApplicationRunner {
+
+  private RemoteApplicationRunner runner;
+
+  /**
+   * Creates the default runner for each test: a Mockito spy around a {@link RemoteApplicationRunner}
+   * constructed with a no-op {@link StreamApplication} lambda and a {@link MapConfig} backed by an
+   * empty map. The spy lets tests stub {@code getApplicationStatus}; tests that need specific
+   * configuration replace the {@code runner} field with their own instance.
+   */
+  @Before
+  public void setUp() {
+    Map<String, String> config = new HashMap<>();
+    StreamApplication userApp = appDesc -> { };
+    runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(config)));
+  }
+
   @Test
   public void testWaitForFinishReturnsBeforeTimeout() {
-    RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new 
MapConfig()));
     
doReturn(ApplicationStatus.SuccessfulFinish).when(runner).getApplicationStatus(any(JobConfig.class));
-
     boolean finished = runner.waitForFinish(Duration.ofMillis(5000));
     assertTrue("Application did not finish before the timeout.", finished);
   }
 
   @Test
   public void testWaitForFinishTimesout() {
-    RemoteApplicationRunner runner = spy(new RemoteApplicationRunner(new 
MapConfig()));
     
doReturn(ApplicationStatus.Running).when(runner).getApplicationStatus(any(JobConfig.class));
-
     boolean finished = runner.waitForFinish(Duration.ofMillis(1000));
     assertFalse("Application finished before the timeout.", finished);
   }
@@ -64,11 +80,14 @@ public class TestRemoteApplicationRunner {
     m.put(JobConfig.STREAM_JOB_FACTORY_CLASS(), 
MockStreamJobFactory.class.getName());
 
     m.put(JobConfig.JOB_ID(), "newJob");
-    RemoteApplicationRunner runner = new RemoteApplicationRunner(new 
MapConfig());
+
+    StreamApplication userApp = appDesc -> { };
+    runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(m)));
+
     Assert.assertEquals(ApplicationStatus.New, runner.getApplicationStatus(new 
JobConfig(new MapConfig(m))));
 
     m.put(JobConfig.JOB_ID(), "runningJob");
-    runner = new RemoteApplicationRunner(new JobConfig(new MapConfig(m)));
+    runner = spy(new RemoteApplicationRunner(userApp, new MapConfig(m)));
     Assert.assertEquals(ApplicationStatus.Running, 
runner.getApplicationStatus(new JobConfig(new MapConfig(m))));
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java 
b/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java
new file mode 100644
index 0000000..e449ecc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/MockAsyncStreamTask.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+/**
+ * Test implementation class for {@link AsyncStreamTask}.
+ *
+ * <p>The single callback method has an empty body, so this class only serves as a concrete,
+ * instantiable {@link AsyncStreamTask} type for tests; it performs no message processing.
+ */
+public class MockAsyncStreamTask implements AsyncStreamTask {
+  /**
+   * No-op: none of the arguments are used. In particular the {@code callback} is never completed
+   * or failed here, so this task is not suitable for tests that wait on callback completion.
+   */
+  @Override
+  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator, TaskCallback callback) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java 
b/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java
new file mode 100644
index 0000000..d089c4b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/task/MockStreamTask.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+
+/**
+ * Test implementation class for {@link StreamTask}.
+ *
+ * <p>The {@code process} method has an empty body, so this class only serves as a concrete,
+ * instantiable {@link StreamTask} type for tests (for example, to be looked up by class name);
+ * it performs no message processing.
+ */
+public class MockStreamTask implements StreamTask {
+  /**
+   * No-op: the envelope, collector and coordinator are all ignored and nothing is thrown,
+   * although the signature keeps the {@code throws Exception} clause.
+   */
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) throws Exception {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java 
b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
index 0b91315..f96ab19 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskFactoryUtil.java
@@ -21,24 +21,20 @@ package org.apache.samza.task;
 import java.lang.reflect.Field;
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.testUtils.TestAsyncStreamTask;
-import org.apache.samza.testUtils.TestStreamTask;
+import org.apache.samza.operators.OperatorSpecGraph;
 import org.junit.Test;
 
-import java.util.HashMap;
-
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 
 /**
  * Test methods to create {@link StreamTaskFactory} or {@link 
AsyncStreamTaskFactory} based on task class configuration
@@ -47,22 +43,12 @@ public class TestTaskFactoryUtil {
 
   @Test
   public void testStreamTaskClass() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put("task.class", "org.apache.samza.testUtils.TestStreamTask");
-      }
-    });
-    Object retFactory = TaskFactoryUtil.createTaskFactory(config);
+    TaskFactory retFactory = 
TaskFactoryUtil.getTaskFactory(MockStreamTask.class.getName());
     assertTrue(retFactory instanceof StreamTaskFactory);
-    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof 
TestStreamTask);
+    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof 
MockStreamTask);
 
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put("task.class", "no.such.class");
-      }
-    });
     try {
-      TaskFactoryUtil.createTaskFactory(config);
+      TaskFactoryUtil.getTaskFactory("no.such.class");
       fail("Should have failed w/ no.such.class");
     } catch (ConfigException cfe) {
       // expected
@@ -70,156 +56,13 @@ public class TestTaskFactoryUtil {
   }
 
   @Test
-  public void testCreateStreamApplication() throws Exception {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.TestStreamApplication");
-      }
-    });
-    StreamApplication streamApp = 
TaskFactoryUtil.createStreamApplication(config);
-    assertNotNull(streamApp);
-    StreamGraphSpec graph = new StreamGraphSpec(config);
-    streamApp.init(graph, config);
-    Object retFactory = 
TaskFactoryUtil.createTaskFactory(graph.getOperatorSpecGraph(), null);
-    assertTrue(retFactory instanceof StreamTaskFactory);
-    assertTrue(((StreamTaskFactory) retFactory).createInstance() instanceof 
StreamOperatorTask);
-
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.InvalidStreamApplication");
-      }
-    });
-    try {
-      TaskFactoryUtil.createStreamApplication(config);
-      fail("Should have failed w/ no.such.class");
-    } catch (ConfigException ce) {
-      // expected
-    }
-
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put(ApplicationConfig.APP_CLASS, "no.such.class");
-      }
-    });
-    try {
-      TaskFactoryUtil.createStreamApplication(config);
-      fail("Should have failed w/ no.such.class");
-    } catch (ConfigException ce) {
-      // expected
-    }
-
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put(ApplicationConfig.APP_CLASS, "");
-      }
-    });
-    streamApp = TaskFactoryUtil.createStreamApplication(config);
-    assertNull(streamApp);
-
-    config = new MapConfig(new HashMap<>());
-    streamApp = TaskFactoryUtil.createStreamApplication(config);
-    assertNull(streamApp);
-  }
-
-  @Test
-  public void testCreateStreamApplicationWithTaskClass() throws Exception {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.TestStreamApplication");
-      }
-    });
-    StreamApplication streamApp = 
TaskFactoryUtil.createStreamApplication(config);
-    assertNotNull(streamApp);
-
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put("task.class", 
"org.apache.samza.testUtils.TestAsyncStreamTask");
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.TestStreamApplication");
-      }
-    });
-    try {
-      TaskFactoryUtil.createStreamApplication(config);
-      fail("should have failed with invalid config");
-    } catch (ConfigException ce) {
-      // expected
-    }
-
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put("task.class", "no.such.class");
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.TestStreamApplication");
-      }
-    });
-    try {
-      TaskFactoryUtil.createStreamApplication(config);
-      fail("should have failed with invalid config");
-    } catch (ConfigException ce) {
-      // expected
-    }
-
-
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put("task.class", "");
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.TestStreamApplication");
-      }
-    });
-    streamApp = TaskFactoryUtil.createStreamApplication(config);
-    assertNotNull(streamApp);
-
-  }
-
-  @Test
-  public void testStreamTaskClassWithInvalidStreamApplication() throws 
Exception {
-
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.InvalidStreamApplication");
-      }
-    });
-    try {
-      TaskFactoryUtil.createStreamApplication(config);
-      fail("Should have failed w/ no.such.class");
-    } catch (ConfigException ce) {
-      // expected
-    }
-
-  }
-
-  @Test
   public void testAsyncStreamTask() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put("task.class", 
"org.apache.samza.testUtils.TestAsyncStreamTask");
-      }
-    });
-    Object retFactory = TaskFactoryUtil.createTaskFactory(config);
+    TaskFactory retFactory = 
TaskFactoryUtil.getTaskFactory(MockAsyncStreamTask.class.getName());
     assertTrue(retFactory instanceof AsyncStreamTaskFactory);
-    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() 
instanceof TestAsyncStreamTask);
+    assertTrue(((AsyncStreamTaskFactory) retFactory).createInstance() 
instanceof MockAsyncStreamTask);
 
-    config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put("task.class", "no.such.class");
-      }
-    });
     try {
-      TaskFactoryUtil.createTaskFactory(config);
-      fail("Should have failed w/ no.such.class");
-    } catch (ConfigException cfe) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testAsyncStreamTaskWithInvalidStreamGraphBuilder() throws 
Exception {
-
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        this.put(ApplicationConfig.APP_CLASS, 
"org.apache.samza.testUtils.InvalidStreamApplication");
-      }
-    });
-    try {
-      TaskFactoryUtil.createStreamApplication(config);
+      TaskFactoryUtil.getTaskFactory("no.such.class");
       fail("Should have failed w/ no.such.class");
     } catch (ConfigException cfe) {
       // expected
@@ -228,7 +71,7 @@ public class TestTaskFactoryUtil {
 
   @Test
   public void testFinalizeTaskFactory() throws NoSuchFieldException, 
IllegalAccessException {
-    Object mockFactory = mock(Object.class);
+    TaskFactory mockFactory = mock(TaskFactory.class);
     try {
       TaskFactoryUtil.finalizeTaskFactory(mockFactory, true, null);
       fail("Should have failed with validation");
@@ -260,4 +103,34 @@ public class TestTaskFactoryUtil {
     retFactory = TaskFactoryUtil.finalizeTaskFactory(mockAsyncStreamFactory, 
false, null);
     assertEquals(retFactory, mockAsyncStreamFactory);
   }
+
+  // test getTaskFactory with StreamApplicationDescriptor
+  @Test
+  public void testGetTaskFactoryWithStreamAppDescriptor() {
+    StreamApplicationDescriptorImpl mockStreamApp = 
mock(StreamApplicationDescriptorImpl.class);
+    OperatorSpecGraph mockSpecGraph = mock(OperatorSpecGraph.class);
+    when(mockStreamApp.getOperatorSpecGraph()).thenReturn(mockSpecGraph);
+    TaskFactory streamTaskFactory = 
TaskFactoryUtil.getTaskFactory(mockStreamApp);
+    assertTrue(streamTaskFactory instanceof StreamTaskFactory);
+    StreamTask streamTask = ((StreamTaskFactory) 
streamTaskFactory).createInstance();
+    assertTrue(streamTask instanceof StreamOperatorTask);
+    verify(mockSpecGraph).clone();
+  }
+
+  // test getTaskFactory with TaskApplicationDescriptor
+  @Test
+  public void testGetTaskFactoryWithTaskAppDescriptor() {
+    TaskApplicationDescriptorImpl mockTaskApp = 
mock(TaskApplicationDescriptorImpl.class);
+    TaskFactory mockTaskFactory = mock(TaskFactory.class);
+    when(mockTaskApp.getTaskFactory()).thenReturn(mockTaskFactory);
+    TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(mockTaskApp);
+    assertEquals(mockTaskFactory, taskFactory);
+  }
+
+  // test getTaskFactory with invalid ApplicationDescriptorImpl
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetTaskFactoryWithInvalidAddDescriptorImpl() {
+    ApplicationDescriptorImpl mockInvalidApp = 
mock(ApplicationDescriptorImpl.class);
+    TaskFactoryUtil.getTaskFactory(mockInvalidApp);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java 
b/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
deleted file mode 100644
index 81f3fd4..0000000
--- 
a/samza-core/src/test/java/org/apache/samza/testUtils/TestAsyncStreamTask.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.testUtils;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.task.AsyncStreamTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCallback;
-import org.apache.samza.task.TaskCoordinator;
-
-/**
- * Test implementation class for {@link AsyncStreamTask}
- */
-public class TestAsyncStreamTask implements AsyncStreamTask {
-  @Override
-  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator, TaskCallback callback) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java
 
b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java
deleted file mode 100644
index a1cba7d..0000000
--- 
a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamApplication.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.testUtils;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.application.StreamApplication;
-
-/**
- * Test implementation class for {@link StreamApplication}
- */
-public class TestStreamApplication implements StreamApplication {
-  @Override
-  public void init(StreamGraph graph, Config config) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java 
b/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
deleted file mode 100644
index ce0980a..0000000
--- a/samza-core/src/test/java/org/apache/samza/testUtils/TestStreamTask.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.testUtils;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskCoordinator;
-
-/**
- * Test implementation class for {@link StreamTask}
- */
-public class TestStreamTask implements StreamTask {
-  @Override
-  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) throws Exception {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 9aca45e..ff57047 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -187,6 +187,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     @volatile var onContainerStopCalled = false
     @volatile var onContainerStartCalled = false
     @volatile var onContainerFailedThrowable: Throwable = null
+    @volatile var onContainerBeforeStartCalled = false
 
     val container = new SamzaContainer(
       containerContext = containerContext,
@@ -198,23 +199,29 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       metrics = new SamzaContainerMetrics)
 
     val containerListener = new SamzaContainerListener {
-      override def onContainerFailed(t: Throwable): Unit = {
+      override def afterFailure(t: Throwable): Unit = {
         onContainerFailedCalled = true
         onContainerFailedThrowable = t
       }
 
-      override def onContainerStop(): Unit = {
+      override def afterStop(): Unit = {
         onContainerStopCalled = true
       }
 
-      override def onContainerStart(): Unit = {
+      override def afterStart(): Unit = {
         onContainerStartCalled = true
       }
+
+      override def beforeStart(): Unit = {
+        onContainerBeforeStartCalled = true
+      }
+
     }
     container.setContainerListener(containerListener)
 
     container.run
     assertTrue(task.wasShutdown)
+    assertTrue(onContainerBeforeStartCalled)
     assertFalse(onContainerStartCalled)
     assertFalse(onContainerStopCalled)
 
@@ -266,6 +273,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     @volatile var onContainerStopCalled = false
     @volatile var onContainerStartCalled = false
     @volatile var onContainerFailedThrowable: Throwable = null
+    @volatile var onContainerBeforeStartCalled = false
 
     val mockRunLoop = mock[RunLoop]
     when(mockRunLoop.run).thenThrow(new RuntimeException("Trigger a shutdown, 
please."))
@@ -279,23 +287,31 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
     val containerListener = new SamzaContainerListener {
-      override def onContainerFailed(t: Throwable): Unit = {
+      override def afterFailure(t: Throwable): Unit = {
         onContainerFailedCalled = true
         onContainerFailedThrowable = t
       }
 
-      override def onContainerStop(): Unit = {
+      override def afterStop(): Unit = {
         onContainerStopCalled = true
       }
 
-      override def onContainerStart(): Unit = {
+      override def afterStart(): Unit = {
         onContainerStartCalled = true
       }
+
+      /**
+        * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started
+        */
+      override def beforeStart(): Unit = {
+        onContainerBeforeStartCalled = true
+      }
     }
     container.setContainerListener(containerListener)
 
     container.run
     assertTrue(task.wasShutdown)
+    assertTrue(onContainerBeforeStartCalled)
     assertTrue(onContainerStartCalled)
 
     assertFalse(onContainerStopCalled)
@@ -352,6 +368,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     @volatile var onContainerStopCalled = false
     @volatile var onContainerStartCalled = false
     @volatile var onContainerFailedThrowable: Throwable = null
+    @volatile var onContainerBeforeStartCalled = false
 
     val container = new SamzaContainer(
       containerContext = containerContext,
@@ -362,25 +379,32 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
     val containerListener = new SamzaContainerListener {
-      override def onContainerFailed(t: Throwable): Unit = {
+      override def afterFailure(t: Throwable): Unit = {
         onContainerFailedCalled = true
         onContainerFailedThrowable = t
       }
 
-      override def onContainerStop(): Unit = {
+      override def afterStop(): Unit = {
         onContainerStopCalled = true
       }
 
-      override def onContainerStart(): Unit = {
+      override def afterStart(): Unit = {
         onContainerStartCalled = true
       }
+
+      /**
+        * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started
+        */
+      override def beforeStart(): Unit = {
+        onContainerBeforeStartCalled = true
+      }
     }
     container.setContainerListener(containerListener)
 
     container.run
 
     assertTrue(task.wasShutdown)
-
+    assertTrue(onContainerBeforeStartCalled)
     assertFalse(onContainerStopCalled)
     assertFalse(onContainerStartCalled)
 
@@ -429,6 +453,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     @volatile var onContainerStopCalled = false
     @volatile var onContainerStartCalled = false
     @volatile var onContainerFailedThrowable: Throwable = null
+    @volatile var onContainerBeforeStartCalled = false
 
     val mockRunLoop = mock[RunLoop]
     when(mockRunLoop.run).thenAnswer(new Answer[Unit] {
@@ -446,22 +471,30 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
       val containerListener = new SamzaContainerListener {
-        override def onContainerFailed(t: Throwable): Unit = {
+        override def afterFailure(t: Throwable): Unit = {
           onContainerFailedCalled = true
           onContainerFailedThrowable = t
         }
 
-        override def onContainerStop(): Unit = {
+        override def afterStop(): Unit = {
           onContainerStopCalled = true
         }
 
-        override def onContainerStart(): Unit = {
+        override def afterStart(): Unit = {
           onContainerStartCalled = true
         }
+
+        /**
+          * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started
+          */
+        override def beforeStart(): Unit = {
+          onContainerBeforeStartCalled = true
+        }
       }
     container.setContainerListener(containerListener)
 
     container.run
+    assertTrue(onContainerBeforeStartCalled)
     assertFalse(onContainerFailedCalled)
     assertTrue(onContainerStartCalled)
     assertTrue(onContainerStopCalled)
@@ -507,6 +540,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     @volatile var onContainerStopCalled = false
     @volatile var onContainerStartCalled = false
     @volatile var onContainerFailedThrowable: Throwable = null
+    @volatile var onContainerBeforeStartCalled = false
 
     val mockRunLoop = mock[RunLoop]
     when(mockRunLoop.run).thenAnswer(new Answer[Unit] {
@@ -525,24 +559,34 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
       metrics = new SamzaContainerMetrics)
 
     val containerListener = new SamzaContainerListener {
-        override def onContainerFailed(t: Throwable): Unit = {
+        override def afterFailure(t: Throwable): Unit = {
           onContainerFailedCalled = true
           onContainerFailedThrowable = t
         }
 
-        override def onContainerStop(): Unit = {
+        override def afterStop(): Unit = {
           onContainerStopCalled = true
         }
 
-        override def onContainerStart(): Unit = {
+        override def afterStart(): Unit = {
           onContainerStartCalled = true
         }
+
+      /**
+        * Method invoked before the {@link org.apache.samza.container.SamzaContainer} is started
+        */
+      override def beforeStart(): Unit = {
+        onContainerBeforeStartCalled = true
       }
+    }
     container.setContainerListener(containerListener)
 
     container.run
 
+    assertTrue(onContainerBeforeStartCalled)
+    assertTrue(onContainerStartCalled)
     assertTrue(onContainerFailedCalled)
+    assertFalse(onContainerStopCalled)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index 181971a..9a871d7 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -20,13 +20,10 @@
 package org.apache.samza.sql.runner;
 
 import java.util.List;
-
-import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.sql.translator.QueryTranslator;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
+import org.apache.samza.sql.translator.QueryTranslator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,14 +36,14 @@ public class SamzaSqlApplication implements 
StreamApplication {
   private static final Logger LOG = 
LoggerFactory.getLogger(SamzaSqlApplication.class);
 
   @Override
-  public void init(StreamGraph streamGraph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     try {
-      SamzaSqlApplicationConfig sqlConfig = new 
SamzaSqlApplicationConfig(config);
+      SamzaSqlApplicationConfig sqlConfig = new 
SamzaSqlApplicationConfig(appDesc.getConfig());
       QueryTranslator queryTranslator = new QueryTranslator(sqlConfig);
       List<SamzaSqlQueryParser.QueryInfo> queries = sqlConfig.getQueryInfo();
       for (SamzaSqlQueryParser.QueryInfo query : queries) {
         LOG.info("Translating the query {} to samza stream graph", 
query.getSelectQuery());
-        queryTranslator.translate(query, streamGraph);
+        queryTranslator.translate(query, appDesc);
       }
     } catch (RuntimeException e) {
       LOG.error("SamzaSqlApplication threw exception.", e);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 044c7cf..027fd23 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -19,20 +19,21 @@
 
 package org.apache.samza.sql.runner;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.Validate;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.runtime.AbstractApplicationRunner;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.RemoteApplicationRunner;
-import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,26 +46,18 @@ import org.slf4j.LoggerFactory;
  * This runner invokes the SamzaSqlConfig re-writer if it is invoked on a standalone mode (i.e. localRunner == true)
  * otherwise directly calls the RemoteApplicationRunner which automatically performs the config rewriting .
  */
-public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
+public class SamzaSqlApplicationRunner implements ApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SamzaSqlApplicationRunner.class);
 
-  private final Config sqlConfig;
-  private final ApplicationRunner appRunner;
-  private final Boolean localRunner;
+  private final ApplicationRunner runner;
 
   public static final String RUNNER_CONFIG = "app.runner.class";
   public static final String CFG_FMT_SAMZA_STREAM_SYSTEM = 
"streams.%s.samza.system";
 
-  public SamzaSqlApplicationRunner(Config config) {
-    this(false, config);
-  }
-
   public SamzaSqlApplicationRunner(Boolean localRunner, Config config) {
-    super(config);
-    this.localRunner = localRunner;
-    sqlConfig = computeSamzaConfigs(localRunner, config);
-    appRunner = ApplicationRunner.fromConfig(sqlConfig);
+    this.runner = ApplicationRunners.getApplicationRunner(new 
SamzaSqlApplication(),
+        computeSamzaConfigs(localRunner, config));
   }
 
   public static Config computeSamzaConfigs(Boolean localRunner, Config config) 
{
@@ -107,30 +100,34 @@ public class SamzaSqlApplicationRunner extends 
AbstractApplicationRunner {
   }
 
   public void runAndWaitForFinish() {
-    Validate.isTrue(localRunner, "This method can be called only in standalone 
mode.");
-    SamzaSqlApplication app = new SamzaSqlApplication();
-    run(app);
-    appRunner.waitForFinish();
+    Validate.isTrue(runner instanceof LocalApplicationRunner, "This method can 
be called only in standalone mode.");
+    run();
+    waitForFinish();
   }
 
   @Override
-  public void runTask() {
-    appRunner.runTask();
+  public void run() {
+    runner.run();
   }
 
   @Override
-  public void run(StreamApplication streamApp) {
-    Validate.isInstanceOf(SamzaSqlApplication.class, streamApp);
-    appRunner.run(streamApp);
+  public void kill() {
+    runner.kill();
   }
 
   @Override
-  public void kill(StreamApplication streamApp) {
-    appRunner.kill(streamApp);
+  public ApplicationStatus status() {
+    return runner.status();
   }
 
   @Override
-  public ApplicationStatus status(StreamApplication streamApp) {
-    return appRunner.status(streamApp);
+  public void waitForFinish() {
+    runner.waitForFinish();
   }
+
+  @Override
+  public boolean waitForFinish(Duration timeout) {
+    return runner.waitForFinish(timeout);
+  }
+
 }

Reply via email to