http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
 
b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index af20fd7..1954cc3 100644
--- 
a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ 
b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -19,9 +19,8 @@
 package org.apache.samza.test.integration;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
@@ -38,9 +37,9 @@ public class TestStandaloneIntegrationApplication implements 
StreamApplication {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     String systemName = "testSystemName";
-    String inputStreamName = config.get("input.stream.name");
+    String inputStreamName = appDesc.getConfig().get("input.stream.name");
     String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
     LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, 
outputStreamName);
     KafkaSystemDescriptor kafkaSystemDescriptor = new 
KafkaSystemDescriptor(systemName);
@@ -50,6 +49,6 @@ public class TestStandaloneIntegrationApplication implements 
StreamApplication {
         kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde);
     KafkaOutputDescriptor<KV<Object, Object>> osd =
         kafkaSystemDescriptor.getOutputDescriptor(inputStreamName, noOpSerde);
-    graph.getInputStream(isd).sendTo(graph.getOutputStream(osd));
+    appDesc.getInputStream(isd).sendTo(appDesc.getOutputStream(osd));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 189ae9b..66cf061 100644
--- 
a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ 
b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -46,6 +46,7 @@ import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
@@ -135,9 +136,14 @@ public class TestZkStreamProcessorBase extends 
StandaloneIntegrationTestHarness
     String jobCoordinatorFactoryClassName = new 
JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
     JobCoordinator jobCoordinator = 
Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(config);
 
