http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index e1e1c55..1e021f5 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -103,7 +103,7 @@ public abstract class OperatorSpec<M, OM> implements 
Serializable {
   }
 
   /**
-   * Get the unique ID of this operator in the {@link 
org.apache.samza.operators.StreamGraph}.
+   * Get the unique ID of this operator in the {@link 
org.apache.samza.application.StreamApplicationDescriptorImpl}.
    * @return  the unique operator ID
    */
   public final String getOpId() {

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index 17c6903..4db8e60 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.samza.operators.stream;
 
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -44,9 +44,9 @@ public class IntermediateMessageStreamImpl<M> extends 
MessageStreamImpl<M> imple
   private final OutputStreamImpl<M> outputStream;
   private final boolean isKeyed;
 
-  public IntermediateMessageStreamImpl(StreamGraphSpec graph, 
InputOperatorSpec inputOperatorSpec,
+  public IntermediateMessageStreamImpl(StreamApplicationDescriptorImpl 
appDesc, InputOperatorSpec inputOperatorSpec,
       OutputStreamImpl<M> outputStream) {
-    super(graph, (OperatorSpec<?, M>) inputOperatorSpec);
+    super(appDesc, (OperatorSpec<?, M>) inputOperatorSpec);
     this.outputStream = outputStream;
     if (inputOperatorSpec.isKeyed() != outputStream.isKeyed()) {
       LOGGER.error("Input and output streams for intermediate stream {} aren't 
keyed consistently. Input: {}, Output: {}",

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index ed67d80..4ef9f9c 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -40,8 +40,8 @@ import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.task.AsyncStreamTaskFactory;
-import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.runtime.ProcessorLifecycleListener;
+import org.apache.samza.task.TaskFactory;
 import org.apache.samza.util.ScalaJavaUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -95,8 +95,8 @@ public class StreamProcessor {
   private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza 
StreamProcessor Container Thread-%d";
 
   private final JobCoordinator jobCoordinator;
-  private final StreamProcessorLifecycleListener processorListener;
-  private final Object taskFactory;
+  private final ProcessorLifecycleListener processorListener;
+  private final TaskFactory taskFactory;
   private final Map<String, MetricsReporter> customMetricsReporter;
   private final Config config;
   private final long taskShutdownMs;
@@ -105,7 +105,6 @@ public class StreamProcessor {
   private final Object lock = new Object();
 
   private Throwable containerException = null;
-  private boolean processorOnStartCalled = false;
 
   volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1);
 
@@ -153,54 +152,56 @@ public class StreamProcessor {
    *
    * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the 
StreamProcessor.
    *
-   * @param config                 configuration required to launch {@link 
JobCoordinator} and {@link SamzaContainer}.
+   * @param config configuration required to launch {@link JobCoordinator} and 
{@link SamzaContainer}.
    * @param customMetricsReporters metricReporter instances that will be used 
by SamzaContainer and JobCoordinator to report metrics.
-   * @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be 
used for creating task instances.
-   * @param processorListener      listener to the StreamProcessor life cycle.
+   * @param taskFactory the {@link TaskFactory} to be used for creating task 
instances.
+   * @param processorListener listener to the StreamProcessor life cycle.
    */
-  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters,
-                         AsyncStreamTaskFactory asyncStreamTaskFactory, 
StreamProcessorLifecycleListener processorListener) {
-    this(config, customMetricsReporters, asyncStreamTaskFactory, 
processorListener, null);
+  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, TaskFactory taskFactory,
+      ProcessorLifecycleListener processorListener) {
+    this(config, customMetricsReporters, taskFactory, processorListener, null);
   }
 
   /**
-   * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, 
StreamProcessorLifecycleListener)}, except task
-   * instances are created using the provided {@link StreamTaskFactory}.
-   * @param config - config
+   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, 
ProcessorLifecycleListener)}, except the
+   * {@link JobCoordinator} is given for this {@link StreamProcessor}.
+   * @param config configuration required to launch {@link JobCoordinator} and 
{@link SamzaContainer}
    * @param customMetricsReporters metric Reporter
-   * @param streamTaskFactory task factory to instantiate the Task
-   * @param processorListener  listener to the StreamProcessor life cycle
+   * @param taskFactory task factory to instantiate the Task
+   * @param processorListener listener to the StreamProcessor life cycle
+   * @param jobCoordinator the instance of {@link JobCoordinator}
    */
-  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters,
-                         StreamTaskFactory streamTaskFactory, 
StreamProcessorLifecycleListener processorListener) {
-    this(config, customMetricsReporters, streamTaskFactory, processorListener, 
null);
+  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, TaskFactory taskFactory,
+      ProcessorLifecycleListener processorListener, JobCoordinator 
jobCoordinator) {
+    this(config, customMetricsReporters, taskFactory, sp -> processorListener, 
jobCoordinator);
   }
 
-  /* package private */
-  private JobCoordinator getJobCoordinator() {
-    String jobCoordinatorFactoryClassName = new 
JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
-    return Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(config);
-  }
-
-  @VisibleForTesting
-  JobCoordinator getCurrentJobCoordinator() {
-    return jobCoordinator;
-  }
-
-  StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, Object taskFactory,
-                  StreamProcessorLifecycleListener processorListener, 
JobCoordinator jobCoordinator) {
-    Preconditions.checkNotNull(processorListener, "ProcessorListener cannot be 
null.");
+  /**
+   * Same as {@link #StreamProcessor(Config, Map, TaskFactory, 
ProcessorLifecycleListener, JobCoordinator)}, except
+   * there is a {@link StreamProcessorLifecycleListenerFactory} as input 
instead of {@link ProcessorLifecycleListener}.
+   * This is useful to create a {@link ProcessorLifecycleListener} with a 
reference to this {@link StreamProcessor}.
+   *
+   * @param config configuration required to launch {@link JobCoordinator} and 
{@link SamzaContainer}
+   * @param customMetricsReporters metric Reporter
+   * @param taskFactory task factory to instantiate the Task
+   * @param listenerFactory factory that creates the listener to the StreamProcessor life cycle
+   * @param jobCoordinator the instance of {@link JobCoordinator}
+   */
+  public StreamProcessor(Config config, Map<String, MetricsReporter> 
customMetricsReporters, TaskFactory taskFactory,
+      StreamProcessorLifecycleListenerFactory listenerFactory, JobCoordinator 
jobCoordinator) {
+    Preconditions.checkNotNull(listenerFactory, 
"StreamProcessorListenerFactory cannot be null.");
     this.taskFactory = taskFactory;
     this.config = config;
     this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
     this.customMetricsReporter = customMetricsReporters;
-    this.processorListener = processorListener;
-    this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : 
getJobCoordinator();
+    this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : 
createJobCoordinator();
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
     ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
     this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+    // TODO: remove the dependency on jobCoordinator for processorId after 
fixing SAMZA-1835
     this.processorId = this.jobCoordinator.getProcessorId();
+    this.processorListener = listenerFactory.createInstance(this);
   }
 
   /**
@@ -214,6 +215,7 @@ public class StreamProcessor {
   public void start() {
     synchronized (lock) {
       if (state == State.NEW) {
+        processorListener.beforeStart();
         state = State.STARTED;
         jobCoordinator.start();
       } else {
@@ -239,9 +241,9 @@ public class StreamProcessor {
    * <br>
    * If container is running,
    * <ol>
-   *   <li>container is shutdown cleanly and {@link 
SamzaContainerListener#onContainerStop()} will trigger
+   *   <li>container is shutdown cleanly and {@link 
SamzaContainerListener#afterStop()} will trigger
    *   {@link JobCoordinator#stop()}</li>
-   *   <li>container fails to shutdown cleanly and {@link 
SamzaContainerListener#onContainerFailed(Throwable)} will
+   *   <li>container fails to shutdown cleanly and {@link 
SamzaContainerListener#afterFailure(Throwable)} will
    *   trigger {@link JobCoordinator#stop()}</li>
    * </ol>
    * If container is not running, then this method will simply shutdown the 
{@link JobCoordinator}.
@@ -269,10 +271,26 @@ public class StreamProcessor {
     }
   }
 
+  @VisibleForTesting
+  JobCoordinator getCurrentJobCoordinator() {
+    return jobCoordinator;
+  }
+
+  @VisibleForTesting
+  SamzaContainer getContainer() {
+    return container;
+  }
+
+  @VisibleForTesting
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
     return SamzaContainer.apply(processorId, jobModel, config, 
ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory);
   }
 
+  private JobCoordinator createJobCoordinator() {
+    String jobCoordinatorFactoryClassName = new 
JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
+    return Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(config);
+  }
+
   /**
    * Stops the {@link SamzaContainer}.
    * @return true if {@link SamzaContainer} had shutdown within 
task.shutdown.ms. false otherwise.
@@ -346,9 +364,9 @@ public class StreamProcessor {
           state = State.STOPPED;
         }
         if (containerException != null)
-          processorListener.onFailure(containerException);
+          processorListener.afterFailure(containerException);
         else
-          processorListener.onShutdown();
+          processorListener.afterStop();
 
       }
 
@@ -360,30 +378,40 @@ public class StreamProcessor {
           executorService.shutdownNow();
           state = State.STOPPED;
         }
-        processorListener.onFailure(throwable);
+        processorListener.afterFailure(throwable);
       }
     };
   }
 
-  /* package private for testing */
-  SamzaContainer getContainer() {
-    return container;
+  /**
+   * Factory interface to create a {@link ProcessorLifecycleListener} for a given {@link StreamProcessor}.
+   */
+  @FunctionalInterface
+  public interface StreamProcessorLifecycleListenerFactory {
+    ProcessorLifecycleListener createInstance(StreamProcessor processor);
   }
 
   class ContainerListener implements SamzaContainerListener {
 
+    private boolean processorOnStartCalled = false;
+
+    @Override
+    public void beforeStart() {
+      // processorListener.beforeStart() is invoked in StreamProcessor.start()
+    }
+
     @Override
-    public void onContainerStart() {
+    public void afterStart() {
       LOGGER.warn("Received container start notification for container: {} in 
stream processor: {}.", container, processorId);
       if (!processorOnStartCalled) {
-        processorListener.onStart();
+        processorListener.afterStart();
         processorOnStartCalled = true;
       }
       state = State.RUNNING;
     }
 
     @Override
-    public void onContainerStop() {
+    public void afterStop() {
       containerShutdownLatch.countDown();
       synchronized (lock) {
         if (state == State.IN_REBALANCE) {
@@ -397,7 +425,7 @@ public class StreamProcessor {
     }
 
     @Override
-    public void onContainerFailed(Throwable t) {
+    public void afterFailure(Throwable t) {
       containerShutdownLatch.countDown();
       synchronized (lock) {
         LOGGER.error(String.format("Container: %s failed with an exception. 
Stopping the stream processor: %s. Original exception:", container, 
processorId), containerException);

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
deleted file mode 100644
index 7a4da7d..0000000
--- 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessorLifecycleListener.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.processor;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This class listens to the life cycle events in a {@link StreamProcessor},
- */
-@InterfaceStability.Evolving
-public interface StreamProcessorLifecycleListener {
-  /**
-   * Callback when the {@link StreamProcessor} is started
-   * This callback is invoked only once when {@link 
org.apache.samza.container.SamzaContainer} starts for the first time
-   * in the {@link StreamProcessor}. When there is a re-balance of 
tasks/partitions among the processors, the container
-   * may temporarily be "paused" and re-started again. For such re-starts, 
this callback is NOT invoked.
-   */
-  void onStart();
-
-  /**
-   * Callback when the {@link StreamProcessor} is shut down.
-   */
-  void onShutdown();
-
-  /**
-   * Callback when the {@link StreamProcessor} fails
-   * @param t Cause of the failure
-   */
-  void onFailure(Throwable t);
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
deleted file mode 100644
index dfcfba4..0000000
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.runtime;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.File;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.ApplicationConfig.ApplicationMode;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.ShellCommandConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.execution.ExecutionPlan;
-import org.apache.samza.execution.ExecutionPlanner;
-import org.apache.samza.execution.JobNode;
-import org.apache.samza.execution.StreamManager;
-import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.StreamGraphSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Defines common, core behavior for implementations of the {@link 
ApplicationRunner} API.
- */
-public abstract class AbstractApplicationRunner extends ApplicationRunner {
-  private static final Logger log = 
LoggerFactory.getLogger(AbstractApplicationRunner.class);
-
-  /**
-   * The {@link ApplicationRunner} is supposed to run a single {@link 
StreamApplication} instance in the full life-cycle
-   */
-  protected final StreamGraphSpec graphSpec;
-
-  public AbstractApplicationRunner(Config config) {
-    super(config);
-    this.graphSpec = new StreamGraphSpec(config);
-  }
-
-  public ExecutionPlan getExecutionPlan(StreamApplication app) throws 
Exception {
-    return getExecutionPlan(app, null);
-  }
-
-  /* package private */
-  ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws 
Exception {
-    // build stream graph
-    app.init(graphSpec, config);
-    OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
-
-    // generated application configs are stored in cfg
-    Map<String, String> cfg = new HashMap<>();
-    if (StringUtils.isNoneEmpty(runId)) {
-      cfg.put(ApplicationConfig.APP_RUN_ID, runId);
-    }
-
-    StreamConfig streamConfig = new StreamConfig(config);
-    Set<String> inputStreams = new 
HashSet<>(specGraph.getInputOperators().keySet());
-    inputStreams.removeAll(specGraph.getOutputStreams().keySet());
-    ApplicationMode mode = 
inputStreams.stream().allMatch(streamConfig::getIsBounded)
-        ? ApplicationMode.BATCH : ApplicationMode.STREAM;
-    cfg.put(ApplicationConfig.APP_MODE, mode.name());
-
-    // merge user-provided configuration with input/output descriptor 
generated configuration
-    // descriptor generated configuration has higher priority
-    Map<String, String> systemStreamConfigs = new HashMap<>();
-    graphSpec.getInputDescriptors().forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
-    graphSpec.getOutputDescriptors().forEach((key, value) -> 
systemStreamConfigs.putAll(value.toConfig()));
-    graphSpec.getSystemDescriptors().forEach(sd -> 
systemStreamConfigs.putAll(sd.toConfig()));
-    graphSpec.getDefaultSystemDescriptor().ifPresent(dsd ->
-        systemStreamConfigs.put(JobConfig.JOB_DEFAULT_SYSTEM(), 
dsd.getSystemName()));
-    cfg.putAll(systemStreamConfigs);
-
-    // create the physical execution plan and merge with overrides. This works 
for a single-stage job now
-    // TODO: This should all be consolidated with ExecutionPlanner after 
fixing SAMZA-1811
-    Config mergedConfig = JobNode.mergeJobConfig(config, new MapConfig(cfg));
-    StreamManager streamManager = buildAndStartStreamManager(mergedConfig);
-    try {
-      ExecutionPlanner planner = new ExecutionPlanner(mergedConfig, 
streamManager);
-      return planner.plan(specGraph);
-    } finally {
-      streamManager.stop();
-    }
-  }
-
-  /**
-   * Write the execution plan JSON to a file
-   * @param planJson JSON representation of the plan
-   */
-  final void writePlanJsonFile(String planJson) {
-    try {
-      String content = "plan='" + planJson + "'";
-      String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR());
-      if (planPath != null && !planPath.isEmpty()) {
-        // Write the plan json to plan path
-        File file = new File(planPath + "/plan.json");
-        file.setReadable(true, false);
-        PrintWriter writer = new PrintWriter(file, "UTF-8");
-        writer.println(content);
-        writer.close();
-      }
-    } catch (Exception e) {
-      log.warn("Failed to write execution plan json to file", e);
-    }
-  }
-
-  @VisibleForTesting
-  StreamManager buildAndStartStreamManager(Config config) {
-    StreamManager streamManager = new StreamManager(config);
-    streamManager.start();
-    return streamManager;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 13e6d38..17a9dc1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -21,21 +21,17 @@ package org.apache.samza.runtime;
 
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
-import org.apache.samza.job.JobRunner$;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.Util;
 
 
 /**
  * This class contains the main() method used by run-app.sh.
- * For a StreamApplication, it creates the {@link ApplicationRunner} based on 
the config, and then run the application.
- * For a Samza job using low level task API, it will create the JobRunner to 
run it.
+ * It creates the {@link ApplicationRunner} based on the config, and then runs 
the application.
  */
 public class ApplicationRunnerMain {
-  // TODO: have the app configs consolidated in one place
-  public static final String STREAM_APPLICATION_CLASS_CONFIG = "app.class";
 
   public static class ApplicationRunnerCommandLine extends CommandLine {
     public OptionSpec operationOpt =
@@ -58,25 +54,21 @@ public class ApplicationRunnerMain {
     Config config = Util.rewriteConfig(orgConfig);
     ApplicationRunnerOperation op = cmdLine.getOperation(options);
 
-    if (config.containsKey(STREAM_APPLICATION_CLASS_CONFIG)) {
-      ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-      StreamApplication app =
-          (StreamApplication) 
Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
-      switch (op) {
-        case RUN:
-          runner.run(app);
-          break;
-        case KILL:
-          runner.kill(app);
-          break;
-        case STATUS:
-          System.out.println(runner.status(app));
-          break;
-        default:
-          throw new IllegalArgumentException("Unrecognized operation: " + op);
-      }
-    } else {
-      JobRunner$.MODULE$.main(args);
+    ApplicationRunner appRunner =
+        
ApplicationRunners.getApplicationRunner(ApplicationUtil.fromConfig(config), 
config);
+
+    switch (op) {
+      case RUN:
+        appRunner.run();
+        break;
+      case KILL:
+        appRunner.kill();
+        break;
+      case STATUS:
+        System.out.println(appRunner.status());
+        break;
+      default:
+        throw new IllegalArgumentException("Unrecognized operation: " + op);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 8a9c151..7100482 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -21,36 +21,29 @@ package org.apache.samza.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.DistributedLockWithState;
-import org.apache.samza.execution.ExecutionPlan;
-import org.apache.samza.execution.StreamManager;
+import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.processor.StreamProcessor;
-import org.apache.samza.processor.StreamProcessorLifecycleListener;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.task.AsyncStreamTaskFactory;
-import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,176 +51,81 @@ import org.slf4j.LoggerFactory;
 /**
  * This class implements the {@link ApplicationRunner} that runs the 
applications in standalone environment
  */
-public class LocalApplicationRunner extends AbstractApplicationRunner {
+public class LocalApplicationRunner implements ApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalApplicationRunner.class);
-  private static final String APPLICATION_RUNNER_PATH_SUFFIX = 
"/ApplicationRunnerData";
 
-  private final String uid;
+  private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> 
appDesc;
+  private final LocalJobPlanner planner;
   private final Set<StreamProcessor> processors = 
ConcurrentHashMap.newKeySet();
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
   private final AtomicReference<Throwable> failure = new AtomicReference<>();
-  private final Map<String, MetricsReporter> customMetricsReporters;
 
   private ApplicationStatus appStatus = ApplicationStatus.New;
 
-  final class LocalStreamProcessorLifeCycleListener implements 
StreamProcessorLifecycleListener {
-    StreamProcessor processor;
-
-    void setProcessor(StreamProcessor processor) {
-      this.processor = processor;
-    }
-
-    @Override
-    public void onStart() {
-      if (numProcessorsToStart.decrementAndGet() == 0) {
-        appStatus = ApplicationStatus.Running;
-      }
-    }
-
-    @Override
-    public void onShutdown() {
-      processors.remove(processor);
-      processor = null;
-
-      if (processors.isEmpty()) {
-        shutdownAndNotify();
-      }
-    }
-
-    @Override
-    public void onFailure(Throwable t) {
-      processors.remove(processor);
-      processor = null;
-
-      if (failure.compareAndSet(null, t)) {
-        // shutdown the other processors
-        processors.forEach(StreamProcessor::stop);
-      }
-
-      if (processors.isEmpty()) {
-        shutdownAndNotify();
-      }
-    }
-
-    private void shutdownAndNotify() {
-      if (failure.get() != null) {
-        appStatus = ApplicationStatus.unsuccessfulFinish(failure.get());
-      } else {
-        if (appStatus == ApplicationStatus.Running) {
-          appStatus = ApplicationStatus.SuccessfulFinish;
-        } else if (appStatus == ApplicationStatus.New) {
-          // the processor is shutdown before started
-          appStatus = ApplicationStatus.UnsuccessfulFinish;
-        }
-      }
-
-      shutdownLatch.countDown();
-    }
-  }
-
-  public LocalApplicationRunner(Config config) {
-    this(config, new HashMap<>());
-  }
-
-  public LocalApplicationRunner(Config config, Map<String, MetricsReporter> 
customMetricsReporters) {
-    super(config);
-    this.uid = UUID.randomUUID().toString();
-    this.customMetricsReporters = customMetricsReporters;
+  /**
+   * Constructs a {@link LocalApplicationRunner} to run the {@code app} with 
the {@code config}.
+   *
+   * @param app application to run
+   * @param config configuration for the application
+   */
+  public LocalApplicationRunner(SamzaApplication app, Config config) {
+    this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
+    this.planner = new LocalJobPlanner(appDesc);
   }
 
-  @Override
-  public void runTask() {
-    JobConfig jobConfig = new JobConfig(this.config);
-
-    // validation
-    String taskName = new TaskConfig(config).getTaskClass().getOrElse(null);
-    if (taskName == null) {
-      throw new SamzaException("Neither APP nor task.class are defined 
defined");
-    }
-    LOG.info("LocalApplicationRunner will run " + taskName);
-    LocalStreamProcessorLifeCycleListener listener = new 
LocalStreamProcessorLifeCycleListener();
-
-    StreamProcessor processor = createStreamProcessor(jobConfig, listener);
-
-    numProcessorsToStart.set(1);
-    listener.setProcessor(processor);
-    processor.start();
+  /**
+   * Constructor only used in unit test to allow injection of {@link 
LocalJobPlanner}
+   */
+  @VisibleForTesting
+  LocalApplicationRunner(ApplicationDescriptorImpl<? extends 
ApplicationDescriptor> appDesc, LocalJobPlanner planner) {
+    this.appDesc = appDesc;
+    this.planner = planner;
   }
 
   @Override
-  public void run(StreamApplication app) {
-
+  public void run() {
     try {
-      // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app);
+      List<JobConfig> jobConfigs = planner.prepareJobs();
 
-      String executionPlanJson = plan.getPlanAsJson();
-      writePlanJsonFile(executionPlanJson);
-      LOG.info("Execution Plan: \n" + executionPlanJson);
-      String planId = String.valueOf(executionPlanJson.hashCode());
-
-      if (plan.getJobConfigs().isEmpty()) {
+      // create the StreamProcessors
+      if (jobConfigs.isEmpty()) {
         throw new SamzaException("No jobs to run.");
       }
-
-      plan.getJobConfigs().forEach(jobConfig -> {
-          StreamManager streamManager = null;
-          try {
-            // 2. create the necessary streams
-            streamManager = buildAndStartStreamManager(jobConfig);
-            createStreams(planId, plan.getIntermediateStreams(), 
streamManager);
-
-            // 3. create the StreamProcessors
-            LOG.debug("Starting job {} StreamProcessor with config {}", 
jobConfig.getName(), jobConfig);
-            LocalStreamProcessorLifeCycleListener listener = new 
LocalStreamProcessorLifeCycleListener();
-            StreamProcessor processor = createStreamProcessor(jobConfig, 
graphSpec, listener);
-            listener.setProcessor(processor);
-            processors.add(processor);
-          } finally {
-            if (streamManager != null) {
-              streamManager.stop();
-            }
-          }
+      jobConfigs.forEach(jobConfig -> {
+          LOG.debug("Starting job {} StreamProcessor with config {}", 
jobConfig.getName(), jobConfig);
+          StreamProcessor processor = createStreamProcessor(jobConfig, appDesc,
+              sp -> new LocalStreamProcessorLifecycleListener(sp, jobConfig));
+          processors.add(processor);
         });
       numProcessorsToStart.set(processors.size());
 
-      // 4. start the StreamProcessors
+      // start the StreamProcessors
       processors.forEach(StreamProcessor::start);
     } catch (Throwable throwable) {
       appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
       shutdownLatch.countDown();
-      throw new SamzaException(String.format("Failed to start application: 
%s.", app), throwable);
+      throw new SamzaException(String.format("Failed to start application: %s",
+          new ApplicationConfig(appDesc.getConfig()).getGlobalAppId()), 
throwable);
     }
   }
 
   @Override
-  public void kill(StreamApplication streamApp) {
+  public void kill() {
     processors.forEach(StreamProcessor::stop);
   }
 
   @Override
-  public ApplicationStatus status(StreamApplication streamApp) {
+  public ApplicationStatus status() {
     return appStatus;
   }
 
-  /**
-   * Waits until the application finishes.
-   */
   @Override
   public void waitForFinish() {
-    waitForFinish(Duration.ofMillis(0));
+    this.waitForFinish(Duration.ofSeconds(0));
   }
 
-  /**
-   * Waits for {@code timeout} duration for the application to finish.
-   * If timeout &lt; 1, blocks the caller indefinitely.
-   *
-   * @param timeout time to wait for the application to finish
-   * @return true - application finished before timeout
-   *         false - otherwise
-   */
   @Override
   public boolean waitForFinish(Duration timeout) {
     long timeoutInMs = timeout.toMillis();
@@ -251,102 +149,105 @@ public class LocalApplicationRunner extends 
AbstractApplicationRunner {
     return finished;
   }
 
+  @VisibleForTesting
+  protected Set<StreamProcessor> getProcessors() {
+    return Collections.unmodifiableSet(processors);
+  }
+
+  @VisibleForTesting
+  CountDownLatch getShutdownLatch() {
+    return shutdownLatch;
+  }
+
+  @VisibleForTesting
+  StreamProcessor createStreamProcessor(Config config, 
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc,
+      StreamProcessor.StreamProcessorLifecycleListenerFactory listenerFactory) 
{
+    TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
+    Map<String, MetricsReporter> reporters = new HashMap<>();
+    // TODO: the null processorId has to be fixed after SAMZA-1835
+    appDesc.getMetricsReporterFactories().forEach((name, factory) ->
+        reporters.put(name, factory.getMetricsReporter(name, null, config)));
+    return new StreamProcessor(config, reporters, taskFactory, 
listenerFactory, null);
+  }
+
   /**
-   * Create intermediate streams using {@link 
org.apache.samza.execution.StreamManager}.
-   * If {@link CoordinationUtils} is provided, this function will first invoke 
leader election, and the leader
-   * will create the streams. All the runner processes will wait on the latch 
that is released after the leader finishes
-   * stream creation.
-   * @param planId a unique identifier representing the plan used for 
coordination purpose
-   * @param intStreams list of intermediate {@link StreamSpec}s
+   * Defines a specific implementation of {@link ProcessorLifecycleListener} 
for local {@link StreamProcessor}s.
    */
-  private void createStreams(String planId,
-      List<StreamSpec> intStreams,
-      StreamManager streamManager) {
-    if (intStreams.isEmpty()) {
-      LOG.info("Set of intermediate streams is empty. Nothing to create.");
-      return;
+  private final class LocalStreamProcessorLifecycleListener implements 
ProcessorLifecycleListener {
+    private final StreamProcessor processor;
+    private final ProcessorLifecycleListener 
userDefinedProcessorLifecycleListener;
+
+    LocalStreamProcessorLifecycleListener(StreamProcessor processor, Config 
jobConfig) {
+      this.userDefinedProcessorLifecycleListener = 
appDesc.getProcessorLifecycleListenerFactory()
+          .createInstance(new ProcessorContext() { }, jobConfig);
+      this.processor = processor;
     }
-    LOG.info("A single processor must create the intermediate streams. 
Processor {} will attempt to acquire the lock.", uid);
-    // Move the scope of coordination utils within stream creation to address 
long idle connection problem.
-    // Refer SAMZA-1385 for more details
-    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
-    String coordinationId = new ApplicationConfig(config).getGlobalAppId() + 
APPLICATION_RUNNER_PATH_SUFFIX;
-    CoordinationUtils coordinationUtils =
-        
jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, 
uid, config);
-    if (coordinationUtils == null) {
-      LOG.warn("Processor {} failed to create utils. Each processor will 
attempt to create streams.", uid);
-      // each application process will try creating the streams, which
-      // requires stream creation to be idempotent
-      streamManager.createStreams(intStreams);
-      return;
+
+    @Override
+    public void beforeStart() {
+      userDefinedProcessorLifecycleListener.beforeStart();
     }
 
-    DistributedLockWithState lockWithState = 
coordinationUtils.getLockWithState(planId);
-    try {
-      // check if the processor needs to go through leader election and stream 
creation
-      if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
-        LOG.info("lock acquired for streams creation by " + uid);
-        streamManager.createStreams(intStreams);
-        lockWithState.unlockAndSet();
-      } else {
-        LOG.info("Processor {} did not obtain the lock for streams creation. 
They must've been created by another processor.", uid);
+    @Override
+    public void afterStart() {
+      if (numProcessorsToStart.decrementAndGet() == 0) {
+        appStatus = ApplicationStatus.Running;
       }
-    } catch (TimeoutException e) {
-      String msg = String.format("Processor {} failed to get the lock for 
stream initialization", uid);
-      throw new SamzaException(msg, e);
-    } finally {
-      coordinationUtils.close();
+      userDefinedProcessorLifecycleListener.afterStart();
     }
-  }
 
-  /**
-   * Create {@link StreamProcessor} based on {@link StreamApplication} and the 
config
-   * @param config config
-   * @return {@link StreamProcessor]}
-   */
-  /* package private */
-  StreamProcessor createStreamProcessor(
-      Config config,
-      StreamProcessorLifecycleListener listener) {
-    Object taskFactory = TaskFactoryUtil.createTaskFactory(config);
-    return getStreamProcessorInstance(config, taskFactory, listener);
-  }
+    @Override
+    public void afterStop() {
+      processors.remove(processor);
 
-  /**
-   * Create {@link StreamProcessor} based on {@link StreamApplication} and the 
config
-   * @param config config
-   * @param graphBuilder {@link StreamGraphSpec}
-   * @return {@link StreamProcessor]}
-   */
-  /* package private */
-  StreamProcessor createStreamProcessor(
-      Config config,
-      StreamGraphSpec graphBuilder,
-      StreamProcessorLifecycleListener listener) {
-    Object taskFactory = 
TaskFactoryUtil.createTaskFactory(graphBuilder.getOperatorSpecGraph(), 
graphBuilder.getContextManager());
-    return getStreamProcessorInstance(config, taskFactory, listener);
-  }
+      // successful shutdown
+      handleProcessorShutdown(null);
+    }
 
-  private StreamProcessor getStreamProcessorInstance(Config config, Object 
taskFactory, StreamProcessorLifecycleListener listener) {
-    if (taskFactory instanceof StreamTaskFactory) {
-      return new StreamProcessor(
-          config, customMetricsReporters, (StreamTaskFactory) taskFactory, 
listener);
-    } else if (taskFactory instanceof AsyncStreamTaskFactory) {
-      return new StreamProcessor(
-          config, customMetricsReporters, (AsyncStreamTaskFactory) 
taskFactory, listener);
-    } else {
-      throw new SamzaException(String.format("%s is not a valid task factory",
-          taskFactory.getClass().getCanonicalName()));
+    @Override
+    public void afterFailure(Throwable t) {
+      processors.remove(processor);
+
+      // the processor stopped with failure; record the first 
processor's failure as the cause of
+      // the whole application failure
+      if (failure.compareAndSet(null, t)) {
+        // shutdown the other processors
+        processors.forEach(StreamProcessor::stop);
+      }
+
+      // handle the current processor's shutdown failure.
+      handleProcessorShutdown(t);
     }
-  }
 
-  /* package private for testing */
-  Set<StreamProcessor> getProcessors() {
-    return processors;
-  }
+    private void handleProcessorShutdown(Throwable error) {
+      if (processors.isEmpty()) {
+        // all processors are shutdown, setting the application final status
+        setApplicationFinalStatus();
+      }
+      if (error != null) {
+        // current processor shutdown with a failure
+        userDefinedProcessorLifecycleListener.afterFailure(error);
+      } else {
+        // current processor shutdown successfully
+        userDefinedProcessorLifecycleListener.afterStop();
+      }
+      if (processors.isEmpty()) {
+        // no processor is still running. Notify callers waiting on 
waitForFinish()
+        shutdownLatch.countDown();
+      }
+    }
 
-  @VisibleForTesting
-  CountDownLatch getShutdownLatch() {
-    return shutdownLatch;
+    private void setApplicationFinalStatus() {
+      if (failure.get() != null) {
+        appStatus = ApplicationStatus.unsuccessfulFinish(failure.get());
+      } else {
+        if (appStatus == ApplicationStatus.Running) {
+          appStatus = ApplicationStatus.SuccessfulFinish;
+        } else if (appStatus == ApplicationStatus.New) {
+          // the processor is shutdown before started
+          appStatus = ApplicationStatus.UnsuccessfulFinish;
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index fe75883..98864d2 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -20,10 +20,14 @@
 package org.apache.samza.runtime;
 
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 import org.apache.log4j.MDC;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorUtil;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.ApplicationUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.ShellCommandConfig;
@@ -31,11 +35,12 @@ import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
-import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.apache.samza.util.ScalaJavaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,37 +80,51 @@ public class LocalContainerRunner {
     MDC.put("jobName", jobName);
     MDC.put("jobId", jobId);
 
-    StreamApplication streamApp = 
TaskFactoryUtil.createStreamApplication(config);
-    Object taskFactory = getTaskFactory(streamApp, config);
-    run(taskFactory, containerId, jobModel, config);
+    ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
+        
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), 
config);
+    run(appDesc, containerId, jobModel, config);
 
     System.exit(0);
   }
 
-  private static void run(Object taskFactory, String containerId, JobModel 
jobModel, Config config) {
+  private static void run(ApplicationDescriptorImpl<? extends 
ApplicationDescriptor> appDesc, String containerId,
+      JobModel jobModel, Config config) {
+    TaskFactory taskFactory = TaskFactoryUtil.getTaskFactory(appDesc);
     SamzaContainer container = SamzaContainer$.MODULE$.apply(
         containerId,
         jobModel,
         config,
-        ScalaJavaUtil.toScalaMap(new HashMap<>()),
+        ScalaJavaUtil.toScalaMap(loadMetricsReporters(appDesc, containerId, 
config)),
         taskFactory);
 
+    ProcessorLifecycleListener listener = 
appDesc.getProcessorLifecycleListenerFactory()
+        .createInstance(new ProcessorContext() { }, config);
+
     container.setContainerListener(
         new SamzaContainerListener() {
           @Override
-          public void onContainerStart() {
+          public void beforeStart() {
+            log.info("Before starting the container.");
+            listener.beforeStart();
+          }
+
+          @Override
+          public void afterStart() {
             log.info("Container Started");
+            listener.afterStart();
           }
 
           @Override
-          public void onContainerStop() {
+          public void afterStop() {
             log.info("Container Stopped");
+            listener.afterStop();
           }
 
           @Override
-          public void onContainerFailed(Throwable t) {
+          public void afterFailure(Throwable t) {
             log.info("Container Failed");
             containerRunnerException = t;
+            listener.afterFailure(t);
           }
         });
 
@@ -126,13 +145,14 @@ public class LocalContainerRunner {
     }
   }
 
-  private static Object getTaskFactory(StreamApplication streamApp, Config 
config) {
-    if (streamApp != null) {
-      StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-      streamApp.init(graphSpec, config);
-      return 
TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), 
graphSpec.getContextManager());
-    }
-    return TaskFactoryUtil.createTaskFactory(config);
+  // TODO: this is going away when SAMZA-1168 is done and the initialization 
of metrics reporters is done via
+  // LocalApplicationRunner#createStreamProcessor()
+  private static Map<String, MetricsReporter> loadMetricsReporters(
+      ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, 
String containerId, Config config) {
+    Map<String, MetricsReporter> reporters = new HashMap<>();
+    appDesc.getMetricsReporterFactories().forEach((name, factory) ->
+        reporters.put(name, factory.getMetricsReporter(name, containerId, 
config)));
+    return reporters;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 6229abc..69eb5fe 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -20,18 +20,17 @@
 package org.apache.samza.runtime;
 
 import java.time.Duration;
-import java.util.UUID;
+import java.util.List;
 import org.apache.samza.SamzaException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.execution.ExecutionPlan;
-import org.apache.samza.execution.StreamManager;
+import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
-import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,55 +40,38 @@ import static org.apache.samza.job.ApplicationStatus.*;
 /**
  * This class implements the {@link ApplicationRunner} that runs the 
applications in a remote cluster
  */
-public class RemoteApplicationRunner extends AbstractApplicationRunner {
+public class RemoteApplicationRunner implements ApplicationRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteApplicationRunner.class);
   private static final long DEFAULT_SLEEP_DURATION_MS = 2000;
 
-  public RemoteApplicationRunner(Config config) {
-    super(config);
-  }
-
-  @Override
-  public void runTask() {
-    throw new UnsupportedOperationException("Running StreamTask is not 
implemented for RemoteReplicationRunner");
-  }
+  private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> 
appDesc;
+  private final RemoteJobPlanner planner;
 
   /**
-   * Run the {@link StreamApplication} on the remote cluster
-   * @param app a StreamApplication
+   * Constructs a {@link RemoteApplicationRunner} to run the {@code app} 
with the {@code config}.
+   *
+   * @param app application to run
+   * @param config configuration for the application
    */
+  public RemoteApplicationRunner(SamzaApplication app, Config config) {
+    this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
+    this.planner = new RemoteJobPlanner(appDesc);
+  }
+
   @Override
-  public void run(StreamApplication app) {
+  public void run() {
     try {
-      // TODO: run.id needs to be set for standalone: SAMZA-1531
-      // run.id is based on current system time with the most significant bits 
in UUID (8 digits) to avoid collision
-      String runId = String.valueOf(System.currentTimeMillis()) + "-" + 
UUID.randomUUID().toString().substring(0, 8);
-      LOG.info("The run id for this run is {}", runId);
-
-      // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app, runId);
-      writePlanJsonFile(plan.getPlanAsJson());
-
-      plan.getJobConfigs().forEach(jobConfig -> {
-          StreamManager streamManager = null;
-          try {
-            // 2. create the necessary streams
-            streamManager = buildAndStartStreamManager(jobConfig);
-            if (plan.getApplicationConfig().getAppMode() == 
ApplicationConfig.ApplicationMode.BATCH) {
-              
streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
-            }
-            streamManager.createStreams(plan.getIntermediateStreams());
-
-            // 3. submit jobs for remote execution
-            LOG.info("Starting job {} with config {}", jobConfig.getName(), 
jobConfig);
-            JobRunner runner = new JobRunner(jobConfig);
-            runner.run(true);
-          } finally {
-            if (streamManager != null) {
-              streamManager.stop();
-            }
-          }
+      List<JobConfig> jobConfigs = planner.prepareJobs();
+      if (jobConfigs.isEmpty()) {
+        throw new SamzaException("No jobs to run.");
+      }
+
+      // submit jobs for remote execution
+      jobConfigs.forEach(jobConfig -> {
+          LOG.info("Starting job {} with config {}", jobConfig.getName(), 
jobConfig);
+          JobRunner runner = new JobRunner(jobConfig);
+          runner.run(true);
         });
     } catch (Throwable t) {
       throw new SamzaException("Failed to run application", t);
@@ -97,12 +79,11 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
   }
 
   @Override
-  public void kill(StreamApplication app) {
-
+  public void kill() {
     // since currently we only support single actual remote job, we can get 
its status without
     // building the execution plan.
     try {
-      JobConfig jc = new JobConfig(config);
+      JobConfig jc = new JobConfig(appDesc.getConfig());
       LOG.info("Killing job {}", jc.getName());
       JobRunner runner = new JobRunner(jc);
       runner.kill();
@@ -112,42 +93,25 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
   }
 
   @Override
-  public ApplicationStatus status(StreamApplication app) {
-
+  public ApplicationStatus status() {
     // since currently we only support single actual remote job, we can get 
its status without
     // building the execution plan
     try {
-      JobConfig jc = new JobConfig(config);
+      JobConfig jc = new JobConfig(appDesc.getConfig());
       return getApplicationStatus(jc);
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
     }
   }
 
-  /* package private */ ApplicationStatus getApplicationStatus(JobConfig 
jobConfig) {
-    JobRunner runner = new JobRunner(jobConfig);
-    ApplicationStatus status = runner.status();
-    LOG.debug("Status is {} for job {}", new Object[]{status, 
jobConfig.getName()});
-    return status;
-  }
-
-  /**
-   * Waits until the application finishes.
-   */
+  @Override
   public void waitForFinish() {
     waitForFinish(Duration.ofMillis(0));
   }
 
-  /**
-   * Waits for {@code timeout} duration for the application to finish.
-   * If timeout &lt; 1, blocks the caller indefinitely.
-   *
-   * @param timeout time to wait for the application to finish
-   * @return true - application finished before timeout
-   *         false - otherwise
-   */
+  @Override
   public boolean waitForFinish(Duration timeout) {
-    JobConfig jobConfig = new JobConfig(config);
+    JobConfig jobConfig = new JobConfig(appDesc.getConfig());
     boolean finished = true;
     long timeoutInMs = timeout.toMillis();
     long startTimeInMs = System.currentTimeMillis();
@@ -181,15 +145,10 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
     return finished;
   }
 
-  private Config getConfigFromPrevRun() {
-    CoordinatorStreamSystemConsumer consumer = new 
CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
-    consumer.register();
-    consumer.start();
-    consumer.bootstrap();
-    consumer.stop();
-
-    Config cfg = consumer.getConfig();
-    LOG.info("Previous config is: " + cfg.toString());
-    return cfg;
+  /* package private */ ApplicationStatus getApplicationStatus(JobConfig 
jobConfig) {
+    JobRunner runner = new JobRunner(jobConfig);
+    ApplicationStatus status = runner.status();
+    LOG.debug("Status is {} for job {}", new Object[]{status, 
jobConfig.getName()});
+    return status;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index dd8d6c3..2ca4e81 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -69,10 +69,9 @@ public class StreamOperatorTask implements StreamTask, 
InitableTask, WindowableT
    * Initializes this task during startup.
    * <p>
    * Implementation: Initializes the runtime {@link OperatorImplGraph} 
according to user-defined {@link OperatorSpecGraph}.
-   * The {@link org.apache.samza.operators.StreamGraphSpec} sets the input and 
output streams and the task-wide
-   * context manager using the {@link org.apache.samza.operators.StreamGraph} 
APIs,
+   * Users set the input and output streams and the task-wide context manager 
using {@link org.apache.samza.application.StreamApplicationDescriptor} APIs,
    * and the logical transforms using the {@link 
org.apache.samza.operators.MessageStream} APIs. After the
-   * {@link org.apache.samza.operators.StreamGraphSpec} is initialized once by 
the application, it then creates
+   * {@link org.apache.samza.application.StreamApplicationDescriptorImpl} is 
initialized once by the application, it then creates
    * an immutable {@link OperatorSpecGraph} accordingly, which is passed in to 
this class to create the {@link OperatorImplGraph}
    * corresponding to the logical DAG.
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java 
b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
index 38ae854..834777b 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
@@ -18,66 +18,53 @@
  */
 package org.apache.samza.task;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.application.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.ConfigException;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.OperatorSpecGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
 
-import static org.apache.samza.util.ScalaJavaUtil.toScalaFunction;
-import static org.apache.samza.util.ScalaJavaUtil.defaultValue;
-
 /**
- * This class provides utility functions to load task factory classes based on 
config, and to wrap {@link StreamTaskFactory} in {@link AsyncStreamTaskFactory}
- * when running {@link StreamTask}s in multi-thread mode
+ * This class provides utility functions to load task factory classes based on 
config, and to wrap {@link StreamTaskFactory}
+ * in {@link AsyncStreamTaskFactory} when running {@link StreamTask}s in 
multi-thread mode
  */
 public class TaskFactoryUtil {
   private static final Logger log = 
LoggerFactory.getLogger(TaskFactoryUtil.class);
 
   /**
-   * This method creates a task factory class based on the {@link 
StreamApplication}
+   * Creates a {@link TaskFactory} based on {@link ApplicationDescriptorImpl}
    *
-   * @param specGraph the {@link OperatorSpecGraph}
-   * @param contextManager the {@link ContextManager} to set up initial 
context for {@code specGraph}
-   * @return  a task factory object, either a instance of {@link 
StreamTaskFactory} or {@link AsyncStreamTaskFactory}
+   * @param appDesc {@link ApplicationDescriptorImpl} for this application
+   * @return {@link TaskFactory} object defined by {@code appDesc}
    */
-  public static Object createTaskFactory(OperatorSpecGraph specGraph, 
ContextManager contextManager) {
-    return createStreamOperatorTaskFactory(specGraph, contextManager);
+  public static TaskFactory getTaskFactory(ApplicationDescriptorImpl<? extends 
ApplicationDescriptor> appDesc) {
+    if (appDesc instanceof TaskApplicationDescriptorImpl) {
+      return ((TaskApplicationDescriptorImpl) appDesc).getTaskFactory();
+    } else if (appDesc instanceof StreamApplicationDescriptorImpl) {
+      return (StreamTaskFactory) () -> new 
StreamOperatorTask(((StreamApplicationDescriptorImpl) 
appDesc).getOperatorSpecGraph(),
+          ((StreamApplicationDescriptorImpl) appDesc).getContextManager());
+    }
+    throw new 
IllegalArgumentException(String.format("ApplicationDescriptorImpl has to be 
either TaskApplicationDescriptorImpl or "
+        + "StreamApplicationDescriptorImpl. class %s is not supported", 
appDesc.getClass().getName()));
   }
 
   /**
-   * This method creates a task factory class based on the configuration
+   * Creates a {@link TaskFactory} based on the configuration.
+   * <p>
+   * This should only be used to create {@link TaskFactory} defined in 
task.class
    *
-   * @param config  the {@link Config} for this job
-   * @return  a task factory object, either a instance of {@link 
StreamTaskFactory} or {@link AsyncStreamTaskFactory}
-   */
-  public static Object createTaskFactory(Config config) {
-    return fromTaskClassConfig(config);
-  }
-
-  private static StreamTaskFactory 
createStreamOperatorTaskFactory(OperatorSpecGraph specGraph, ContextManager 
contextManager) {
-    return () -> new StreamOperatorTask(specGraph, contextManager);
-  }
-
-  /**
-   * Create {@link StreamTaskFactory} or {@link AsyncStreamTaskFactory} based 
on the configured task.class.
-   * @param config the {@link Config}
-   * @return task factory instance
+   * @param taskClassName  the task class name for this job
+   * @return  a {@link TaskFactory} object, either an instance of {@link 
StreamTaskFactory} or {@link AsyncStreamTaskFactory}
    */
-  private static Object fromTaskClassConfig(Config config) {
-    // if there is configuration to set the job w/ a specific type of task, 
instantiate the corresponding task factory
-    String taskClassName = new 
TaskConfig(config).getTaskClass().getOrElse(toScalaFunction(
-      () -> {
-        throw new ConfigException("No task class defined in the 
configuration.");
-      }));
-
+  public static TaskFactory getTaskFactory(String taskClassName) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(taskClassName), 
"task.class cannot be empty");
     log.info("Got task class name: {}", taskClassName);
 
     boolean isAsyncTaskClass;
@@ -88,28 +75,22 @@ public class TaskFactoryUtil {
     }
 
     if (isAsyncTaskClass) {
-      return new AsyncStreamTaskFactory() {
-        @Override
-        public AsyncStreamTask createInstance() {
-          try {
-            return (AsyncStreamTask) 
Class.forName(taskClassName).newInstance();
-          } catch (Throwable t) {
-            log.error("Error loading AsyncStreamTask class: {}. error: {}", 
taskClassName, t);
-            throw new SamzaException(String.format("Error loading 
AsyncStreamTask class: %s", taskClassName), t);
-          }
+      return (AsyncStreamTaskFactory) () -> {
+        try {
+          return (AsyncStreamTask) Class.forName(taskClassName).newInstance();
+        } catch (Throwable t) {
+          log.error("Error loading AsyncStreamTask class: {}. error: {}", 
taskClassName, t);
+          throw new SamzaException(String.format("Error loading 
AsyncStreamTask class: %s", taskClassName), t);
         }
       };
     }
 
-    return new StreamTaskFactory() {
-      @Override
-      public StreamTask createInstance() {
-        try {
-          return (StreamTask) Class.forName(taskClassName).newInstance();
-        } catch (Throwable t) {
-          log.error("Error loading StreamTask class: {}. error: {}", 
taskClassName, t);
-          throw new SamzaException(String.format("Error loading StreamTask 
class: %s", taskClassName), t);
-        }
+    return (StreamTaskFactory) () -> {
+      try {
+        return (StreamTask) Class.forName(taskClassName).newInstance();
+      } catch (Throwable t) {
+        log.error("Error loading StreamTask class: {}. error: {}", 
taskClassName, t);
+        throw new SamzaException(String.format("Error loading StreamTask 
class: %s", taskClassName), t);
       }
     };
   }
@@ -123,7 +104,7 @@ public class TaskFactoryUtil {
    * @param taskThreadPool  the thread pool to run the {@link 
AsyncStreamTaskAdapter} tasks
    * @return  the finalized task factory object
    */
-  public static Object finalizeTaskFactory(Object factory, boolean 
singleThreadMode, ExecutorService taskThreadPool) {
+  public static TaskFactory finalizeTaskFactory(TaskFactory factory, boolean 
singleThreadMode, ExecutorService taskThreadPool) {
 
     validateFactory(factory);
 
@@ -138,18 +119,13 @@ public class TaskFactoryUtil {
 
     if (!singleThreadMode && !isAsyncTaskClass) {
       log.info("Converting StreamTask to AsyncStreamTaskAdapter when running 
StreamTask with multiple threads");
-      return new AsyncStreamTaskFactory() {
-        @Override
-        public AsyncStreamTask createInstance() {
-          return new AsyncStreamTaskAdapter(((StreamTaskFactory) 
factory).createInstance(), taskThreadPool);
-        }
-      };
+      return (AsyncStreamTaskFactory) () -> new 
AsyncStreamTaskAdapter(((StreamTaskFactory) factory).createInstance(), 
taskThreadPool);
     }
 
     return factory;
   }
 
-  private static void validateFactory(Object factory) {
+  private static void validateFactory(TaskFactory factory) {
     if (factory == null) {
       throw new SamzaException("Either the task class name or the task factory 
instance is required.");
     }
@@ -160,33 +136,4 @@ public class TaskFactoryUtil {
     }
   }
 
-  /**
-   * Returns {@link StreamApplication} if it's configured, otherwise null.
-   * @param config Config
-   * throws {@link ConfigException} if there is misconfiguration of StreamApp.
-   * @return {@link StreamApplication} instance
-   */
-  public static StreamApplication createStreamApplication(Config config) {
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    if (appConfig.getAppClass() != null && !appConfig.getAppClass().isEmpty()) 
{
-      TaskConfig taskConfig = new TaskConfig(config);
-      String taskClassName = 
taskConfig.getTaskClass().getOrElse(defaultValue(null));
-      if (taskClassName != null && !taskClassName.isEmpty()) {
-        throw new ConfigException("High level StreamApplication API cannot be 
used together with low-level API using task.class.");
-      }
-
-      String appClassName = appConfig.getAppClass();
-      try {
-        Class<?> builderClass = Class.forName(appClassName);
-        return (StreamApplication) builderClass.newInstance();
-      } catch (Throwable t) {
-        String errorMsg = String.format("Failed to create StreamApplication 
class from the config. %s = %s",
-            ApplicationConfig.APP_CLASS, appConfig.getAppClass());
-        log.error(errorMsg, t);
-        throw new ConfigException(errorMsg, t);
-      }
-    } else {
-      return null;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 0c889d2..68de4a6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -124,7 +124,7 @@ object SamzaContainer extends Logging {
     jobModel: JobModel,
     config: Config,
     customReporters: Map[String, MetricsReporter] = Map[String, 
MetricsReporter](),
-    taskFactory: Object) = {
+    taskFactory: TaskFactory[_]) = {
     val containerModel = jobModel.getContainers.get(containerId)
     val containerName = "samza-container-%s" format containerId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
@@ -786,6 +786,10 @@ class SamzaContainer(
     try {
       info("Starting container.")
 
+      if (containerListener != null) {
+        containerListener.beforeStart()
+      }
+
       val startTime = System.nanoTime()
       status = SamzaContainerStatus.STARTING
 
@@ -809,7 +813,7 @@ class SamzaContainer(
       info("Entering run loop.")
       status = SamzaContainerStatus.STARTED
       if (containerListener != null) {
-        containerListener.onContainerStart()
+        containerListener.afterStart()
       }
       metrics.containerStartupTime.update(System.nanoTime() - startTime)
       runLoop.run
@@ -860,11 +864,11 @@ class SamzaContainer(
     status match {
       case SamzaContainerStatus.STOPPED =>
         if (containerListener != null) {
-          containerListener.onContainerStop()
+          containerListener.afterStop()
         }
       case SamzaContainerStatus.FAILED =>
         if (containerListener != null) {
-          containerListener.onContainerFailed(exceptionSeen)
+          containerListener.afterFailure(exceptionSeen)
         }
     }
   }
@@ -876,8 +880,8 @@ class SamzaContainer(
    * <br>
    * <b>Implementation</b>: Stops the [[RunLoop]], which will eventually 
transition the container from
    * [[SamzaContainerStatus.STARTED]] to either 
[[SamzaContainerStatus.STOPPED]] or [[SamzaContainerStatus.FAILED]]].
-   * Based on the final `status`, 
[[SamzaContainerListener#onContainerStop(boolean)]] or
-   * [[SamzaContainerListener#onContainerFailed(Throwable)]] will be invoked 
respectively.
+   * Based on the final `status`, [[SamzaContainerListener#afterStop()]] or
+   * [[SamzaContainerListener#afterFailure(Throwable)]] will be invoked 
respectively.
    *
    * @throws SamzaException, Thrown when the container has already been 
stopped or failed
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index bf4f252..5f4338c 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,8 +20,6 @@
 package org.apache.samza.job
 
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 0b472aa..abd7f65 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.local
 
+import org.apache.samza.application.{ApplicationDescriptorUtil, 
ApplicationUtil}
 import org.apache.samza.config.{Config, TaskConfigJava}
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
@@ -27,8 +28,9 @@ import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, 
MetricsReporter}
-import org.apache.samza.operators.StreamGraphSpec
+import org.apache.samza.runtime.ProcessorContext
 import org.apache.samza.storage.ChangelogStreamManager
+import org.apache.samza.task.TaskFactory
 import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
 
@@ -70,32 +72,36 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
 
     val containerId = "0"
     val jmxServer = new JmxServer
-    val streamApp = TaskFactoryUtil.createStreamApplication(config)
-
-    val taskFactory = if (streamApp != null) {
-      val graphSpec = new StreamGraphSpec(config)
-      streamApp.init(graphSpec, config)
-      TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), 
graphSpec.getContextManager)
-    } else {
-      TaskFactoryUtil.createTaskFactory(config)
-    }
+
+    val appDesc = 
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), 
config)
+    val taskFactory : TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
 
     // Give developers a nice friendly warning if they've specified task.opts 
and are using a threaded job.
     config.getTaskOpts match {
-      case Some(taskOpts) => warn("%s was specified in config, but is not 
being used because job is being executed with ThreadJob. You probably want to 
run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, 
classOf[ProcessJobFactory].getName))
+      case Some(taskOpts) => warn("%s was specified in config, but is not 
being used because job is being executed with ThreadJob. " +
+        "You probably want to run %s=%s." format (TASK_JVM_OPTS, 
STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
       case _ => None
     }
 
-    val containerListener = new SamzaContainerListener {
-      override def onContainerFailed(t: Throwable): Unit = {
-        error("Container failed.", t)
-        throw t
-      }
-
-      override def onContainerStop(): Unit = {
-      }
-
-      override def onContainerStart(): Unit = {
+    val containerListener = {
+      val processorLifecycleListener = 
appDesc.getProcessorLifecycleListenerFactory().createInstance(new 
ProcessorContext() { }, config)
+      new SamzaContainerListener {
+        override def afterFailure(t: Throwable): Unit = {
+          processorLifecycleListener.afterFailure(t)
+          throw t
+        }
+
+        override def afterStart(): Unit = {
+          processorLifecycleListener.afterStart()
+        }
+
+        override def afterStop(): Unit = {
+          processorLifecycleListener.afterStop()
+        }
+
+        override def beforeStart(): Unit = {
+          processorLifecycleListener.beforeStart()
+        }
 
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
 
b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
new file mode 100644
index 0000000..ccd88b8
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/application/MockStreamApplication.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+/**
+ * Test class of {@link StreamApplication} for unit tests
+ */
+public class MockStreamApplication implements StreamApplication {
+  @Override
+  public void describe(StreamApplicationDescriptor appSpec) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
 
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
new file mode 100644
index 0000000..9b590c4
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/application/TestApplicationUtil.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.task.MockStreamTask;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link ApplicationUtil}
+ */
+public class TestApplicationUtil {
+
+  @Test
+  public void testStreamAppClass() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(ApplicationConfig.APP_CLASS, 
MockStreamApplication.class.getName());
+    SamzaApplication app = ApplicationUtil.fromConfig(new 
MapConfig(configMap));
+    assertTrue(app instanceof MockStreamApplication);
+
+    configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+    app = ApplicationUtil.fromConfig(new MapConfig(configMap));
+    assertTrue(app instanceof MockStreamApplication);
+  }
+
+  @Test
+  public void testTaskAppClass() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(ApplicationConfig.APP_CLASS, 
MockTaskApplication.class.getName());
+    SamzaApplication app = ApplicationUtil.fromConfig(new 
MapConfig(configMap));
+    assertTrue(app instanceof MockTaskApplication);
+
+    configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+    app = ApplicationUtil.fromConfig(new MapConfig(configMap));
+    assertTrue(app instanceof MockTaskApplication);
+  }
+
+  @Test
+  public void testTaskClassOnly() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(TaskConfig.TASK_CLASS(), MockStreamTask.class.getName());
+    Config config = new MapConfig(configMap);
+    SamzaApplication app = ApplicationUtil.fromConfig(config);
+    assertTrue(app instanceof TaskApplication);
+    TaskApplicationDescriptorImpl appSpec = new 
TaskApplicationDescriptorImpl((TaskApplication) app, config);
+    assertTrue(appSpec.getTaskFactory().createInstance() instanceof 
MockStreamTask);
+  }
+
+  @Test(expected = ConfigException.class)
+  public void testEmptyTaskClassOnly() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(TaskConfig.TASK_CLASS(), "");
+    ApplicationUtil.fromConfig(new MapConfig(configMap));
+  }
+
+  @Test(expected = ConfigException.class)
+  public void testNoAppClassNoTaskClass() {
+    Map<String, String> configMap = new HashMap<>();
+    ApplicationUtil.fromConfig(new MapConfig(configMap));
+  }
+
+  /**
+   * Test class of {@link TaskApplication} for unit tests
+   */
+  public static class MockTaskApplication implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appSpec) {
+
+    }
+  }
+}
\ No newline at end of file

Reply via email to