-    StreamProcessorLifecycleListener listener = new 
StreamProcessorLifecycleListener() {
+    ProcessorLifecycleListener listener = new ProcessorLifecycleListener() {
       @Override
-      public void onStart() {
+      public void beforeStart() {
+
+      }
+
+      @Override
+      public void afterStart() {
         if (waitStart != null) {
             waitStart.countDown();
         }
@@ -145,16 +151,18 @@ public class TestZkStreamProcessorBase extends 
StandaloneIntegrationTestHarness
       }
 
       @Override
-      public void onShutdown() {
+      public void afterStop() {
+        // stopped w/o failure
         if (waitStop != null) {
           waitStop.countDown();
         }
-        LOG.info("onShutdown is called for pid=" + pId);
+        LOG.info("afterStop is called for pid=" + pId + " with successful 
shutdown");
       }
 
       @Override
-      public void onFailure(Throwable t) {
-        LOG.info("onFailure is called for pid=" + pId);
+      public void afterFailure(Throwable t) {
+        // stopped w/ failure
+        LOG.info("afterStop is called for pid=" + pId + " with failure");
       }
     };
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index b86c6af..d2aab11 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
@@ -34,7 +35,8 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
@@ -94,20 +96,25 @@ public class EndOfStreamIntegrationTest extends 
AbstractIntegrationTestHarness {
     configs.put("serializers.registry.int.class", 
"org.apache.samza.serializers.IntegerSerdeFactory");
     configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName());
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-    final StreamApplication app = (streamGraph, cfg) -> {
-      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<KV<String, PageView>> isd =
-          sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
-      streamGraph.getInputStream(isd)
-        .map(Values.create())
-        .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
-        .sink((m, collector, coordinator) -> {
-            received.add(m.getValue());
-          });
-    };
-
-    runner.run(app);
+    class PipelineApplication implements StreamApplication {
+
+      @Override
+      public void describe(StreamApplicationDescriptor appDesc) {
+        DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
+        GenericInputDescriptor<KV<String, PageView>> isd =
+            sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), 
new NoOpSerde<>()));
+        appDesc.getInputStream(isd)
+            .map(Values.create())
+            .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
+            .sink((m, collector, coordinator) -> {
+                received.add(m.getValue());
+              });
+      }
+    }
+
+    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(new PipelineApplication(), new 
MapConfig(configs));
+
+    runner.run();
     runner.waitForFinish();
 
     assertEquals(received.size(), count * partitionCount);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 5336595..05818e9 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.controlmessages;
 
+import org.apache.samza.application.SamzaApplication;
 import scala.collection.JavaConverters;
 
 import java.lang.reflect.Field;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -49,8 +51,9 @@ import org.apache.samza.operators.impl.TestOperatorImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.processor.TestStreamProcessorUtil;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.runtime.TestLocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerdeFactory;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -123,7 +126,7 @@ public class WatermarkIntegrationTest extends 
AbstractIntegrationTestHarness {
   @Test
   public void testWatermark() throws Exception {
     Map<String, String> configs = new HashMap<>();
-    configs.put("app.runner.class", 
"org.apache.samza.runtime.LocalApplicationRunner");
+    configs.put("app.runner.class", 
MockLocalApplicationRunner.class.getName());
     configs.put("systems.test.samza.factory", 
TestSystemFactory.class.getName());
     configs.put("streams.PageView.samza.system", "test");
     configs.put("streams.PageView.partitionCount", 
String.valueOf(PARTITION_COUNT));
@@ -147,22 +150,27 @@ public class WatermarkIntegrationTest extends 
AbstractIntegrationTestHarness {
     configs.put("serializers.registry.json.class", 
PageViewJsonSerdeFactory.class.getName());
 
     List<PageView> received = new ArrayList<>();
-    final StreamApplication app = (streamGraph, cfg) -> {
-      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<KV<String, PageView>> isd =
-          sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
-      streamGraph.getInputStream(isd)
-          .map(EndOfStreamIntegrationTest.Values.create())
-          .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
-          .sink((m, collector, coordinator) -> {
-              received.add(m.getValue());
-            });
-    };
-
-    LocalApplicationRunner runner = new LocalApplicationRunner(new 
MapConfig(configs));
-    runner.run(app);
+    class TestStreamApp implements StreamApplication {
+
+      @Override
+      public void describe(StreamApplicationDescriptor appDesc) {
+        DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
+        GenericInputDescriptor<KV<String, PageView>> isd =
+            sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), 
new NoOpSerde<>()));
+        appDesc.getInputStream(isd)
+            .map(EndOfStreamIntegrationTest.Values.create())
+            .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
+            .sink((m, collector, coordinator) -> {
+                received.add(m.getValue());
+              });
+      }
+    }
+
+    final ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(new TestStreamApp(), new 
MapConfig(configs));
+    runner.run();
+
     // processors are only available when the app is running
-    Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner);
+    Map<String, StreamOperatorTask> tasks = 
getTaskOperationGraphs((MockLocalApplicationRunner) runner);
 
     runner.waitForFinish();
     // wait for the completion to ensure that all tasks are actually 
initialized and the OperatorImplGraph is initialized
@@ -185,8 +193,8 @@ public class WatermarkIntegrationTest extends 
AbstractIntegrationTestHarness {
     assertEquals(TestOperatorImpl.getOutputWatermark(sink), 3);
   }
 
-  Map<String, StreamOperatorTask> 
getTaskOperationGraphs(LocalApplicationRunner runner) throws Exception {
-    StreamProcessor processor = 
TestLocalApplicationRunner.getProcessors(runner).iterator().next();
+  Map<String, StreamOperatorTask> 
getTaskOperationGraphs(MockLocalApplicationRunner runner) throws Exception {
+    StreamProcessor processor = runner.getProcessors().iterator().next();
     SamzaContainer container = TestStreamProcessorUtil.getContainer(processor);
     Map<TaskName, TaskInstance> taskInstances = 
JavaConverters.mapAsJavaMapConverter(container.getTaskInstances()).asJava();
     Map<String, StreamOperatorTask> tasks = new HashMap<>();
@@ -214,4 +222,20 @@ public class WatermarkIntegrationTest extends 
AbstractIntegrationTestHarness {
     }
     return null;
   }
+
+  public static class MockLocalApplicationRunner extends 
LocalApplicationRunner {
+
+    /**
+     * Default constructor that is required by any implementation of {@link 
ApplicationRunner}
+     *  @param userApp user application
+     * @param config user configuration
+     */
+    public MockLocalApplicationRunner(SamzaApplication userApp, Config config) 
{
+      super(userApp, config);
+    }
+
+    protected Set<StreamProcessor> getProcessors() {
+      return super.getProcessors();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
index aca6c40..4caf266 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
@@ -21,9 +21,9 @@ package org.apache.samza.test.framework;
 
 import java.util.Arrays;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
@@ -35,13 +35,14 @@ public class BroadcastAssertApp implements 
StreamApplication {
 
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
+    Config config = appDesc.getConfig();
     String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
 
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, 
serde);
-    final MessageStream<PageView> broadcastPageViews = graph
+    final MessageStream<PageView> broadcastPageViews = appDesc
         .getInputStream(isd)
         .broadcast(serde, "pv");
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 6fdf887..1000f22 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -42,22 +42,21 @@ import org.junit.Test;
 
 import static org.apache.samza.test.controlmessages.TestData.PageView;
 
-
 public class StreamApplicationIntegrationTest {
 
-  final StreamApplication pageViewFilter = (streamGraph, cfg) -> {
+  final StreamApplication pageViewFilter = streamAppDesc -> {
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
     KafkaInputDescriptor<KV<String, PageView>> isd =
         ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
-    MessageStream<KV<String, TestData.PageView>> inputStream = 
streamGraph.getInputStream(isd);
+    MessageStream<KV<String, TestData.PageView>> inputStream = 
streamAppDesc.getInputStream(isd);
     
inputStream.map(StreamApplicationIntegrationTest.Values.create()).filter(pv -> 
pv.getPageKey().equals("inbox"));
   };
 
-  final StreamApplication pageViewRepartition = (streamGraph, cfg) -> {
+  final StreamApplication pageViewRepartition = streamAppDesc -> {
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
     KafkaInputDescriptor<KV<String, PageView>> isd =
         ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new 
NoOpSerde<>()));
-    MessageStream<KV<String, TestData.PageView>> inputStream = 
streamGraph.getInputStream(isd);
+    MessageStream<KV<String, TestData.PageView>> inputStream = 
streamAppDesc.getInputStream(isd);
     inputStream
         .map(Values.create())
         .partitionBy(PageView::getMemberId, pv -> pv, "p1")

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 810d2c2..7f13282 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -18,6 +18,15 @@
  */
 package org.apache.samza.test.framework;
 
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -31,23 +40,13 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.execution.TestStreamManager;
-import org.apache.samza.runtime.AbstractApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.system.kafka.KafkaSystemAdmin;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import scala.Option;
 import scala.Option$;
 
-import java.io.File;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 /**
  * Harness for writing integration tests for {@link StreamApplication}s.
  *
@@ -210,10 +209,9 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
   }
 
   /**
-   * Executes the provided {@link StreamApplication} as a {@link 
org.apache.samza.job.local.ThreadJob}. The
-   * {@link StreamApplication} runs in its own separate thread.
+   * Executes the provided {@link 
org.apache.samza.application.StreamApplication} as a {@link 
org.apache.samza.job.local.ThreadJob}. The
+   * {@link org.apache.samza.application.StreamApplication} runs in its own 
separate thread.
    *
-   * @param streamApplication the application to run
    * @param appName the name of the application
    * @param overriddenConfigs configs to override
    * @return RunApplicationContext which contains objects created within 
runApplication, to be used for verification
@@ -223,7 +221,7 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
       String appName,
       Map<String, String> overriddenConfigs) {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put("job.factory.class", 
"org.apache.samza.job.local.ThreadJobFactory");
+    configMap.put("app.runner.class", 
"org.apache.samza.runtime.LocalApplicationRunner");
     configMap.put("job.name", appName);
     configMap.put("app.class", 
streamApplication.getClass().getCanonicalName());
     configMap.put("serializers.registry.json.class", 
"org.apache.samza.serializers.JsonSerdeFactory");
@@ -256,17 +254,13 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
     }
 
     Config config = new MapConfig(configMap);
-    AbstractApplicationRunner runner = (AbstractApplicationRunner) 
ApplicationRunner.fromConfig(config);
-    runner.run(streamApplication);
+    ApplicationRunner runner = 
ApplicationRunners.getApplicationRunner(streamApplication, config);
+    runner.run();
 
     MessageStreamAssert.waitForComplete();
     return new RunApplicationContext(runner, config);
   }
 
-  public void setNumEmptyPolls(int numEmptyPolls) {
-    this.numEmptyPolls = numEmptyPolls;
-  }
-
   /**
    * Shutdown and clear Zookeeper and Kafka broker state.
    */
@@ -283,15 +277,15 @@ public class StreamApplicationIntegrationTestHarness 
extends AbstractIntegration
    * runApplication in order to do verification.
    */
   protected static class RunApplicationContext {
-    private final AbstractApplicationRunner runner;
+    private final ApplicationRunner runner;
     private final Config config;
 
-    private RunApplicationContext(AbstractApplicationRunner runner, Config 
config) {
+    private RunApplicationContext(ApplicationRunner runner, Config config) {
       this.runner = runner;
       this.config = config;
     }
 
-    public AbstractApplicationRunner getRunner() {
+    public ApplicationRunner getRunner() {
       return this.runner;
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java 
b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
index 91234b7..e72a965 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java
@@ -25,9 +25,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.TimerFunction;
@@ -40,11 +39,11 @@ public class TestTimerApp implements StreamApplication {
   public static final String PAGE_VIEWS = "page-views";
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
     KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, 
serde);
-    final MessageStream<PageView> pageViews = graph.getInputStream(isd);
+    final MessageStream<PageView> pageViews = appDesc.getInputStream(isd);
     final MessageStream<PageView> output = pageViews.flatMap(new 
FlatmapTimerFn());
 
     MessageStreamAssert.that("Output from timer function should container all 
complete messages", output, serde)

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java 
b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
index a48409c..d4e0e14 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.samza.test.framework;
 
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.junit.Before;
 import org.junit.Test;
 
-
-import static org.apache.samza.test.framework.TestTimerApp.PAGE_VIEWS;
+import static org.apache.samza.test.framework.TestTimerApp.*;
 
 public class TimerTest extends StreamApplicationIntegrationTestHarness {
 
@@ -44,7 +47,14 @@ public class TimerTest extends 
StreamApplicationIntegrationTestHarness {
   }
 
   @Test
-  public void testJob() {
-    runApplication(new TestTimerApp(), "TimerTest", null);
+  public void testJob() throws InterruptedException {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configs.put("job.systemstreampartition.grouper.factory", 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
+    configs.put("task.name.grouper.factory", 
"org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
"org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
+
+    runApplication(new TestTimerApp(), "TimerTest", configs);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 8ee7e00..c63c11f 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -19,17 +19,17 @@
 
 package org.apache.samza.test.operator;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -42,9 +42,6 @@ import org.apache.samza.test.operator.data.AdClick;
 import org.apache.samza.test.operator.data.PageView;
 import org.apache.samza.test.operator.data.UserPageAdClick;
 
-import java.time.Duration;
-import org.apache.samza.util.CommandLine;
-
 
 /**
  * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream 
join and a windowed count.
@@ -57,21 +54,11 @@ public class RepartitionJoinWindowApp implements 
StreamApplication {
 
   private final List<String> intermediateStreamIds = new ArrayList<>();
 
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-
-    RepartitionJoinWindowApp application = new RepartitionJoinWindowApp();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
-
-    runner.run(application);
-    runner.waitForFinish();
-  }
-
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     // offset.default = oldest required for tests since checkpoint topic is 
empty on start and messages are published
     // before the application is run
+    Config config = appDesc.getConfig();
     String inputTopic1 = config.get(INPUT_TOPIC_1_CONFIG_KEY);
     String inputTopic2 = config.get(INPUT_TOPIC_2_CONFIG_KEY);
     String outputTopic = config.get(OUTPUT_TOPIC_CONFIG_KEY);
@@ -79,8 +66,8 @@ public class RepartitionJoinWindowApp implements 
StreamApplication {
     KafkaInputDescriptor<PageView> id1 = ksd.getInputDescriptor(inputTopic1, 
new JsonSerdeV2<>(PageView.class));
     KafkaInputDescriptor<AdClick> id2 = ksd.getInputDescriptor(inputTopic2, 
new JsonSerdeV2<>(AdClick.class));
 
-    MessageStream<PageView> pageViews = graph.getInputStream(id1);
-    MessageStream<AdClick> adClicks = graph.getInputStream(id2);
+    MessageStream<PageView> pageViews = appDesc.getInputStream(id1);
+    MessageStream<AdClick> adClicks = appDesc.getInputStream(id2);
 
     MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = 
pageViews
         .partitionBy(PageView::getViewId, pv -> pv,

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
index ae3669f..79a25e7 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
@@ -21,11 +21,9 @@ package org.apache.samza.test.operator;
 
 import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -34,7 +32,6 @@ import org.apache.samza.system.kafka.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.KafkaOutputDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
-import org.apache.samza.util.CommandLine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,30 +45,21 @@ public class RepartitionWindowApp implements 
StreamApplication {
   static final String INPUT_TOPIC = "page-views";
   static final String OUTPUT_TOPIC = "Result";
 
-  public static void main(String[] args) {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    RepartitionWindowApp reparApp = new RepartitionWindowApp();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
-
-    runner.run(reparApp);
-    runner.waitForFinish();
-  }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     KVSerde<String, PageView> inputSerde = KVSerde.of(new 
StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
     KVSerde<String, String> outputSerde = KVSerde.of(new StringSerde(), new 
StringSerde());
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<KV<String, PageView>> id = 
ksd.getInputDescriptor(INPUT_TOPIC, inputSerde);
     KafkaOutputDescriptor<KV<String, String>> od = 
ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    graph.getInputStream(id)
+    appDesc.getInputStream(id)
         .map(KV::getValue)
         .partitionBy(PageView::getUserId, m -> m, inputSerde, "p1")
         .window(Windows.keyedSessionWindow(m -> m.getKey(), 
Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new 
IntegerSerde()), "w1")
         .map(wp -> KV.of(wp.getKey().getKey().toString(), 
String.valueOf(wp.getMessage())))
-        .sendTo(graph.getOutputStream(od));
+        .sendTo(appDesc.getOutputStream(od));
 
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java 
b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 3eb2662..f116f1d 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -20,14 +20,15 @@
 package org.apache.samza.test.operator;
 
 import java.time.Duration;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -50,23 +51,21 @@ public class SessionWindowApp implements StreamApplication {
   public static void main(String[] args) {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    SessionWindowApp app = new SessionWindowApp();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
-
-    runner.run(app);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new 
SessionWindowApp(), config);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
     KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new 
IntegerSerde());
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, 
inputSerde);
     KafkaOutputDescriptor<KV<String, Integer>> od = 
ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    MessageStream<PageView> pageViews = graph.getInputStream(id);
-    OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od);
+    MessageStream<PageView> pageViews = appDesc.getInputStream(id);
+    OutputStream<KV<String, Integer>> outputStream = 
appDesc.getOutputStream(od);
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 5fee9cf..144f125 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -19,11 +19,16 @@
 package org.apache.samza.test.operator;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.samza.Partition;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.system.SystemStreamMetadata;
 import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.kafka.KafkaSystemAdmin;
@@ -33,8 +38,7 @@ import org.apache.samza.util.ExponentialSleepStrategy;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.List;
+import static org.junit.Assert.assertEquals;
 
 
 /**
@@ -77,6 +81,10 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
     String appName = "UserPageAdClickCounter";
     Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
"org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
+    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put("systems.kafka.samza.delete.committed.messages", "false");
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, 
inputTopicName1);
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, 
inputTopicName2);
@@ -86,7 +94,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
 
     // consume and validate result
     List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(outputTopicName), 2);
-    Assert.assertEquals(2, messages.size());
+    assertEquals(2, messages.size());
 
     Assert.assertFalse(KafkaSystemAdmin.deleteMessagesCalled());
   }
@@ -103,22 +111,26 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     RepartitionJoinWindowApp app = new RepartitionJoinWindowApp();
     final String appName = "UserPageAdClickCounter2";
     Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
"org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
+    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put("systems.kafka.samza.delete.committed.messages", "true");
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_1_CONFIG_KEY, 
inputTopicName1);
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_2_CONFIG_KEY, 
inputTopicName2);
     configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_CONFIG_KEY, 
outputTopicName);
 
-    RunApplicationContext runApplicationContext = runApplication(app, appName, 
configs);
+    runApplication(app, appName, configs);
 
     // consume and validate result
     List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(outputTopicName), 2);
-    Assert.assertEquals(2, messages.size());
+    assertEquals(2, messages.size());
 
     for (ConsumerRecord<String, String> message : messages) {
       String key = message.key();
       String value = message.value();
       Assert.assertTrue(key.equals("u1") || key.equals("u2"));
-      Assert.assertEquals("2", value);
+      assertEquals("2", value);
     }
 
     // Verify that messages in the intermediate stream will be deleted in 10 
seconds
@@ -137,7 +149,7 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
           remainingMessageNum += Long.parseLong(metadata.getUpcomingOffset()) 
- Long.parseLong(metadata.getOldestOffset());
         }
       }
-      Assert.assertEquals(0, remainingMessageNum);
+      assertEquals(0, remainingMessageNum);
     }
   }
 
@@ -147,6 +159,10 @@ public class TestRepartitionJoinWindowApp extends 
StreamApplicationIntegrationTe
     String inputTopicName2 = "ad-clicks";
     String outputTopicName = "user-ad-click-counts";
     Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
"org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
+    configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);
 
     initializeTopics(inputTopicName1, inputTopicName2, outputTopicName);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 6373292..2e1de96 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -18,11 +18,13 @@
  */
 package org.apache.samza.test.operator;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
 import org.apache.samza.test.operator.data.PageView;
@@ -30,9 +32,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.List;
-
 import static org.apache.samza.test.operator.RepartitionWindowApp.*;
 
 /**
@@ -63,10 +62,11 @@ public class TestRepartitionWindowApp extends 
StreamApplicationIntegrationTestHa
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
"org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
     configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, 
"org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(JobConfig.PROCESSOR_ID(), "0");
     configs.put(TaskConfig.GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
 
     // run the application
-    runApplication(new RepartitionWindowApp(), APP_NAME, new 
MapConfig(configs));
+    runApplication(new RepartitionWindowApp(), APP_NAME, configs);
 
     // consume and validate result
     List<ConsumerRecord<String, String>> messages = 
consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
 
b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index d77a2a6..0184013 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -20,14 +20,15 @@
 package org.apache.samza.test.operator;
 
 import java.time.Duration;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -51,23 +52,22 @@ public class TumblingWindowApp implements StreamApplication 
{
   public static void main(String[] args) {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    TumblingWindowApp app = new TumblingWindowApp();
-    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+    ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new 
TumblingWindowApp(), config);
 
-    runner.run(app);
+    runner.run();
     runner.waitForFinish();
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor appDesc) {
     JsonSerdeV2<PageView> inputSerde = new JsonSerdeV2<>(PageView.class);
     KVSerde<String, Integer> outputSerde = KVSerde.of(new StringSerde(), new 
IntegerSerde());
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
     KafkaInputDescriptor<PageView> id = ksd.getInputDescriptor(INPUT_TOPIC, 
inputSerde);
     KafkaOutputDescriptor<KV<String, Integer>> od = 
ksd.getOutputDescriptor(OUTPUT_TOPIC, outputSerde);
 
-    MessageStream<PageView> pageViews = graph.getInputStream(id);
-    OutputStream<KV<String, Integer>> outputStream = graph.getOutputStream(od);
+    MessageStream<PageView> pageViews = appDesc.getInputStream(id);
+    OutputStream<KV<String, Integer>> outputStream = 
appDesc.getOutputStream(od);
 
     pageViews
         .filter(m -> !FILTER_KEY.equals(m.getUserId()))

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
index 1b778e8..51f33b5 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
@@ -23,15 +23,13 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.concurrent.CountDownLatch;
+import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
@@ -43,7 +41,7 @@ import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 /**
  * Test class to create an {@link StreamApplication} instance
  */
-public class TestStreamApplication implements StreamApplication, Serializable {
+public class TestStreamApplication implements StreamApplication {
 
   private final String systemName;
   private final String inputTopic;
@@ -61,47 +59,57 @@ public class TestStreamApplication implements 
StreamApplication, Serializable {
   }
 
   @Override
-  public void init(StreamGraph graph, Config config) {
+  public void describe(StreamApplicationDescriptor streamAppDesc) {
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(systemName);
     KafkaInputDescriptor<String> isd = ksd.getInputDescriptor(inputTopic, new 
NoOpSerde<>());
     KafkaOutputDescriptor<String> osd = ksd.getOutputDescriptor(outputTopic, 
new StringSerde());
-    MessageStream<String> inputStream = graph.getInputStream(isd);
-    OutputStream<String> outputStream = graph.getOutputStream(osd);
-    inputStream.map(new MapFunction<String, String>() {
-      transient CountDownLatch latch1;
-      transient CountDownLatch latch2;
-      transient StreamApplicationCallback callback;
-
-      @Override
-      public String apply(String message) {
-        TestKafkaEvent incomingMessage = TestKafkaEvent.fromString(message);
-        if (callback != null) {
-          callback.onMessage(incomingMessage);
-        }
-        if (latch1 != null) {
-          latch1.countDown();
-        }
-        if (latch2 != null) {
-          latch2.countDown();
-        }
-        return incomingMessage.toString();
-      }
-
-      private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-        in.defaultReadObject();
-        SharedContextFactories.SharedContextFactory contextFactory =
-            
SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName);
-        this.latch1 = (CountDownLatch) 
contextFactory.getSharedObject("processedMsgLatch");
-        this.latch2 = (CountDownLatch) 
contextFactory.getSharedObject("kafkaMsgsConsumedLatch");
-        this.callback = (StreamApplicationCallback) 
contextFactory.getSharedObject("callback");
-      }
-    }).sendTo(outputStream);
+    MessageStream<String> inputStream = streamAppDesc.getInputStream(isd);
+    OutputStream<String> outputStream = streamAppDesc.getOutputStream(osd);
+    inputStream.map(new TestMapFunction(appName, 
processorName)).sendTo(outputStream);
   }
 
   public interface StreamApplicationCallback {
     void onMessage(TestKafkaEvent m);
   }
 
+  public static class TestMapFunction implements MapFunction<String, String> {
+    private final String appName;
+    private final String processorName;
+
+    private transient CountDownLatch latch1;
+    private transient CountDownLatch latch2;
+    private transient StreamApplicationCallback callback;
+
+    TestMapFunction(String appName, String processorName) {
+      this.appName = appName;
+      this.processorName = processorName;
+    }
+
+    @Override
+    public String apply(String message) {
+      TestKafkaEvent incomingMessage = TestKafkaEvent.fromString(message);
+      if (callback != null) {
+        callback.onMessage(incomingMessage);
+      }
+      if (latch1 != null) {
+        latch1.countDown();
+      }
+      if (latch2 != null) {
+        latch2.countDown();
+      }
+      return incomingMessage.toString();
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+      in.defaultReadObject();
+      SharedContextFactories.SharedContextFactory contextFactory =
+          
SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName);
+      this.latch1 = (CountDownLatch) 
contextFactory.getSharedObject("processedMsgLatch");
+      this.latch2 = (CountDownLatch) 
contextFactory.getSharedObject("kafkaMsgsConsumedLatch");
+      this.callback = (StreamApplicationCallback) 
contextFactory.getSharedObject("callback");
+    }
+  }
+
   public static class TestKafkaEvent implements Serializable {
 
     // Actual content of the event.
@@ -142,7 +150,7 @@ public class TestStreamApplication implements 
StreamApplication, Serializable {
       StreamApplicationCallback callback,
       CountDownLatch kafkaEventsConsumedLatch,
       Config config) {
-    String appName = String.format("%s-%s", 
config.get(ApplicationConfig.APP_NAME), config.get(ApplicationConfig.APP_ID));
+    String appName = new ApplicationConfig(config).getGlobalAppId();
     String processorName = config.get(JobConfig.PROCESSOR_ID());
     registerLatches(processedMessageLatch, kafkaEventsConsumedLatch, callback, 
appName, processorName);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index c37132f..fc62b0a 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -42,7 +42,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifecycleListener;
+import org.apache.samza.runtime.ProcessorLifecycleListener;
 import org.apache.samza.task.AsyncStreamTaskAdapter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
@@ -52,10 +52,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import scala.Option$;
 
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
 
 
 public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
@@ -232,7 +229,7 @@ public class TestStreamProcessor extends 
StandaloneIntegrationTestHarness {
     KafkaConsumer consumer;
     KafkaProducer producer;
     StreamProcessor processor;
-    StreamProcessorLifecycleListener listener;
+    ProcessorLifecycleListener listener;
 
     private TestStubs(String bootstrapServer) {
       shutdownLatch = new CountDownLatch(1);
@@ -266,13 +263,14 @@ public class TestStreamProcessor extends 
StandaloneIntegrationTestHarness {
     }
 
     private void initProcessorListener() {
-      listener = mock(StreamProcessorLifecycleListener.class);
-      doNothing().when(listener).onStart();
-      doNothing().when(listener).onFailure(anyObject());
+      listener = mock(ProcessorLifecycleListener.class);
+      doNothing().when(listener).afterStart();
+      doNothing().when(listener).afterFailure(any());
       doAnswer(invocation -> {
+          // stopped successfully
           shutdownLatch.countDown();
           return null;
-        }).when(listener).onShutdown();
+        }).when(listener).afterStop();
     }
 
     private void initProducer(String bootstrapServer) {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e34ee4a..3b2d08a 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +37,6 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -50,10 +49,10 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
-import 
org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkKeyBuilder;
@@ -70,10 +69,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Integration tests for {@link LocalApplicationRunner}.
+ * Integration tests for {@link 
org.apache.samza.runtime.LocalApplicationRunner} with {@link 
ZkJobCoordinatorFactory}.
  *
- * Brings up embedded ZooKeeper, Kafka broker and launches multiple {@link 
StreamApplication} through
- * {@link LocalApplicationRunner} to verify the guarantees made in stand alone 
execution environment.
+ * Brings up embedded ZooKeeper, Kafka broker and launches multiple {@link 
org.apache.samza.application.StreamApplication}
+ * through {@link org.apache.samza.runtime.LocalApplicationRunner} to verify 
the guarantees made in stand alone execution
+ * environment.
  */
 public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarness {
 
@@ -157,6 +157,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
   }
 
   public void tearDown() {
+    SharedContextFactories.clearAll();
     for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, 
outputKafkaTopic)) {
       LOGGER.info("Deleting kafka topic: {}.", kafkaTopic);
       AdminUtils.deleteTopic(zkUtils(), kafkaTopic);
@@ -174,7 +175,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     for (int eventIndex = startIndex; eventIndex < endIndex; eventIndex++) {
       try {
         LOGGER.info("Publish kafka event with index : {} for stream processor: 
{}.", eventIndex, streamProcessorId);
-        producer.send(new ProducerRecord(topic, new 
TestKafkaEvent(streamProcessorId, 
String.valueOf(eventIndex)).toString().getBytes()));
+        producer.send(new ProducerRecord(topic, new 
TestStreamApplication.TestKafkaEvent(streamProcessorId, 
String.valueOf(eventIndex)).toString().getBytes()));
       } catch (Exception  e) {
         LOGGER.error("Publishing to kafka topic: {} resulted in exception: 
{}.", new Object[]{topic, e});
         throw new SamzaException(e);
@@ -195,6 +196,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
         .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, 
TEST_JOB_COORDINATOR_FACTORY)
         .put(ApplicationConfig.APP_NAME, appName)
         .put(ApplicationConfig.APP_ID, appId)
+        .put("app.runner.class", 
"org.apache.samza.runtime.LocalApplicationRunner")
         .put(String.format("systems.%s.samza.factory", systemName), 
TEST_SYSTEM_FACTORY)
         .put(JobConfig.JOB_NAME(), appName)
         .put(JobConfig.JOB_ID(), appId)
@@ -210,13 +212,13 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
   /**
    * sspGrouper is set to GroupBySystemStreamPartitionFactory.
-   * Run a stream application(streamApp1) consuming messages from input 
topic(effectively one container).
+   * Run a stream application(appRunner1) consuming messages from input 
topic(effectively one container).
    *
-   * In the callback triggered by streamApp1 after processing a message, bring 
up an another stream application(streamApp2).
+   * In the callback triggered by appRunner1 after processing a message, bring 
up an another stream application(appRunner2).
    *
    * Assertions:
-   *           A) JobModel generated before and after the addition of 
streamApp2 should be equal.
-   *           B) Second stream application(streamApp2) should not join the 
group and process any message.
+   *           A) JobModel generated before and after the addition of 
appRunner2 should be equal.
+   *           B) Second stream application(appRunner2) should not join the 
group and process any message.
    */
 
   @Test
@@ -234,28 +236,27 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
 
     zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
-        // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start 
processing message in streamApp1.
+        // When appRunner2 with id: PROCESSOR_IDS[1] is registered, run 
processing message in appRunner1.
         if (currentChilds.contains(PROCESSOR_IDS[1])) {
           secondProcessorRegistered.countDown();
         }
       });
 
-    // Set up stream app 2.
+    // Set up stream app appRunner2.
     CountDownLatch processedMessagesLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS);
-    Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
-    LocalApplicationRunner localApplicationRunner2 = new 
LocalApplicationRunner(testAppConfig2);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic,
-        processedMessagesLatch, null, null, testAppConfig2);
+    Config localTestConfig2 = new MapConfig(applicationConfig2, testConfig);
+    ApplicationRunner appRunner2 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic, processedMessagesLatch,
+        null, null, localTestConfig2), localTestConfig2);
 
-    // Callback handler for streamApp1.
-    StreamApplicationCallback streamApplicationCallback = message -> {
+    // Callback handler for appRunner1.
+    TestStreamApplication.StreamApplicationCallback callback = m -> {
       if (hasSecondProcessorJoined.compareAndSet(false, true)) {
         previousJobModelVersion[0] = zkUtils.getJobModelVersion();
         previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
-        localApplicationRunner2.run(streamApp2);
+        appRunner2.run();
         try {
-          // Wait for streamApp2 to register with zookeeper.
+          // Wait for appRunner2 to register with zookeeper.
           secondProcessorRegistered.await();
         } catch (InterruptedException e) {
         }
@@ -264,13 +265,12 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
     CountDownLatch kafkaEventsConsumedLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS * 2);
 
-    // Set up stream app 1.
-    Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
-    LocalApplicationRunner localApplicationRunner1 = new 
LocalApplicationRunner(testAppConfig1);
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic,
-        null, streamApplicationCallback, kafkaEventsConsumedLatch, 
testAppConfig1);
-    localApplicationRunner1.run(streamApp1);
+    // Set up appRunner1.
+    Config localTestConfig1 = new MapConfig(applicationConfig1, testConfig);
+    ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputSinglePartitionKafkaTopic, 
outputSinglePartitionKafkaTopic, null,
+        callback, kafkaEventsConsumedLatch, localTestConfig1), 
localTestConfig1);
+    appRunner1.run();
 
     kafkaEventsConsumedLatch.await();
 
@@ -281,12 +281,12 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     assertEquals(previousJobModel[0], updatedJobModel);
     assertEquals(new MapConfig(), updatedJobModel.getConfig());
     assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
-    localApplicationRunner1.kill(streamApp1);
-    localApplicationRunner1.waitForFinish();
-    localApplicationRunner2.kill(streamApp2);
-    localApplicationRunner2.waitForFinish();
-    assertEquals(localApplicationRunner1.status(streamApp1), 
ApplicationStatus.SuccessfulFinish);
-    assertEquals(localApplicationRunner2.status(streamApp2), 
ApplicationStatus.UnsuccessfulFinish);
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+    assertEquals(appRunner1.status(), ApplicationStatus.SuccessfulFinish);
+    assertEquals(appRunner2.status(), ApplicationStatus.UnsuccessfulFinish);
   }
 
   /**
@@ -309,7 +309,8 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS * 2, 
PROCESSOR_IDS[0]);
 
     // Configuration, verification variables
-    MapConfig testConfig = new 
MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), 
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", 
JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
+    MapConfig testConfig = new 
MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(),
+        
"org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", 
JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
     // Declared as final array to update it from streamApplication 
callback(Variable should be declared final to access in lambda block).
     final JobModel[] previousJobModel = new JobModel[1];
     final String[] previousJobModelVersion = new String[1];
@@ -317,44 +318,43 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
 
     zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
-        // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start 
processing message in streamApp1.
+        // When appRunner2 with id: PROCESSOR_IDS[1] is registered, start 
processing messages in appRunner1.
         if (currentChilds.contains(PROCESSOR_IDS[1])) {
           secondProcessorRegistered.countDown();
         }
       });
 
-    // Set up streamApp2.
+    // Set up appRunner2.
     CountDownLatch processedMessagesLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS * 2);
     Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
-    LocalApplicationRunner localApplicationRunner2 = new 
LocalApplicationRunner(testAppConfig2);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch, null, null, testAppConfig2);
+    ApplicationRunner appRunner2 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch, null,
+        null, testAppConfig2), testAppConfig2);
 
-    // Callback handler for streamApp1.
-    StreamApplicationCallback streamApplicationCallback = message -> {
+    // Callback handler for appRunner1.
+    TestStreamApplication.StreamApplicationCallback streamApplicationCallback 
= message -> {
       if (hasSecondProcessorJoined.compareAndSet(false, true)) {
         previousJobModelVersion[0] = zkUtils.getJobModelVersion();
         previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
-        localApplicationRunner2.run(streamApp2);
+        appRunner2.run();
         try {
-          // Wait for streamApp2 to register with zookeeper.
+          // Wait for appRunner2 to register with zookeeper.
           secondProcessorRegistered.await();
         } catch (InterruptedException e) {
         }
       }
     };
 
-    // This is the latch for the messages received by streamApp1. Since 
streamApp1 is run first, it gets one event
-    // redelivered due to re-balancing done by Zk after the streamApp2 joins 
(See the callback above).
+    // This is the latch for the messages received by appRunner1. Since 
appRunner1 is run first, it gets one event
+    // redelivered due to re-balancing done by Zk after the appRunner2 joins 
(See the callback above).
     CountDownLatch kafkaEventsConsumedLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1);
 
-    // Set up stream app 1.
+    // Set up appRunner1.
     Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
-    LocalApplicationRunner localApplicationRunner1 = new 
LocalApplicationRunner(testAppConfig1);
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null,
-        streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1);
-    localApplicationRunner1.run(streamApp1);
+    ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, 
streamApplicationCallback,
+        kafkaEventsConsumedLatch, testAppConfig1), testAppConfig1);
+    appRunner1.run();
 
     kafkaEventsConsumedLatch.await();
 
@@ -386,12 +386,12 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
     processedMessagesLatch.await();
 
-    assertEquals(ApplicationStatus.Running, 
localApplicationRunner2.status(streamApp2));
-    localApplicationRunner1.kill(streamApp1);
-    localApplicationRunner1.waitForFinish();
-    localApplicationRunner2.kill(streamApp2);
-    localApplicationRunner2.waitForFinish();
-    assertEquals(localApplicationRunner1.status(streamApp1), 
ApplicationStatus.SuccessfulFinish);
+    assertEquals(ApplicationStatus.Running, appRunner2.status());
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
   }
 
   @Test
@@ -405,23 +405,18 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch1, null, kafkaEventsConsumedLatch, 
applicationConfig1);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch2, null, kafkaEventsConsumedLatch, 
applicationConfig2);
-    StreamApplication streamApp3 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch3, null, kafkaEventsConsumedLatch, 
applicationConfig3);
+    ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, null, kafkaEventsConsumedLatch,
+        applicationConfig1), applicationConfig1);
+    ApplicationRunner appRunner2 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch2, null, kafkaEventsConsumedLatch,
+        applicationConfig2), applicationConfig2);
+    ApplicationRunner appRunner3 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch3, null, kafkaEventsConsumedLatch,
+        applicationConfig3), applicationConfig3);
 
-    // Create LocalApplicationRunners
-    LocalApplicationRunner applicationRunner1 = new 
LocalApplicationRunner(applicationConfig1);
-    LocalApplicationRunner applicationRunner2 = new 
LocalApplicationRunner(applicationConfig2);
-
-    // Run stream applications.
-    applicationRunner1.run(streamApp1);
-    applicationRunner2.run(streamApp2);
+    appRunner1.run();
+    appRunner2.run();
 
     // Wait until all processors have processed a message.
     processedMessagesLatch1.await();
@@ -439,21 +434,18 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     assertEquals(2, processorIdsFromZK.size());
     assertEquals(PROCESSOR_IDS[0], processorIdsFromZK.get(0));
 
-    // Kill the leader. Since streamApp1 is the first to join the cluster, 
it's the leader.
-    applicationRunner1.kill(streamApp1);
-    applicationRunner1.waitForFinish();
-
-    assertEquals(applicationRunner1.status(streamApp1), 
ApplicationStatus.SuccessfulFinish);
+    // Kill the leader. Since appRunner1 is the first to join the cluster, 
it's the leader.
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
 
     kafkaEventsConsumedLatch.await();
     publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, 
PROCESSOR_IDS[0]);
 
-    LocalApplicationRunner applicationRunner3 = new 
LocalApplicationRunner(applicationConfig3);
-    applicationRunner3.run(streamApp3);
+    appRunner3.run();
     processedMessagesLatch3.await();
 
     // Verifications after killing the leader.
-    assertEquals(ApplicationStatus.SuccessfulFinish, 
applicationRunner1.status(streamApp1));
     processorIdsFromZK = 
zkUtils.getActiveProcessorsIDs(ImmutableList.of(PROCESSOR_IDS[1], 
PROCESSOR_IDS[2]));
     assertEquals(2, processorIdsFromZK.size());
     assertEquals(PROCESSOR_IDS[1], processorIdsFromZK.get(0));
@@ -462,12 +454,12 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     assertEquals(Sets.newHashSet("0000000001", "0000000002"), 
jobModel.getContainers().keySet());
     assertEquals(2, jobModel.getContainers().size());
 
-    applicationRunner2.kill(streamApp2);
-    applicationRunner2.waitForFinish();
-    assertEquals(applicationRunner2.status(streamApp2), 
ApplicationStatus.SuccessfulFinish);
-    applicationRunner3.kill(streamApp3);
-    applicationRunner3.waitForFinish();
-    assertEquals(applicationRunner3.status(streamApp2), 
ApplicationStatus.SuccessfulFinish);
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status());
+    appRunner3.kill();
+    appRunner3.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status());
   }
 
   @Test
@@ -480,42 +472,37 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch1, null, kafkaEventsConsumedLatch, 
applicationConfig1);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch2, null, kafkaEventsConsumedLatch, 
applicationConfig2);
-
-    // Create LocalApplicationRunners
-    LocalApplicationRunner applicationRunner1 = new 
LocalApplicationRunner(applicationConfig1);
-    LocalApplicationRunner applicationRunner2 = new 
LocalApplicationRunner(applicationConfig2);
+    ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, null, kafkaEventsConsumedLatch,
+        applicationConfig1), applicationConfig1);
+    ApplicationRunner appRunner2 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch2, null, kafkaEventsConsumedLatch,
+        applicationConfig2), applicationConfig2);
 
     // Run stream applications.
-    applicationRunner1.run(streamApp1);
-    applicationRunner2.run(streamApp2);
+    appRunner1.run();
+    appRunner2.run();
 
-    // Wait for message processing to start in both the processors.
+    // Wait for message processing to start in both the processors.
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
 
-    MapConfig appConfig = new ApplicationConfig(new 
MapConfig(applicationConfig2, ImmutableMap.of(ZkConfig.ZK_SESSION_TIMEOUT_MS, 
"10")));
-    LocalApplicationRunner applicationRunner3 = new 
LocalApplicationRunner(appConfig);
-
     // Create a stream app with same processor id as SP2 and run it. It should 
fail.
-    StreamApplication streamApp3 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        null, null, kafkaEventsConsumedLatch, applicationConfig2);
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * 
NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
+    kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
+    ApplicationRunner appRunner3 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, null, null, 
kafkaEventsConsumedLatch,
+        applicationConfig2), applicationConfig2);
     // Fail when the duplicate processor joins.
     expectedException.expect(SamzaException.class);
     try {
-      applicationRunner3.run(streamApp3);
+      appRunner3.run();
     } finally {
-      applicationRunner1.kill(streamApp1);
-      applicationRunner2.kill(streamApp2);
+      appRunner1.kill();
+      appRunner2.kill();
 
-      applicationRunner1.waitForFinish();
-      applicationRunner2.waitForFinish();
+      appRunner1.waitForFinish();
+      appRunner2.waitForFinish();
     }
   }
 
@@ -533,24 +520,24 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
     Config applicationConfig2 = new MapConfig(configMap);
 
-    LocalApplicationRunner applicationRunner1 = new 
LocalApplicationRunner(applicationConfig1);
-    LocalApplicationRunner applicationRunner2 = new 
LocalApplicationRunner(applicationConfig2);
+    List<TestStreamApplication.TestKafkaEvent> messagesProcessed = new 
ArrayList<>();
+    TestStreamApplication.StreamApplicationCallback streamApplicationCallback 
= messagesProcessed::add;
 
     // Create StreamApplication from configuration.
     CountDownLatch kafkaEventsConsumedLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS);
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch1, null, kafkaEventsConsumedLatch, 
applicationConfig1);
-    StreamApplication streamApp2 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch2, null, kafkaEventsConsumedLatch, 
applicationConfig2);
+    ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, null, kafkaEventsConsumedLatch,
+        applicationConfig1), applicationConfig1);
+    ApplicationRunner appRunner2 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch2, null, kafkaEventsConsumedLatch,
+        applicationConfig2), applicationConfig2);
 
     // Run stream application.
-    applicationRunner1.run(streamApp1);
-    applicationRunner2.run(streamApp2);
+    appRunner1.run();
+    appRunner2.run();
 
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
@@ -559,18 +546,23 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     String jobModelVersion = zkUtils.getJobModelVersion();
     JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
 
-    applicationRunner1.kill(streamApp1);
-    applicationRunner1.waitForFinish();
+    appRunner1.kill();
+    appRunner1.waitForFinish();
 
-    assertEquals(applicationRunner1.status(streamApp1), 
ApplicationStatus.SuccessfulFinish);
+    int lastProcessedMessageId = -1;
+    for (TestStreamApplication.TestKafkaEvent message : messagesProcessed) {
+      lastProcessedMessageId = Math.max(lastProcessedMessageId, 
Integer.parseInt(message.getEventData()));
+    }
+    messagesProcessed.clear();
+
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
 
-    LocalApplicationRunner applicationRunner4 = new 
LocalApplicationRunner(applicationConfig1);
     processedMessagesLatch1 = new CountDownLatch(1);
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * 
NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
-    streamApp1 = TestStreamApplication.getInstance(
-        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic,
-        processedMessagesLatch1, null, kafkaEventsConsumedLatch, 
applicationConfig1);
-    applicationRunner4.run(streamApp1);
+    ApplicationRunner appRunner3 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, null, kafkaEventsConsumedLatch,
+        applicationConfig1), applicationConfig1);
+    appRunner3.run();
 
     processedMessagesLatch1.await();
 
@@ -581,12 +573,12 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     assertEquals(Integer.parseInt(jobModelVersion) + 1, 
Integer.parseInt(newJobModelVersion));
     assertEquals(jobModel.getContainers(), newJobModel.getContainers());
 
-    applicationRunner2.kill(streamApp2);
-    applicationRunner2.waitForFinish();
-    assertEquals(applicationRunner2.status(streamApp2), 
ApplicationStatus.SuccessfulFinish);
-    applicationRunner4.kill(streamApp1);
-    applicationRunner4.waitForFinish();
-    assertEquals(applicationRunner4.status(streamApp1), 
ApplicationStatus.SuccessfulFinish);
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status());
+    appRunner3.kill();
+    appRunner3.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status());
   }
 
   @Test
@@ -602,21 +594,20 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
     Config applicationConfig2 = new MapConfig(configMap);
 
-    LocalApplicationRunner applicationRunner1 = new 
LocalApplicationRunner(applicationConfig1);
-    LocalApplicationRunner applicationRunner2 = new 
LocalApplicationRunner(applicationConfig2);
-
     // Create StreamApplication from configuration.
     CountDownLatch kafkaEventsConsumedLatch = new 
CountDownLatch(NUM_KAFKA_EVENTS);
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = 
TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, 
outputKafkaTopic,
-        processedMessagesLatch1, null, kafkaEventsConsumedLatch, 
applicationConfig1);
-    StreamApplication streamApp2 = 
TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, 
outputKafkaTopic,
-        processedMessagesLatch2, null, kafkaEventsConsumedLatch, 
applicationConfig2);
+    ApplicationRunner appRunner1 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch1, null, kafkaEventsConsumedLatch,
+        applicationConfig1), applicationConfig1);
+    ApplicationRunner appRunner2 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch2, null, kafkaEventsConsumedLatch,
+        applicationConfig2), applicationConfig2);
 
-    applicationRunner1.run(streamApp1);
-    applicationRunner2.run(streamApp2);
+    appRunner1.run();
+    appRunner2.run();
 
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
@@ -628,12 +619,12 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
     Config applicationConfig3 = new MapConfig(configMap);
 
-    LocalApplicationRunner applicationRunner3 = new 
LocalApplicationRunner(applicationConfig3);
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
 
-    StreamApplication streamApp3 = 
TestStreamApplication.getInstance(TEST_SYSTEM, inputKafkaTopic, 
outputKafkaTopic,
-        processedMessagesLatch3, null, kafkaEventsConsumedLatch, 
applicationConfig3);
-    applicationRunner3.run(streamApp3);
+    ApplicationRunner appRunner3 = 
ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, 
processedMessagesLatch3, null, kafkaEventsConsumedLatch,
+        applicationConfig3), applicationConfig3);
+    appRunner3.run();
 
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * 
NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
@@ -643,26 +634,12 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
      * If the processing has started in the third stream processor, then other 
two stream processors should be stopped.
      */
     // TODO: This is a bug! Status should be unsuccessful finish.
-    assertEquals(applicationRunner1.status(streamApp1), 
ApplicationStatus.SuccessfulFinish);
-    assertEquals(applicationRunner2.status(streamApp2), 
ApplicationStatus.SuccessfulFinish);
-
-    applicationRunner3.kill(streamApp3);
-    applicationRunner3.waitForFinish();
-    assertEquals(applicationRunner3.status(streamApp3), 
ApplicationStatus.SuccessfulFinish);
-  }
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner2.status());
 
-  private static class TestKafkaEvent implements Serializable {
-
-    // Actual content of the event.
-    private String eventData;
-
-    // Contains Integer value, which is greater than previous message id.
-    private String eventId;
-
-    TestKafkaEvent(String eventId, String eventData) {
-      this.eventId = eventData;
-      this.eventData = eventData;
-    }
+    appRunner3.kill();
+    appRunner3.waitForFinish();
+    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status());
   }
 
 }

Reply via email to